diff --git a/fhem/contrib/PRESENCE/collectord b/fhem/contrib/PRESENCE/collectord index c22c62e61..1ba9d9c2e 100755 --- a/fhem/contrib/PRESENCE/collectord +++ b/fhem/contrib/PRESENCE/collectord @@ -218,6 +218,8 @@ my $log_queue = Thread::Queue->new(); my $logline; +print timestamp()."finished initialization. entering main loop\n" if($opt_v >= 2); + while(1) { @@ -237,7 +239,8 @@ while(1) while($status_queue->pending) { ($uuid,$room,$value,$name) = split(";", $status_queue->dequeue); - + + print timestamp()."processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)\n" if($opt_v >=2); if(not $value =~ /^(absence|present)$/) { @@ -478,7 +481,7 @@ while(1) # in case we have received a process signal, remove the pid file and shutdown if(defined($sig_received)) { - print timestamp()."Caught $sig_received exiting\n" if($opt_v); + print "\r".timestamp()."Caught $sig_received exiting\n" if($opt_v); unlink($opt_P); print timestamp()."removed PID-File $opt_P\n" if($opt_v); print timestamp()."server shutdown\n" if($opt_v); @@ -486,7 +489,11 @@ while(1) } + } + +print timestamp()."leaving main loop\n" if($opt_v >= 2); + ######################################################################################################################## # # Subroutine definitions @@ -548,18 +555,17 @@ sub doQuery($$$) my $client_socket = undef; # if the thread gets a termination signal, the thread must be shutdown by itself local $SIG{TERM} = sub { - $log_queue->enqueue("$room (".threads->tid()."): terminating thread ".threads->tid()." for $address\n"); - $client_socket->send("stop\n") if($client_socket); - sleep 2; + $log_queue->enqueue("$room (Thread No. ".threads->tid()."): terminating thread ".threads->tid()." for $address\n"); + $client->shutdown() if(defined($client)); $selector->remove($client_socket) if(defined($selector)); - close($client_socket) if($client_socket); + close($client_socket) if(defined($client_socket)); $client_socket = undef; - $log_queue->enqueue("$room (".threads->tid()."): exit thread ".threads->tid()."\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid()."): exit thread ".threads->tid()."\n"); threads->exit(); }; local $SIG{HUP} = sub { - $client_socket->send("now\n") if($client_socket); + $client_socket->send("now\n") if(defined($client_socket)); }; @@ -580,7 +586,7 @@ sub doQuery($$$) $selector = IO::Select->new($client_socket); - if($client_socket) + if(defined($client_socket)) { # send the given address to the presence daemon @@ -603,6 +609,7 @@ sub doQuery($$$) $log_queue->enqueue("$room (Thread No. ".threads->tid().") socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")\n"); $selector->remove($client_socket); + shutdown($client_socket, 2); close($client_socket); $client_socket = undef; } @@ -630,7 +637,7 @@ sub doQuery($$$) Blocking => 1 ) or ( $reconnect_count++ ); - if($client_socket) + if(defined($client_socket)) { # give a success message $log_queue->enqueue("$room (Thread No. ".threads->tid().") reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)\n"); @@ -720,6 +727,8 @@ sub doQuery($$$) { $selector->remove($local_client); + + shutdown($local_client, 2); close($local_client); $client_socket = undef; @@ -730,7 +739,7 @@ sub doQuery($$$) } # Sleep for one second to avoid 100% cpu usage - sleep(1); + #sleep(1); } } diff --git a/fhem/contrib/PRESENCE/presenced b/fhem/contrib/PRESENCE/presenced index 5a3b11713..e4039df6e 100755 --- a/fhem/contrib/PRESENCE/presenced +++ b/fhem/contrib/PRESENCE/presenced @@ -34,12 +34,15 @@ use File::Basename; use Getopt::Long; use threads; use threads::shared; +use Thread::Queue; use Time::HiRes qw(gettimeofday); use warnings; use strict; +sub Log($$); + my $new_client; my $server; my $client; @@ -47,10 +50,13 @@ my $buf; my $querylocker :shared = int(time() - 15); +my %queues; + +my $log_queue = Thread::Queue->new(); my $opt_d; my $opt_h; -my $opt_v; +my $opt_v = 0; my $opt_p = 5111; my $opt_P = "/var/run/".basename($0).".pid"; my $opt_l; @@ -58,7 +64,7 @@ my $opt_l; Getopt::Long::Configure('bundling'); GetOptions( "d" => \$opt_d, "daemon" => \$opt_d, - "v" => \$opt_v, "verbose" => \$opt_v, + "v+" => \$opt_v, "verbose+" => \$opt_v, "l=s" => \$opt_l, "logfile=s" => \$opt_l, "p=i" => \$opt_p, "port=i" => \$opt_p, "P=s" => \$opt_P, "pid-file=s" => \$opt_P, @@ -66,20 +72,16 @@ GetOptions( -if($opt_l) -{ -open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l"); - -print timestamp()."=================================================\n" if($opt_v); -} +Log 0, "=================================================" if($opt_l); -print timestamp()."started with PID $$\n" if($opt_v); + +Log 1, "started with PID $$"; if(-e "$opt_P") { - print timestamp()."another process already running (PID file found at $opt_P)\n"; - print timestamp()."aborted...\n"; + print timestamp()." another process already running (PID file found at $opt_P)\n"; + print timestamp()." aborted...\n"; exit 1; } @@ -133,7 +135,7 @@ KeepAlive => 1, Blocking => 0 ) or die "error while creating socket: $!\n"; -print timestamp()."created socket on ".$server->sockhost()." with port ".$server->sockport()."\n" if($opt_v); +Log 0, "created socket on ".$server->sockhost().":".$server->sockport(); my $listener = IO::Select->new(); $listener->add($server); @@ -144,6 +146,7 @@ my @new_handles; my %child_handles; my %child_config; +my $thread_counter = 0; my $address; my $name; my $timeout; @@ -159,12 +162,18 @@ $SIG{TERM} = sub { $sig_received = "SIGTERM"; }; $SIG{KILL} = sub { $sig_received = "SIGKILL"; }; $SIG{QUIT} = sub { $sig_received = "SIGQUIT"; }; $SIG{ABRT} = sub { $sig_received = "SIGABRT"; }; +$SIG{PIPE} = sub { $sig_received = "SIGPIPE"; }; $server_pid = $$ unless(defined($server_pid)); while(1) { + if($log_queue->pending) + { + Log 2, $log_queue->dequeue; + } + if(@new_handles = $listener->can_read(1)) { foreach my $client (@new_handles) @@ -174,7 +183,7 @@ while(1) $new_client = $server->accept(); $listener->add($new_client); - print timestamp()."new connection from ".$new_client->peerhost()."\n" if($opt_v); + Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport(); } else @@ -192,7 +201,7 @@ while(1) if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/) { $client->send("command accepted\n"); - print timestamp()."received new command from ".$client->peerhost().": $buf\n" if($opt_v); + Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf"; ($address, $timeout) = split("\\|", $buf); $address =~ s/\s*//g; @@ -202,31 +211,33 @@ while(1) if(defined($child_handles{$client})) { - print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v); - $child_handles{$client}->kill('KILL'); - delete($child_handles{$client}); + Log 2, "sending new command to thread ".$child_handles{$client}->tid()." for client ".$client->peerhost().":".$client->peerport(); + $queues{$child_handles{$client}->tid()}->enqueue("new|".$address."|".$timeout); } - + else + { + $thread_counter++; + $queues{$thread_counter} = Thread::Queue->new(); - my $new_thread = threads->new(\&doQuery, ($write_handle, $address, $timeout)); - print timestamp()."created thread ".$new_thread->tid()." for processing device $address within $timeout seconds for peer ".$client->peerhost()."\n" if($opt_v); + my $new_thread = threads->new(\&doQuery, ($write_handle, $address, $timeout)); + Log 2, "created thread ".$new_thread->tid()." for processing device $address within $timeout seconds for peer ".$client->peerhost().":".$client->peerport(); + - - $new_thread->detach(); + $new_thread->detach(); - $child_handles{$client} = $new_thread; + $child_handles{$client} = $new_thread; + } } elsif(lc($buf) =~ /^\s*now\s*$/) { - print timestamp()."received now command from client ".$client->peerhost()."\n" if($opt_v); + Log 2, "received now command from client ".$client->peerhost().":".$client->peerport(); if(defined($child_handles{$client})) { - print timestamp()."signalling thread ".$child_handles{$client}->tid()." for an instant test for client ".$client->peerhost()."\n" if($opt_v); - - $child_handles{$client}->kill('HUP'); + Log 2, "signalling thread ".$child_handles{$client}->tid()." for an instant test for client ".$client->peerhost().":".$client->peerport(); + $queues{$child_handles{$client}->tid()}->enqueue("now"); $client->send("command accepted\n"); } else @@ -236,12 +247,12 @@ while(1) } elsif(lc($buf) =~ /^\s*stop\s*$/) { - print timestamp()."received stop command from client ".$client->peerhost()."\n" if($opt_v); + Log 2, "received stop command from client ".$client->peerhost().":".$client->peerport(); if(defined($child_handles{$client})) { - print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v); - $child_handles{$client}->kill('KILL'); + Log 2, "sending thread ".$child_handles{$client}->tid()." the stop command for client ".$client->peerhost().":".$client->peerport(); + $queues{$child_handles{$client}->tid()}->enqueue("stop"); $client->send("command accepted\n"); delete($child_handles{$client}); @@ -255,24 +266,29 @@ while(1) { $client->send("command rejected\n"); - - print timestamp()."received invalid command >>$buf<< from client ".$client->peerhost()."\n" if($opt_v); + $queues{$child_handles{$client}->tid()}->enqueue("stop"); + Log 1, "received invalid command >>$buf<< from client ".$client->peerhost().":".$client->peerport(); } } else { - print timestamp()."closed connection from ".$client->peerhost()."\n" if($opt_v); + Log 1, "closed connection from ".$client->peerhost().":".$client->peerport(); $listener->remove($client); if(defined($child_handles{$client})) { - print timestamp()."killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost()."\n" if($opt_v); - $child_handles{$client}->kill('KILL'); + Log 2, "killing thread ".$child_handles{$client}->tid()." for client ".$client->peerhost(); + $queues{$child_handles{$client}->tid()}->enqueue("stop"); delete($child_handles{$client}); } - + shutdown($client, 2); close $client; + $client = undef; + + Log 1, "closed successfully all threads"; +; + } @@ -285,10 +301,10 @@ while(1) if(defined($sig_received)) { - print timestamp()."Caught $sig_received exiting\n" if($opt_v); + Log 0, "caught $sig_received"; unlink($opt_P); - print timestamp()."removed PID-File $opt_P\n" if($opt_v); - print timestamp()."server shutdown\n" if($opt_v); + Log 1, "removed PID-File $opt_P"; + Log 0, "exiting"; exit; } @@ -306,7 +322,7 @@ if($pid < 0) } elsif($pid) { - print timestamp()."forked with PID $pid\n"; + Log 0, "forked with PID $pid"; exit 0; } @@ -318,41 +334,61 @@ foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_ open (STDIN, "/dev/null"); open (STDERR, ">&STDOUT"); - - -if($opt_l) -{ -open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l"); -} - - } sub doQuery($$) { -local $SIG{KILL} = sub {threads->exit();}; - my ($write_handle, $address, $timeout) = @_; my $return; my $hcitool; my $nextrun = gettimeofday(); +my $cmd; +my $run = 1; + -local $SIG{HUP} = sub {$nextrun = gettimeofday();}; if($address and $timeout) { - while(1) + THREADLOOP: while($run) { + if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending) + { + $cmd = $queues{threads->tid()}->dequeue; + Log 2, threads->tid()."|received command: $cmd"; + if($cmd eq "now") + { + $log_queue->enqueue(threads->tid()."|performing an instant test"); + $nextrun = gettimeofday(); + } + elsif($cmd eq "stop") + { + $log_queue->enqueue(threads->tid()."|shutting down thread"); + $run = 0; + last THREADLOOP; + } + elsif($cmd =~ /^new\|/) + { + ($cmd, $address, $timeout) = split("\\|", $cmd); + $nextrun = gettimeofday(); + + $log_queue->enqueue(threads->tid()."|new address: $address - new timeout $timeout"); + + } + } + if($write_handle) { if($nextrun <= gettimeofday()) { { lock($querylocker); - if($querylocker gt (time() - 10)) + if($querylocker gt (gettimeofday() - 2)) { - sleep int(rand(9) + 1); + $log_queue->enqueue(threads->tid()."|waiting before hcitool command execution because last command was executed within the last 2 seconds"); + + sleep rand(1) + 1; + } $hcitool = qx(which hcitool); chomp $hcitool; @@ -362,48 +398,62 @@ local $SIG{HUP} = sub {$nextrun = gettimeofday();}; } else { - $write_handle->send("error\n"); + $write_handle->send("error\n") if(defined($write_handle)); } - $querylocker = time(); + $querylocker = gettimeofday(); } chomp $return; if(not $return =~ /^\s*$/) { - $write_handle->send("present;$return\n"); + $write_handle->send("present;$return\n") if(defined($write_handle)); } else { - $write_handle->send("absence\n"); + $write_handle->send("absence\n") if(defined($write_handle)); } $nextrun = gettimeofday() + $timeout; } + } - sleep 1; } } + + delete($queues{threads->tid()}) if(exists($queues{threads->tid()})); + } sub timestamp { -my ($sec, $min, $hour, $day, $mon, $year, undef, undef, undef) = localtime(time); - - -$mon++; -$year += 1900; - -$sec = ($sec < 10 ? "0".$sec : $sec); -$min = ($min < 10 ? "0".$min : $min); -$hour = ($hour < 10 ? "0".$hour : $hour); - -$day = ($day < 10 ? "0".$day : $day); - -$mon = ($mon < 10 ? "0".$mon : $mon); - -return "$year-$mon-$day $hour:$min:$sec - "; - +return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime); } +sub Log($$) +{ + my ($loglevel, $message) = @_; + my $thread = 0; + + if($message =~ /^\d+\|/) + { + ($thread, $message) = split("\\|", $message); + } + if($loglevel <= $opt_v) + { + if($opt_l) + { + open(LOGFILE, ">>$opt_l") or die ("could not open logfile: $opt_l"); + } + else + { + open (LOGFILE, ">&STDOUT") or die("cannot open STDOUT"); + } + + print LOGFILE "\r".timestamp()." - ".($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n"; + + close(LOGFILE); + } + +}