diff --git a/fhem/contrib/PRESENCE/collectord b/fhem/contrib/PRESENCE/collectord index 35ac7f47f..f393393bc 100755 --- a/fhem/contrib/PRESENCE/collectord +++ b/fhem/contrib/PRESENCE/collectord @@ -49,7 +49,7 @@ my $client; my $buf; sub Log($$); - +sub parseParams($;$); my $opt_d; my $opt_h; @@ -67,7 +67,7 @@ my $thread_counter = 0; my %state; my %handle; my %socket_to_handle; -my $uuid; + $SIG{__DIE__} = sub { @@ -204,15 +204,8 @@ $SIG{ABRT} = sub { $sig_received = "SIGABRT"; }; $server_pid = $$ unless(defined($server_pid)); - - -my $value; -my $room; -my $rooms_ref; -my $result; my $status_queue = Thread::Queue->new(); my $log_queue = Thread::Queue->new(); -my $logline; Log 2, "finished initialization. entering main loop"; @@ -221,13 +214,13 @@ while(1) { # Cleaning up the status hash for obsolete devices - foreach $uuid (keys %state) + foreach my $uuid (keys %state) { my %handle_to_socket = reverse %socket_to_handle; unless(exists($handle_to_socket{$uuid})) { - Log 2, "cleaning up status values (UUID: $uuid)"; - delete $state{$uuid}; + Log 2, "cleaning up status values (UUID: $uuid)"; + delete $state{$uuid}; } } @@ -235,13 +228,13 @@ while(1) # process all status messages from all threads via status queue while($status_queue->pending) { - ($uuid,$room,$value,$name) = split(";", $status_queue->dequeue); + my ($uuid,$room,$value,$data) = split(";", $status_queue->dequeue, 4); - Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)"; + Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : ""); if(not $value =~ /^(absence|present)$/) { - $handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client})); + $handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client})); if($value eq "socket_closed") { @@ -250,9 +243,18 @@ while(1) } else { - $state{$uuid}{rooms}{$room} = $value.(defined($name)?";".$name:""); + $state{$uuid}{rooms}{$room}{state} = $value; + + if(defined($data)) + { + $state{$uuid}{rooms}{$room}{data} = $data; + } + else + { + delete $state{$uuid}{rooms}{$room}{data}; + } - $result = aggregateRooms($state{$uuid}{rooms}); + my $result = aggregateRooms($state{$uuid}{rooms}); if(defined($result)) { @@ -276,9 +278,7 @@ while(1) # If a thread has something reported via Log Queue, print it out if verbose is activated while($log_queue->pending) { - $logline = $log_queue->dequeue; - Log 2, $logline; - $logline = undef; + Log 2, $log_queue->dequeue; } @@ -326,14 +326,15 @@ while(1) # if the client has already a request running, stop at first the old request if(defined($socket_to_handle{$client})) { - $uuid = $socket_to_handle{$client}; + my $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) + foreach my $room (keys %{$handle{$uuid}{threads}}) { Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room"; $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address"); - $state{$uuid}{rooms}{$room} = "" if(exists($state{$uuid}{rooms}{$room})); + $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room})); + delete($state{$uuid}{rooms}{$room}{data}); } $handle{$uuid}{timeout} = $timeout; @@ -348,7 +349,7 @@ while(1) Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client}; } - $uuid = $socket_to_handle{$client}; + my $uuid = $socket_to_handle{$client}; $handle{$uuid}{address} = $address; $handle{$uuid}{client} = $client; @@ -358,7 +359,7 @@ while(1) $state{$uuid}{lastresult}{timestamp} = 0; # create a new reqester thread for each configured room to perform the query - while (($room, $value) = each %config) + while (my ($room, $value) = each %config) { $thread_counter++; $queues{$thread_counter} = Thread::Queue->new(); @@ -370,7 +371,8 @@ while(1) # save the socket/room relationship to know which thread belongs to which client request (for stop command) $handle{$uuid}{threads}{$room} = $new_thread; - $state{$uuid}{rooms}{$room} = ""; + $state{$uuid}{rooms}{$room}{state} = ""; + delete($state{$uuid}{rooms}{$room}{data}); } } } @@ -381,14 +383,15 @@ while(1) # just to be sure if the client has really a running request if(defined($socket_to_handle{$client})) { - $uuid = $socket_to_handle{$client}; + my $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) + foreach my $room (keys %{$handle{$uuid}{threads}}) { Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost(); $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now"); - $state{$uuid}{rooms}{$room} = "" if(exists($state{$uuid}{rooms}{$room})); + $state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room})); + delete($state{$uuid}{rooms}{$room}{data}); } delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult})); @@ -408,10 +411,10 @@ while(1) # just to be sure if the client has really a running request if(defined($socket_to_handle{$client})) { - $uuid = $socket_to_handle{$client}; + my $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) + foreach my $room (keys %{$handle{$uuid}{threads}}) { Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); @@ -449,10 +452,10 @@ while(1) # if there is a running command, stop it first and clean up (same as stop command, see above) if(defined($socket_to_handle{$client})) { - $uuid = $socket_to_handle{$client}; + my $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) + foreach my $room (keys %{$handle{$uuid}{threads}}) { Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); @@ -527,7 +530,7 @@ sub doQuery($$$) my ($do_config, $do_room, $do_address, $do_uuid) = @_; my $return; my $socket; - my %values = %$do_config; + my %values = %{$do_config}; my $selector; my $run = 1; my @client_handle; @@ -547,7 +550,7 @@ sub doQuery($$$) Type => SOCK_STREAM, KeepAlive => 1, Blocking => 1 - ) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -")); + ) or ( $log_queue->enqueue(threads->tid()."|$do_room : could not create socket to ".$values{address}." - $! -")); $selector = IO::Select->new($client_socket); @@ -568,7 +571,7 @@ sub doQuery($$$) if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60)) { - $log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")"); + $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")"); $selector->remove($client_socket); $client_socket->shutdown(2); @@ -588,7 +591,7 @@ sub doQuery($$$) } elsif($cmd eq "stop") { - $log_queue->enqueue(threads->tid()."|$room terminating thread ".threads->tid()." for $address"); + $log_queue->enqueue(threads->tid()."|$do_room terminating thread ".threads->tid()." for ".$values{address}); $client_socket->shutdown(2) if(defined($client_socket)); $selector->remove($client_socket) if(defined($selector)); close($client_socket) if(defined($client_socket)); @@ -620,10 +623,10 @@ sub doQuery($$$) if(!$reconnect_count) { # Tell this the client; - $status_queue->enqueue("$do_uuid;$room;socket_closed"); + $status_queue->enqueue("$do_uuid;$do_room;socket_closed"); # create a log message - $log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect..."); + $log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect..."); } # now try to re-establish the connection @@ -639,8 +642,8 @@ sub doQuery($$$) if(defined($client_socket)) { # give a success message - $log_queue->enqueue(threads->tid()."|$room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)"); - $status_queue->enqueue("$do_uuid;$room;socket_reconnected"); + $log_queue->enqueue(threads->tid()."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)"); + $status_queue->enqueue("$do_uuid;$do_room;socket_reconnected"); # reset the reconnect counter $reconnect_count = 0; @@ -683,22 +686,22 @@ sub doQuery($$$) if($return =~ /command accepted/) { # log this to the thread log queue - $log_queue->enqueue(threads->tid()."|$room accepted command for $do_address"); + $log_queue->enqueue(threads->tid()."|$do_room accepted command for $do_address"); } elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue { - $log_queue->enqueue(threads->tid()."|$room REJECTED command for $do_address"); + $log_queue->enqueue(threads->tid()."|$do_room REJECTED command for $do_address"); } else # else its a status message { # put the message to the status queue with uuid for identification and the room name - $status_queue->enqueue("$do_uuid;$room;".$return); + $status_queue->enqueue("$do_uuid;$do_room;".$return); # if the state changes from present to absence if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/) { # log the timout change to the log queue - $log_queue->enqueue(threads->tid()."|$room changing to absence timeout (".$values{absence_timeout}.") for device $do_address"); + $log_queue->enqueue(threads->tid()."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address"); $current_state = "absence"; @@ -707,7 +710,7 @@ sub doQuery($$$) } elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/) { - $log_queue->enqueue(threads->tid()."|$room changing to presence timeout (".$values{presence_timeout}.") for device $do_address"); + $log_queue->enqueue(threads->tid()."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address"); $current_state = "present"; @@ -862,30 +865,85 @@ sub aggregateRooms my @rooms; my $key; - my $value; - my $temp_name; + + my $first_key; + # get all present rooms foreach $key (keys %$hash) { - if($hash->{$key} ne "") + my $room_hash = $hash->{$key}; + + if(defined($room_hash->{state}) and $room_hash->{state} ne "") { - ($value, $name) = split(";", $hash->{$key}); + my ($value, $data) = split(";", $hash->{$key}); - if($value eq "present") + if($room_hash->{state} eq "present") { push @rooms, $key; - $temp_name = $name; } } else { + # if one room has no result return undef return undef; } } + + + # if multiple rooms are present, try selection by highest RSSI + if(@rooms > 0) + { + my $rssi_addon_data_key = "rssi"; + my $rssi_available = 1; + my $highest_value; + my $highest_key; + + foreach $key (@rooms) + { + my $data = $hash->{$key}{data}; + + if(defined($data)) + { + my ($a,$h) = parseParams($data,';'); + + if(@{$a} == 1 and keys(%{$h}) == 0) # old presenced device name => convert to new style + { + $hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'"; + $rssi_available = 0; + } + elsif(@{$a} == 0 and keys(%{$h}) > 0) # new addon data style + { + # check rssi + if($rssi_available and exists($h->{$rssi_addon_data_key}) and $h->{$rssi_addon_data_key} =~ /^-?(?:\d+\.)\d+$/) + { + if(!defined($highest_value) or (defined($highest_value) and $h->{$rssi_addon_data_key} > $highest_value)) + { + $highest_value = $h->{rssi}; + $highest_key = $key; + } + } + else + { + $rssi_available = 0; + } + } + else + { + Log 1, "invalid addon data received from room $key: $data"; + } + } + } + + if($rssi_available and defined($highest_key)) + { + Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1); + @rooms = ($highest_key); + } + } if(@rooms > 0) { - return "present;$temp_name;".join(",",sort @rooms); + return "present;rooms='".join(",",sort @rooms).(defined($hash->{$rooms[0]}{data}) ? "';".$hash->{$rooms[0]}{data} : ""); } else { @@ -939,3 +997,71 @@ sub Log($$) } } + +##################################### +# parseParams() from fhem.pl by justme1968 +sub parseParams($;$) +{ + my($cmd, $separator) = @_; + $separator = ' ' if( !$separator ); + my(@a, %h); + + my @params; + if( ref($cmd) eq 'ARRAY' ) { + @params = @{$cmd}; + } else { + @params = split($separator, $cmd); + } + + while (@params) { + my $param = shift(@params); + my ($key, $value) = split( '=', $param, 2 ); + + if( !defined( $value ) ) { + $value = $key; + $key = undef; + } + + #collect all parts until the closing ' or " + while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) { + my $next = shift(@params); + last if( !defined($next) ); + $value .= $separator . $next; + } + #remove matching ' or " from the start and end + if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) { + $value =~ s/^.(.*).$/$1/; + } + + #collext all parts until opening { and closing } are matched + if( $value =~ m/^{/ ) { # } for match + my $count = 0; + for my $i (0..length($value)-1) { + my $c = substr($value, $i, 1); + ++$count if( $c eq '{' ); + --$count if( $c eq '}' ); + } + + while( $param && $count != 0 ) { + my $next = shift(@params); + last if( !defined($next) ); + $value .= $separator . $next; + + for my $i (0..length($next)-1) { + my $c = substr($next, $i, 1); + ++$count if( $c eq '{' ); + --$count if( $c eq '}' ); + } + } + } + + if( defined($key) ) { + $h{$key} = $value; + } else { + push @a, $value; + } + + } + + return(\@a, \%h); +}