2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-03-12 16:46:35 +00:00

collectord: support addon data. select room by highest RSSI if available (Forum: #54482)

git-svn-id: https://svn.fhem.de/fhem/trunk@13121 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
markusbloch 2017-01-17 11:17:03 +00:00
parent 3b105f79a3
commit e72b949bc8

View File

@ -49,7 +49,7 @@ my $client;
my $buf;
sub Log($$);
sub parseParams($;$);
my $opt_d;
my $opt_h;
@ -67,7 +67,7 @@ my $thread_counter = 0;
my %state;
my %handle;
my %socket_to_handle;
my $uuid;
$SIG{__DIE__} = sub {
@ -204,15 +204,8 @@ $SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
$server_pid = $$ unless(defined($server_pid));
my $value;
my $room;
my $rooms_ref;
my $result;
my $status_queue = Thread::Queue->new();
my $log_queue = Thread::Queue->new();
my $logline;
Log 2, "finished initialization. entering main loop";
@ -221,13 +214,13 @@ while(1)
{
# Cleaning up the status hash for obsolete devices
foreach $uuid (keys %state)
foreach my $uuid (keys %state)
{
my %handle_to_socket = reverse %socket_to_handle;
unless(exists($handle_to_socket{$uuid}))
{
Log 2, "cleaning up status values (UUID: $uuid)";
delete $state{$uuid};
Log 2, "cleaning up status values (UUID: $uuid)";
delete $state{$uuid};
}
}
@ -235,13 +228,13 @@ while(1)
# process all status messages from all threads via status queue
while($status_queue->pending)
{
($uuid,$room,$value,$name) = split(";", $status_queue->dequeue);
my ($uuid,$room,$value,$data) = split(";", $status_queue->dequeue, 4);
Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)";
Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : "");
if(not $value =~ /^(absence|present)$/)
{
$handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client}));
$handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client}));
if($value eq "socket_closed")
{
@ -250,9 +243,18 @@ while(1)
}
else
{
$state{$uuid}{rooms}{$room} = $value.(defined($name)?";".$name:"");
$state{$uuid}{rooms}{$room}{state} = $value;
if(defined($data))
{
$state{$uuid}{rooms}{$room}{data} = $data;
}
else
{
delete $state{$uuid}{rooms}{$room}{data};
}
$result = aggregateRooms($state{$uuid}{rooms});
my $result = aggregateRooms($state{$uuid}{rooms});
if(defined($result))
{
@ -276,9 +278,7 @@ while(1)
# If a thread has something reported via Log Queue, print it out if verbose is activated
while($log_queue->pending)
{
$logline = $log_queue->dequeue;
Log 2, $logline;
$logline = undef;
Log 2, $log_queue->dequeue;
}
@ -326,14 +326,15 @@ while(1)
# if the client has already a request running, stop at first the old request
if(defined($socket_to_handle{$client}))
{
$uuid = $socket_to_handle{$client};
my $uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
foreach my $room (keys %{$handle{$uuid}{threads}})
{
Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room";
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address");
$state{$uuid}{rooms}{$room} = "" if(exists($state{$uuid}{rooms}{$room}));
$state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
delete($state{$uuid}{rooms}{$room}{data});
}
$handle{$uuid}{timeout} = $timeout;
@ -348,7 +349,7 @@ while(1)
Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
}
$uuid = $socket_to_handle{$client};
my $uuid = $socket_to_handle{$client};
$handle{$uuid}{address} = $address;
$handle{$uuid}{client} = $client;
@ -358,7 +359,7 @@ while(1)
$state{$uuid}{lastresult}{timestamp} = 0;
# create a new reqester thread for each configured room to perform the query
while (($room, $value) = each %config)
while (my ($room, $value) = each %config)
{
$thread_counter++;
$queues{$thread_counter} = Thread::Queue->new();
@ -370,7 +371,8 @@ while(1)
# save the socket/room relationship to know which thread belongs to which client request (for stop command)
$handle{$uuid}{threads}{$room} = $new_thread;
$state{$uuid}{rooms}{$room} = "";
$state{$uuid}{rooms}{$room}{state} = "";
delete($state{$uuid}{rooms}{$room}{data});
}
}
}
@ -381,14 +383,15 @@ while(1)
# just to be sure if the client has really a running request
if(defined($socket_to_handle{$client}))
{
$uuid = $socket_to_handle{$client};
my $uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
foreach my $room (keys %{$handle{$uuid}{threads}})
{
Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost();
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now");
$state{$uuid}{rooms}{$room} = "" if(exists($state{$uuid}{rooms}{$room}));
$state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
delete($state{$uuid}{rooms}{$room}{data});
}
delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult}));
@ -408,10 +411,10 @@ while(1)
# just to be sure if the client has really a running request
if(defined($socket_to_handle{$client}))
{
$uuid = $socket_to_handle{$client};
my $uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
foreach my $room (keys %{$handle{$uuid}{threads}})
{
Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
@ -449,10 +452,10 @@ while(1)
# if there is a running command, stop it first and clean up (same as stop command, see above)
if(defined($socket_to_handle{$client}))
{
$uuid = $socket_to_handle{$client};
my $uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
foreach my $room (keys %{$handle{$uuid}{threads}})
{
Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
@ -527,7 +530,7 @@ sub doQuery($$$)
my ($do_config, $do_room, $do_address, $do_uuid) = @_;
my $return;
my $socket;
my %values = %$do_config;
my %values = %{$do_config};
my $selector;
my $run = 1;
my @client_handle;
@ -547,7 +550,7 @@ sub doQuery($$$)
Type => SOCK_STREAM,
KeepAlive => 1,
Blocking => 1
) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -"));
) or ( $log_queue->enqueue(threads->tid()."|$do_room : could not create socket to ".$values{address}." - $! -"));
$selector = IO::Select->new($client_socket);
@ -568,7 +571,7 @@ sub doQuery($$$)
if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
{
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
$log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
$selector->remove($client_socket);
$client_socket->shutdown(2);
@ -588,7 +591,7 @@ sub doQuery($$$)
}
elsif($cmd eq "stop")
{
$log_queue->enqueue(threads->tid()."|$room terminating thread ".threads->tid()." for $address");
$log_queue->enqueue(threads->tid()."|$do_room terminating thread ".threads->tid()." for ".$values{address});
$client_socket->shutdown(2) if(defined($client_socket));
$selector->remove($client_socket) if(defined($selector));
close($client_socket) if(defined($client_socket));
@ -620,10 +623,10 @@ sub doQuery($$$)
if(!$reconnect_count)
{
# Tell this the client;
$status_queue->enqueue("$do_uuid;$room;socket_closed");
$status_queue->enqueue("$do_uuid;$do_room;socket_closed");
# create a log message
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
$log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
}
# now try to re-establish the connection
@ -639,8 +642,8 @@ sub doQuery($$$)
if(defined($client_socket))
{
# give a success message
$log_queue->enqueue(threads->tid()."|$room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
$status_queue->enqueue("$do_uuid;$room;socket_reconnected");
$log_queue->enqueue(threads->tid()."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
$status_queue->enqueue("$do_uuid;$do_room;socket_reconnected");
# reset the reconnect counter
$reconnect_count = 0;
@ -683,22 +686,22 @@ sub doQuery($$$)
if($return =~ /command accepted/)
{
# log this to the thread log queue
$log_queue->enqueue(threads->tid()."|$room accepted command for $do_address");
$log_queue->enqueue(threads->tid()."|$do_room accepted command for $do_address");
}
elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
{
$log_queue->enqueue(threads->tid()."|$room REJECTED command for $do_address");
$log_queue->enqueue(threads->tid()."|$do_room REJECTED command for $do_address");
}
else # else its a status message
{
# put the message to the status queue with uuid for identification and the room name
$status_queue->enqueue("$do_uuid;$room;".$return);
$status_queue->enqueue("$do_uuid;$do_room;".$return);
# if the state changes from present to absence
if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
{
# log the timout change to the log queue
$log_queue->enqueue(threads->tid()."|$room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
$log_queue->enqueue(threads->tid()."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
$current_state = "absence";
@ -707,7 +710,7 @@ sub doQuery($$$)
}
elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
{
$log_queue->enqueue(threads->tid()."|$room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
$log_queue->enqueue(threads->tid()."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
$current_state = "present";
@ -862,30 +865,85 @@ sub aggregateRooms
my @rooms;
my $key;
my $value;
my $temp_name;
my $first_key;
# get all present rooms
foreach $key (keys %$hash)
{
if($hash->{$key} ne "")
my $room_hash = $hash->{$key};
if(defined($room_hash->{state}) and $room_hash->{state} ne "")
{
($value, $name) = split(";", $hash->{$key});
my ($value, $data) = split(";", $hash->{$key});
if($value eq "present")
if($room_hash->{state} eq "present")
{
push @rooms, $key;
$temp_name = $name;
}
}
else
{
# if one room has no result return undef
return undef;
}
}
# if multiple rooms are present, try selection by highest RSSI
if(@rooms > 0)
{
my $rssi_addon_data_key = "rssi";
my $rssi_available = 1;
my $highest_value;
my $highest_key;
foreach $key (@rooms)
{
my $data = $hash->{$key}{data};
if(defined($data))
{
my ($a,$h) = parseParams($data,';');
if(@{$a} == 1 and keys(%{$h}) == 0) # old presenced device name => convert to new style
{
$hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'";
$rssi_available = 0;
}
elsif(@{$a} == 0 and keys(%{$h}) > 0) # new addon data style
{
# check rssi
if($rssi_available and exists($h->{$rssi_addon_data_key}) and $h->{$rssi_addon_data_key} =~ /^-?(?:\d+\.)\d+$/)
{
if(!defined($highest_value) or (defined($highest_value) and $h->{$rssi_addon_data_key} > $highest_value))
{
$highest_value = $h->{rssi};
$highest_key = $key;
}
}
else
{
$rssi_available = 0;
}
}
else
{
Log 1, "invalid addon data received from room $key: $data";
}
}
}
if($rssi_available and defined($highest_key))
{
Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1);
@rooms = ($highest_key);
}
}
if(@rooms > 0)
{
return "present;$temp_name;".join(",",sort @rooms);
return "present;rooms='".join(",",sort @rooms).(defined($hash->{$rooms[0]}{data}) ? "';".$hash->{$rooms[0]}{data} : "");
}
else
{
@ -939,3 +997,71 @@ sub Log($$)
}
}
#####################################
# parseParams() from fhem.pl by justme1968
sub parseParams($;$)
{
my($cmd, $separator) = @_;
$separator = ' ' if( !$separator );
my(@a, %h);
my @params;
if( ref($cmd) eq 'ARRAY' ) {
@params = @{$cmd};
} else {
@params = split($separator, $cmd);
}
while (@params) {
my $param = shift(@params);
my ($key, $value) = split( '=', $param, 2 );
if( !defined( $value ) ) {
$value = $key;
$key = undef;
}
#collect all parts until the closing ' or "
while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
my $next = shift(@params);
last if( !defined($next) );
$value .= $separator . $next;
}
#remove matching ' or " from the start and end
if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
$value =~ s/^.(.*).$/$1/;
}
#collext all parts until opening { and closing } are matched
if( $value =~ m/^{/ ) { # } for match
my $count = 0;
for my $i (0..length($value)-1) {
my $c = substr($value, $i, 1);
++$count if( $c eq '{' );
--$count if( $c eq '}' );
}
while( $param && $count != 0 ) {
my $next = shift(@params);
last if( !defined($next) );
$value .= $separator . $next;
for my $i (0..length($next)-1) {
my $c = substr($next, $i, 1);
++$count if( $c eq '{' );
--$count if( $c eq '}' );
}
}
}
if( defined($key) ) {
$h{$key} = $value;
} else {
push @a, $value;
}
}
return(\@a, \%h);
}