2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-04-06 12:18:46 +00:00

improve socket handling, introduced verbose modes, changed thread handling

git-svn-id: https://svn.fhem.de/fhem/trunk@4022 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
markusbloch 2013-10-08 18:40:29 +00:00
parent 16d796eb79
commit 4f59238494
2 changed files with 145 additions and 86 deletions

View File

@ -218,6 +218,8 @@ my $log_queue = Thread::Queue->new();
my $logline;
print timestamp()."finished initialization. entering main loop\n" if($opt_v >= 2);
while(1)
{
@ -237,7 +239,8 @@ while(1)
while($status_queue->pending)
{
($uuid,$room,$value,$name) = split(";", $status_queue->dequeue);
print timestamp()."processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)\n" if($opt_v >=2);
if(not $value =~ /^(absence|present)$/)
{
@ -478,7 +481,7 @@ while(1)
# in case we have received a process signal, remove the pid file and shutdown
if(defined($sig_received))
{
print timestamp()."Caught $sig_received exiting\n" if($opt_v);
print "\r".timestamp()."Caught $sig_received exiting\n" if($opt_v);
unlink($opt_P);
print timestamp()."removed PID-File $opt_P\n" if($opt_v);
print timestamp()."server shutdown\n" if($opt_v);
@ -486,7 +489,11 @@ while(1)
}
}
print timestamp()."leaving main loop\n" if($opt_v >= 2);
########################################################################################################################
#
# Subroutine definitions
@ -548,18 +555,17 @@ sub doQuery($$$)
my $client_socket = undef;
# if the thread gets a termination signal, the thread must be shutdown by itself
local $SIG{TERM} = sub {
$log_queue->enqueue("$room (".threads->tid()."): terminating thread ".threads->tid()." for $address\n");
$client_socket->send("stop\n") if($client_socket);
sleep 2;
$log_queue->enqueue("$room (Thread No. ".threads->tid()."): terminating thread ".threads->tid()." for $address\n");
$client->shutdown() if(defined($client));
$selector->remove($client_socket) if(defined($selector));
close($client_socket) if($client_socket);
close($client_socket) if(defined($client_socket));
$client_socket = undef;
$log_queue->enqueue("$room (".threads->tid()."): exit thread ".threads->tid()."\n");
$log_queue->enqueue("$room (Thread No. ".threads->tid()."): exit thread ".threads->tid()."\n");
threads->exit();
};
local $SIG{HUP} = sub {
$client_socket->send("now\n") if($client_socket);
$client_socket->send("now\n") if(defined($client_socket));
};
@ -580,7 +586,7 @@ sub doQuery($$$)
$selector = IO::Select->new($client_socket);
if($client_socket)
if(defined($client_socket))
{
# send the given address to the presence daemon
@ -603,6 +609,7 @@ sub doQuery($$$)
$log_queue->enqueue("$room (Thread No. ".threads->tid().") socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")\n");
$selector->remove($client_socket);
shutdown($client_socket, 2);
close($client_socket);
$client_socket = undef;
}
@ -630,7 +637,7 @@ sub doQuery($$$)
Blocking => 1
) or ( $reconnect_count++ );
if($client_socket)
if(defined($client_socket))
{
# give a success message
$log_queue->enqueue("$room (Thread No. ".threads->tid().") reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)\n");
@ -720,6 +727,8 @@ sub doQuery($$$)
{
$selector->remove($local_client);
shutdown($local_client, 2);
close($local_client);
$client_socket = undef;
@ -730,7 +739,7 @@ sub doQuery($$$)
}
# Sleep for one second to avoid 100% cpu usage
sleep(1);
#sleep(1);
}
}

View File

@ -34,12 +34,15 @@ use File::Basename;
use Getopt::Long;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw(gettimeofday);
use warnings;
use strict;
sub Log($$);
my $new_client;
my $server;
my $client;
@ -47,10 +50,13 @@ my $buf;
my $querylocker :shared = int(time() - 15);
my %queues;
my $log_queue = Thread::Queue->new();
my $opt_d;
my $opt_h;
my $opt_v;
my $opt_v = 0;
my $opt_p = 5111;
my $opt_P = "/var/run/".basename($0).".pid";
my $opt_l;
@ -58,7 +64,7 @@ my $opt_l;
Getopt::Long::Configure('bundling');
GetOptions(
"d" => \$opt_d, "daemon" => \$opt_d,
"v" => \$opt_v, "verbose" => \$opt_v,
"v+" => \$opt_v, "verbose+" => \$opt_v,
"l=s" => \$opt_l, "logfile=s" => \$opt_l,
"p=i" => \$opt_p, "port=i" => \$opt_p,
"P=s" => \$opt_P, "pid-file=s" => \$opt_P,
@ -66,20 +72,16 @@ GetOptions(
if($opt_l)
{
open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l");
print timestamp()."=================================================\n" if($opt_v);
}
Log 0, "=================================================" if($opt_l);
print timestamp()."started with PID $$\n" if($opt_v);
Log 1, "started with PID $$";
if(-e "$opt_P")
{
print timestamp()."another process already running (PID file found at $opt_P)\n";
print timestamp()."aborted...\n";
print timestamp()." another process already running (PID file found at $opt_P)\n";
print timestamp()." aborted...\n";
exit 1;
}
@ -133,7 +135,7 @@ KeepAlive => 1,
Blocking => 0
) or die "error while creating socket: $!\n";
print timestamp()."created socket on ".$server->sockhost()." with port ".$server->sockport()."\n" if($opt_v);
Log 0, "created socket on ".$server->sockhost().":".$server->sockport();
my $listener = IO::Select->new();
$listener->add($server);
@ -144,6 +146,7 @@ my @new_handles;
my %child_handles;
my %child_config;
my $thread_counter = 0;
my $address;
my $name;
my $timeout;
@ -159,12 +162,18 @@ $SIG{TERM} = sub { $sig_received = "SIGTERM"; };
$SIG{KILL} = sub { $sig_received = "SIGKILL"; };
$SIG{QUIT} = sub { $sig_received = "SIGQUIT"; };
$SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
$SIG{PIPE} = sub { $sig_received = "SIGPIPE"; };
$server_pid = $$ unless(defined($server_pid));
while(1)
{
if($log_queue->pending)
{
Log 2, $log_queue->dequeue;
}
if(@new_handles = $listener->can_read(1))
{
foreach my $client (@new_handles)
@ -174,7 +183,7 @@ while(1)
$new_client = $server->accept();
$listener->add($new_client);
print timestamp()."new connection from ".$new_client->peerhost()."\n" if($opt_v);
Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport();
}
else
@ -192,7 +201,7 @@ while(1)
if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
{
$client->send("command accepted\n");
print timestamp()."received new command from ".$client->peerhost().": $buf\n" if($opt_v);
Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf";
($address, $timeout) = split("\\|", $buf);
$address =~ s/\s*//g;
@ -202,31 +211,33 @@ while(1)
if(defined($child_handles{$client}))
{
print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v);
$child_handles{$client}->kill('KILL');
delete($child_handles{$client});
Log 2, "sending new command to thread ".$child_handles{$client}->tid()." for client ".$client->peerhost().":".$client->peerport();
$queues{$child_handles{$client}->tid()}->enqueue("new|".$address."|".$timeout);
}
else
{
$thread_counter++;
$queues{$thread_counter} = Thread::Queue->new();
my $new_thread = threads->new(\&doQuery, ($write_handle, $address, $timeout));
print timestamp()."created thread ".$new_thread->tid()." for processing device $address within $timeout seconds for peer ".$client->peerhost()."\n" if($opt_v);
my $new_thread = threads->new(\&doQuery, ($write_handle, $address, $timeout));
Log 2, "created thread ".$new_thread->tid()." for processing device $address within $timeout seconds for peer ".$client->peerhost().":".$client->peerport();
$new_thread->detach();
$new_thread->detach();
$child_handles{$client} = $new_thread;
$child_handles{$client} = $new_thread;
}
}
elsif(lc($buf) =~ /^\s*now\s*$/)
{
print timestamp()."received now command from client ".$client->peerhost()."\n" if($opt_v);
Log 2, "received now command from client ".$client->peerhost().":".$client->peerport();
if(defined($child_handles{$client}))
{
print timestamp()."signalling thread ".$child_handles{$client}->tid()." for an instant test for client ".$client->peerhost()."\n" if($opt_v);
$child_handles{$client}->kill('HUP');
Log 2, "signalling thread ".$child_handles{$client}->tid()." for an instant test for client ".$client->peerhost().":".$client->peerport();
$queues{$child_handles{$client}->tid()}->enqueue("now");
$client->send("command accepted\n");
}
else
@ -236,12 +247,12 @@ while(1)
}
elsif(lc($buf) =~ /^\s*stop\s*$/)
{
print timestamp()."received stop command from client ".$client->peerhost()."\n" if($opt_v);
Log 2, "received stop command from client ".$client->peerhost().":".$client->peerport();
if(defined($child_handles{$client}))
{
print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v);
$child_handles{$client}->kill('KILL');
Log 2, "sending thread ".$child_handles{$client}->tid()." the stop command for client ".$client->peerhost().":".$client->peerport();
$queues{$child_handles{$client}->tid()}->enqueue("stop");
$client->send("command accepted\n");
delete($child_handles{$client});
@ -255,24 +266,29 @@ while(1)
{
$client->send("command rejected\n");
print timestamp()."received invalid command >>$buf<< from client ".$client->peerhost()."\n" if($opt_v);
$queues{$child_handles{$client}->tid()}->enqueue("stop");
Log 1, "received invalid command >>$buf<< from client ".$client->peerhost().":".$client->peerport();
}
}
else
{
print timestamp()."closed connection from ".$client->peerhost()."\n" if($opt_v);
Log 1, "closed connection from ".$client->peerhost().":".$client->peerport();
$listener->remove($client);
if(defined($child_handles{$client}))
{
print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v);
$child_handles{$client}->kill('KILL');
Log 2, "killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost();
$queues{$child_handles{$client}->tid()}->enqueue("stop");
delete($child_handles{$client});
}
shutdown($client, 2);
close $client;
$client = undef;
Log 1, "closed successfully all threads";
;
}
@ -285,10 +301,10 @@ while(1)
if(defined($sig_received))
{
print timestamp()."Caught $sig_received exiting\n" if($opt_v);
Log 0, "caught $sig_received";
unlink($opt_P);
print timestamp()."removed PID-File $opt_P\n" if($opt_v);
print timestamp()."server shutdown\n" if($opt_v);
Log 1, "removed PID-File $opt_P";
Log 0, "exiting";
exit;
}
@ -306,7 +322,7 @@ if($pid < 0)
}
elsif($pid)
{
print timestamp()."forked with PID $pid\n";
Log 0, "forked with PID $pid";
exit 0;
}
@ -318,41 +334,61 @@ foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_
open (STDIN, "</dev/null");
open (STDOUT, ">/dev/null");
open (STDERR, ">&STDOUT");
if($opt_l)
{
open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l");
}
}
sub doQuery($$)
{
local $SIG{KILL} = sub {threads->exit();};
my ($write_handle, $address, $timeout) = @_;
my $return;
my $hcitool;
my $nextrun = gettimeofday();
my $cmd;
my $run = 1;
local $SIG{HUP} = sub {$nextrun = gettimeofday();};
if($address and $timeout)
{
while(1)
THREADLOOP: while($run)
{
if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
{
$cmd = $queues{threads->tid()}->dequeue;
Log 2, threads->tid()."|received command: $cmd";
if($cmd eq "now")
{
$log_queue->enqueue(threads->tid()."|performing an instant test");
$nextrun = gettimeofday();
}
elsif($cmd eq "stop")
{
$log_queue->enqueue(threads->tid()."|shutting down thread");
$run = 0;
last THREADLOOP;
}
elsif($cmd =~ /^new\|/)
{
($cmd, $address, $timeout) = split("\\|", $cmd);
$nextrun = gettimeofday();
$log_queue->enqueue(threads->tid()."|new address: $address - new timeout $timeout");
}
}
if($write_handle)
{
if($nextrun <= gettimeofday())
{
{
lock($querylocker);
if($querylocker gt (time() - 10))
if($querylocker gt (gettimeofday() - 2))
{
sleep int(rand(9) + 1);
$log_queue->enqueue(threads->tid()."|waiting before hcitool command execution because last command was executed within the last 2 seconds");
sleep rand(1) + 1;
}
$hcitool = qx(which hcitool);
chomp $hcitool;
@ -362,48 +398,62 @@ local $SIG{HUP} = sub {$nextrun = gettimeofday();};
}
else
{
$write_handle->send("error\n");
$write_handle->send("error\n") if(defined($write_handle));
}
$querylocker = time();
$querylocker = gettimeofday();
}
chomp $return;
if(not $return =~ /^\s*$/)
{
$write_handle->send("present;$return\n");
$write_handle->send("present;$return\n") if(defined($write_handle));
}
else
{
$write_handle->send("absence\n");
$write_handle->send("absence\n") if(defined($write_handle));
}
$nextrun = gettimeofday() + $timeout;
}
}
sleep 1;
}
}
delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
}
sub timestamp
{
my ($sec, $min, $hour, $day, $mon, $year, undef, undef, undef) = localtime(time);
$mon++;
$year += 1900;
$sec = ($sec < 10 ? "0".$sec : $sec);
$min = ($min < 10 ? "0".$min : $min);
$hour = ($hour < 10 ? "0".$hour : $hour);
$day = ($day < 10 ? "0".$day : $day);
$mon = ($mon < 10 ? "0".$mon : $mon);
return "$year-$mon-$day $hour:$min:$sec - ";
return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime);
}
sub Log($$)
{
my ($loglevel, $message) = @_;
my $thread = 0;
if($message =~ /^\d+\|/)
{
($thread, $message) = split("\\|", $message);
}
if($loglevel <= $opt_v)
{
if($opt_l)
{
open(LOGFILE, ">>$opt_l") or die ("could not open logfile: $opt_l");
}
else
{
open (LOGFILE, ">&STDOUT") or die("cannot open STDOUT");
}
print LOGFILE "\r".timestamp()." - ".($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n";
close(LOGFILE);
}
}