mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-01-31 18:59:33 +00:00
d2e6431c34
git-svn-id: https://svn.fhem.de/fhem/trunk@16035 2b470e98-0d58-463d-a4d8-8e2adae1ed80
1098 lines
36 KiB
Perl
Executable File
1098 lines
36 KiB
Perl
Executable File
#!/usr/bin/perl
|
|
#############################################################################
|
|
# $Id$
|
|
##############################################################################
|
|
#
|
|
# collectord
|
|
# Connects to several presenced instances to check for multiple bluetooth devices
|
|
# for their presence state and report a summary state to the 73_PRESENCE.pm module
|
|
#
|
|
# Copyright by Markus Bloch
|
|
# e-mail: Notausstieg0309@googlemail.com
|
|
#
|
|
# This file is part of fhem.
|
|
#
|
|
# Fhem is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Fhem is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with fhem. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
##############################################################################
|
|
|
|
|
|
use IO::Socket;
|
|
use IO::Select;
|
|
use POSIX;
|
|
#use Data::Dumper;
|
|
use File::Basename;
|
|
use Getopt::Long;
|
|
use threads;
|
|
use Thread::Queue;
|
|
use Time::HiRes;
|
|
use Time::HiRes qw(gettimeofday);
|
|
|
|
use warnings;
|
|
use strict;
|
|
use Digest::MD5;
|
|
|
|
my $new_client;
|
|
my $server;
|
|
my $client;
|
|
my $buf;
|
|
|
|
sub Log($$);
|
|
sub parseParams($;$);
|
|
sub closeClientConnection($);
|
|
sub stopClientThreads($);
|
|
|
|
my $opt_d;
|
|
my $opt_h;
|
|
my $opt_v = 0;
|
|
my $opt_p = 5222;
|
|
my $opt_P = "/var/run/".basename($0).".pid";
|
|
my $opt_l;
|
|
my $opt_c;
|
|
my $opt_n;
|
|
|
|
my %config;
|
|
|
|
my %queues;
|
|
my $thread_counter = 0;
|
|
|
|
my %state;
|
|
my %handle;
|
|
my %socket_to_handle;
|
|
|
|
|
|
|
|
$SIG{__DIE__} = sub {
|
|
my ($msg) = @_;
|
|
|
|
Log 1, "PERL ERROR: $msg";
|
|
|
|
};
|
|
|
|
|
|
$SIG{__WARN__} = sub {
|
|
my ($msg) = @_;
|
|
|
|
Log 1, "PERL WARN: $msg";
|
|
|
|
};
|
|
|
|
|
|
Getopt::Long::Configure('bundling');
|
|
GetOptions(
|
|
"d" => \$opt_d, "daemon" => \$opt_d,
|
|
"n" => \$opt_n, "no-timestamps" => \$opt_n,
|
|
"v+" => \$opt_v, "verbose+" => \$opt_v,
|
|
"l=s" => \$opt_l, "logfile=s" => \$opt_l,
|
|
"c=s" => \$opt_c, "configfile=s" => \$opt_c,
|
|
"p=i" => \$opt_p, "port=i" => \$opt_p,
|
|
"P=s" => \$opt_P, "pid-file=s" => \$opt_P,
|
|
"h" => \$opt_h, "help" => \$opt_h
|
|
);
|
|
|
|
|
|
Log 0, "=================================================" if($opt_l);
|
|
|
|
sub print_usage () {
|
|
print "Usage:\n";
|
|
print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
|
|
print " collectord [-h | --help]\n";
|
|
print "\n\nOptions:\n";
|
|
print " -c, --configfile <configfile>\n";
|
|
print " The config file which contains the room and timeout definitions\n";
|
|
print " -p, --port\n";
|
|
print " TCP Port which should be used (Default: 5222)\n";
|
|
print " -P, --pid-file\n";
|
|
print " PID file for storing the local process id (Default: /var/run/".basename($0).".pid)\n";
|
|
print " -d, --daemon\n";
|
|
print " detach from terminal and run as background daemon\n";
|
|
print " -n, --no-timestamps\n";
|
|
print " do not output timestamps in log messages\n";
|
|
print " -v, --verbose\n";
|
|
print " Print detailed log output (can be used multiple times to increase the loglevel, max. 2 times)\n";
|
|
print " -l, --logfile <logfile>\n";
|
|
print " log to the given logfile\n";
|
|
print " -h, --help\n";
|
|
print " Print detailed help screen\n";
|
|
}
|
|
|
|
if($opt_h)
|
|
{
|
|
print_usage();
|
|
exit;
|
|
}
|
|
|
|
if(-e "$opt_P")
|
|
{
|
|
print STDERR timestamp()."another process already running (PID file found at $opt_P)\n";
|
|
print STDERR timestamp()."aborted...\n";
|
|
exit 1;
|
|
}
|
|
|
|
if(not $opt_c)
|
|
{
|
|
print STDERR "no config file provided\n\n";
|
|
print_usage();
|
|
exit 1;
|
|
|
|
|
|
}
|
|
|
|
if(not -e "$opt_c" or not -r "$opt_c")
|
|
{
|
|
|
|
print STDERR "config-file $opt_c could not be loaded\n";
|
|
exit 1;
|
|
|
|
}
|
|
|
|
Log 0, "started with PID $$";
|
|
|
|
readConfig($opt_c);
|
|
|
|
if($opt_d)
|
|
{
|
|
daemonize();
|
|
}
|
|
|
|
# Write PID file
|
|
open(PIDFILE, ">$opt_P");
|
|
print PIDFILE $$."\n";
|
|
close PIDFILE;
|
|
|
|
$server = new IO::Socket::INET (
|
|
LocalPort => $opt_p,
|
|
Proto => 'tcp',
|
|
Listen => 5,
|
|
Reuse => 1,
|
|
Type => SOCK_STREAM,
|
|
KeepAlive => 1,
|
|
Blocking => 0
|
|
) or die "error while creating socket: $!\n";
|
|
|
|
Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();
|
|
|
|
my $listener = IO::Select->new();
|
|
$listener->add($server);
|
|
|
|
|
|
|
|
my @new_handles;
|
|
my %child_handles;
|
|
my %child_config;
|
|
|
|
my $address;
|
|
my $name;
|
|
my $timeout;
|
|
my $write_handle;
|
|
my $server_pid;
|
|
my @threads;
|
|
|
|
my $sig_received = undef;
|
|
|
|
$SIG{HUP} = sub { $sig_received = "SIGHUP"; };
|
|
$SIG{INT} = sub { $sig_received = "SIGINT"; };
|
|
$SIG{TERM} = sub { $sig_received = "SIGTERM"; };
|
|
$SIG{KILL} = sub { $sig_received = "SIGKILL"; };
|
|
$SIG{QUIT} = sub { $sig_received = "SIGQUIT"; };
|
|
$SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
|
|
|
|
$server_pid = $$ unless(defined($server_pid));
|
|
|
|
my $status_queue = Thread::Queue->new();
|
|
my $log_queue = Thread::Queue->new();
|
|
|
|
|
|
Log 2, "finished initialization. entering main loop";
|
|
|
|
while(1)
|
|
{
|
|
|
|
# Cleaning up the status hash for obsolete devices
|
|
foreach my $uuid (keys %state)
|
|
{
|
|
my %handle_to_socket = reverse %socket_to_handle;
|
|
unless(exists($handle_to_socket{$uuid}))
|
|
{
|
|
Log 2, "cleaning up status values (UUID: $uuid)";
|
|
delete $state{$uuid};
|
|
}
|
|
|
|
}
|
|
|
|
# process all status messages from all threads via status queue
|
|
while($status_queue->pending)
|
|
{
|
|
my ($uuid,$room,$value,$data) = split(";", $status_queue->dequeue, 4);
|
|
|
|
Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : "");
|
|
|
|
if(not $value =~ /^(absence|present)$/)
|
|
{
|
|
$handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client}));
|
|
|
|
if($value eq "socket_closed")
|
|
{
|
|
delete($state{$uuid}{rooms}{$room});
|
|
}
|
|
}
|
|
else
|
|
{
|
|
$state{$uuid}{rooms}{$room}{state} = $value;
|
|
|
|
if(defined($data))
|
|
{
|
|
$state{$uuid}{rooms}{$room}{data} = $data;
|
|
}
|
|
else
|
|
{
|
|
delete $state{$uuid}{rooms}{$room}{data};
|
|
}
|
|
|
|
my $result = aggregateRooms($state{$uuid}{rooms});
|
|
|
|
if(defined($result))
|
|
{
|
|
if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "$result"))
|
|
{
|
|
if(defined($handle{$uuid}{client}))
|
|
{
|
|
$handle{$uuid}{client}->send("$result\n");
|
|
$state{$uuid}{lastresult}{value} = "$result";
|
|
$state{$uuid}{lastresult}{timestamp} = time();
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
#print Dumper(\%state);
|
|
}
|
|
|
|
|
|
# If a thread has something reported via Log Queue, print it out if verbose is activated
|
|
while($log_queue->pending)
|
|
{
|
|
Log 2, $log_queue->dequeue;
|
|
}
|
|
|
|
|
|
|
|
# If a INET socket has anything to report
|
|
if(@new_handles = $listener->can_read(1))
|
|
{
|
|
|
|
foreach my $client (@new_handles)
|
|
{
|
|
# if the socket is the server socket, accept new client and add it to the socket selector
|
|
if($client == $server)
|
|
{
|
|
$new_client = $server->accept();
|
|
|
|
setsockopt($new_client, SOL_SOCKET, SO_KEEPALIVE, 1); # activate keep-alive
|
|
|
|
$listener->add($new_client);
|
|
Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport();
|
|
}
|
|
else # else is must be a client, so read the message and process it
|
|
{
|
|
$buf = '';
|
|
$buf = <$client>;
|
|
|
|
# if the message is defined, it is a real message, else the connection is closed (EOF)
|
|
if($buf)
|
|
{
|
|
# replace leading and trailing white spaces
|
|
$buf =~ s/(^\s*|\s*$)//g;
|
|
|
|
# if the message is a new command, accept the command and create threads for all rooms to process the command
|
|
if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
|
|
{
|
|
# send the acknowledgment back to the sender
|
|
$client->send("command accepted\n");
|
|
Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf";
|
|
|
|
# Split the message into bluetooth address and the timeout value
|
|
# (timeout is ignored within the collectord, as it is given by configuration)
|
|
($address, $timeout) = split("\\|", $buf);
|
|
|
|
# remove any containing white spaces
|
|
$address =~ s/\s*//g;
|
|
$timeout =~ s/\s*//g;
|
|
|
|
# if the client has already a request running, stop at first the old request
|
|
if(defined($socket_to_handle{$client}))
|
|
{
|
|
my $uuid = $socket_to_handle{$client};
|
|
|
|
# get all threads for this socket and send them a termination signal
|
|
foreach my $room (keys %{$handle{$uuid}{threads}})
|
|
{
|
|
Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room";
|
|
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address");
|
|
$state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
|
|
delete($state{$uuid}{rooms}{$room}{data});
|
|
}
|
|
|
|
$handle{$uuid}{timeout} = $timeout;
|
|
$state{$uuid}{lastresult}{timestamp} = 0;
|
|
}
|
|
else
|
|
{
|
|
# create a new uuid if not exist for socket
|
|
if(not defined($socket_to_handle{$client}))
|
|
{
|
|
$socket_to_handle{$client} = generateUUID();
|
|
Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
|
|
}
|
|
|
|
my $uuid = $socket_to_handle{$client};
|
|
|
|
$handle{$uuid}{address} = $address;
|
|
$handle{$uuid}{client} = $client;
|
|
$handle{$uuid}{timeout} = $timeout;
|
|
|
|
$state{$uuid}{lastresult}{value} = "absence";
|
|
$state{$uuid}{lastresult}{timestamp} = 0;
|
|
|
|
# create a new reqester thread for each configured room to perform the query
|
|
while (my ($room, $value) = each %config)
|
|
{
|
|
$thread_counter++;
|
|
$queues{$thread_counter} = Thread::Queue->new();
|
|
my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
|
|
Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";
|
|
|
|
# detach from the thread, so the thread starts processing independantly
|
|
$new_thread->detach();
|
|
|
|
# save the socket/room relationship to know which thread belongs to which client request (for stop command)
|
|
$handle{$uuid}{threads}{$room} = $new_thread;
|
|
$state{$uuid}{rooms}{$room}{state} = "";
|
|
delete($state{$uuid}{rooms}{$room}{data});
|
|
}
|
|
}
|
|
}
|
|
elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server
|
|
{
|
|
Log 2, "received now command from client ".$client->peerhost();
|
|
|
|
# just to be sure if the client has really a running request
|
|
if(defined($socket_to_handle{$client}))
|
|
{
|
|
my $uuid = $socket_to_handle{$client};
|
|
|
|
# get all threads for this socket and send them a now command
|
|
foreach my $room (keys %{$handle{$uuid}{threads}})
|
|
{
|
|
Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost();
|
|
|
|
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now");
|
|
|
|
# delete state and room data to get a fresh state
|
|
$state{$uuid}{rooms}{$room}{state} = "" if(exists($state{$uuid}{rooms}{$room}));
|
|
delete($state{$uuid}{rooms}{$room}{data});
|
|
}
|
|
|
|
delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult}));
|
|
$client->send("command accepted\n");
|
|
}
|
|
else
|
|
{
|
|
# if there is no command running, just tell the client he's wrong
|
|
$client->send("no command running\n");
|
|
}
|
|
}
|
|
elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped
|
|
{
|
|
Log 1, "received stop command from client ".$client->peerhost();
|
|
|
|
# just to be sure if the client has really a running request
|
|
if(stopClientThreads($client))
|
|
{
|
|
$client->send("command accepted\n");
|
|
}
|
|
else
|
|
{
|
|
# if there is no command running, just tell the client he's wrong
|
|
$client->send("no command running\n");
|
|
}
|
|
}
|
|
elsif(lc($buf) =~ /^\s*ping\s*$/) #
|
|
{
|
|
Log 1, "received ping command from client ".$client->peerhost();
|
|
$client->send("pong\n");
|
|
|
|
closeClientConnection($client);
|
|
}
|
|
else
|
|
{ # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging.
|
|
$client->send("command rejected\n");
|
|
Log 1, "received invalid command >>$buf<< from client ".$client->peerhost();
|
|
}
|
|
|
|
}
|
|
else # if the message is not defined (EOF) the connection was closed. Now let's clean up
|
|
{
|
|
closeClientConnection($client);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
# in case we have received a process signal, remove the pid file and shutdown
|
|
if(defined($sig_received))
|
|
{
|
|
Log 1, "Caught $sig_received exiting";
|
|
unlink($opt_P);
|
|
Log 1, "removed PID-File $opt_P";
|
|
Log 1, "server shutdown";
|
|
exit;
|
|
}
|
|
}
|
|
|
|
Log 2, "leaving main loop";
|
|
|
|
########################################################################################################################
|
|
#
|
|
# Subroutine definitions
|
|
#
|
|
########################################################################################################################
|
|
|
|
|
|
# to fork the process from the terminal
|
|
sub daemonize
|
|
{
|
|
POSIX::setsid or die "setsid $!";
|
|
|
|
my $pid = fork();
|
|
|
|
if($pid < 0)
|
|
{
|
|
print STDERR "cannot fork: $!\n";
|
|
exit 1;
|
|
}
|
|
elsif($pid)
|
|
{
|
|
Log 0, "forked with PID $pid";
|
|
exit 0;
|
|
}
|
|
|
|
chdir "/";
|
|
umask 0;
|
|
|
|
foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_ }
|
|
|
|
# cut off any input and output
|
|
open (STDIN, "</dev/null");
|
|
open (STDOUT, ">/dev/null");
|
|
open (STDERR, ">&STDOUT");
|
|
}
|
|
|
|
# the thread subroutine which performs a request for a specific room
|
|
sub doQuery($$$)
|
|
{
|
|
|
|
my ($do_config, $do_room, $do_address, $do_uuid) = @_;
|
|
my $return;
|
|
my $socket;
|
|
my %values = %{$do_config};
|
|
my $selector;
|
|
my $run = 1;
|
|
my @client_handle;
|
|
my $reconnect_count = 0;
|
|
my $client_socket = undef;
|
|
|
|
my $last_contact = gettimeofday();
|
|
|
|
my $cmd;
|
|
my $previous_state = "absence";
|
|
my $current_state = "absence";
|
|
|
|
$client_socket = new IO::Socket::INET (
|
|
PeerHost => $values{address},
|
|
PeerPort => $values{port},
|
|
Proto => 'tcp',
|
|
Type => SOCK_STREAM,
|
|
KeepAlive => 1,
|
|
Blocking => 1
|
|
) or ( $log_queue->enqueue(threads->tid()."|$do_room : could not create socket to ".$values{address}." - $! -"));
|
|
|
|
$selector = IO::Select->new($client_socket);
|
|
|
|
if(defined($client_socket))
|
|
{
|
|
# send the given address to the presence daemon
|
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
|
}
|
|
else
|
|
{
|
|
$selector->remove($client_socket);
|
|
$client_socket = undef;
|
|
}
|
|
|
|
# thread main loop
|
|
THREADLOOP: while($run)
|
|
{
|
|
|
|
if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
|
|
{
|
|
$log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
|
|
|
|
$selector->remove($client_socket);
|
|
$client_socket->shutdown(2);
|
|
close($client_socket);
|
|
$client_socket = undef;
|
|
}
|
|
|
|
if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
|
|
{
|
|
$cmd = $queues{threads->tid()}->dequeue;
|
|
$log_queue->enqueue(threads->tid()."|received command: $cmd");
|
|
|
|
if($cmd eq "now")
|
|
{
|
|
$log_queue->enqueue(threads->tid()."|sending \"now\" command to ".$values{address}.":".$values{port});
|
|
$client_socket->send("now\n") if(defined($client_socket));
|
|
}
|
|
elsif($cmd eq "stop")
|
|
{
|
|
$log_queue->enqueue(threads->tid()."|$do_room terminating thread ".threads->tid()." for ".$values{address});
|
|
$client_socket->shutdown(2) if(defined($client_socket));
|
|
$selector->remove($client_socket) if(defined($selector));
|
|
close($client_socket) if(defined($client_socket));
|
|
$client_socket = undef;
|
|
delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
|
|
$run = 0;
|
|
last THREADLOOP;
|
|
}
|
|
elsif($cmd =~ /^new\|/)
|
|
{
|
|
($cmd, $do_address) = split("\\|", $cmd);
|
|
|
|
$log_queue->enqueue(threads->tid()."|sending new address $do_address to ".$values{address}.":".$values{port});
|
|
|
|
if($current_state eq "present")
|
|
{
|
|
$client_socket->send($do_address."|".$values{presence_timeout}."\n") if(defined($client_socket));
|
|
}
|
|
else
|
|
{
|
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n") if(defined($client_socket));
|
|
}
|
|
}
|
|
}
|
|
|
|
if(not defined($client_socket))
|
|
{
|
|
# if it's the first occurance
|
|
if(!$reconnect_count)
|
|
{
|
|
# Tell this the client;
|
|
$status_queue->enqueue("$do_uuid;$do_room;socket_closed");
|
|
|
|
# create a log message
|
|
$log_queue->enqueue(threads->tid()."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
|
|
}
|
|
|
|
# now try to re-establish the connection
|
|
$client_socket = new IO::Socket::INET (
|
|
PeerHost => $values{address},
|
|
PeerPort => $values{port},
|
|
Proto => 'tcp',
|
|
Type => SOCK_STREAM,
|
|
KeepAlive => 1,
|
|
Blocking => 1
|
|
) or ( $reconnect_count++ );
|
|
|
|
if(defined($client_socket))
|
|
{
|
|
# give a success message
|
|
$log_queue->enqueue(threads->tid()."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
|
|
$status_queue->enqueue("$do_uuid;$do_room;socket_reconnected");
|
|
|
|
# reset the reconnect counter
|
|
$reconnect_count = 0;
|
|
|
|
# set the last contact date to now
|
|
$last_contact = gettimeofday();
|
|
|
|
# add the new established socket to the IO selector for incoming data monitoring.
|
|
$selector->add($client_socket);
|
|
# send the given address to the presence daemon
|
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
|
}
|
|
else
|
|
{
|
|
sleep(9);
|
|
}
|
|
}
|
|
|
|
# if the socket has a message available
|
|
if(@client_handle = $selector->can_read(1))
|
|
{
|
|
|
|
# get all socket handles which has a message available
|
|
foreach my $local_client (@client_handle)
|
|
{
|
|
# get the message from the socket handle
|
|
$return = <$local_client>;
|
|
|
|
# if the message is defined (not EOF) handle the message...
|
|
if($return)
|
|
{
|
|
|
|
# set the last contact date
|
|
$last_contact = gettimeofday();
|
|
|
|
# remove trailing whitespaces and newlines
|
|
chomp($return);
|
|
|
|
# if the message is "command accepted"
|
|
if($return =~ /command accepted/)
|
|
{
|
|
# log this to the thread log queue
|
|
$log_queue->enqueue(threads->tid()."|$do_room accepted command for $do_address");
|
|
}
|
|
elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
|
|
{
|
|
$log_queue->enqueue(threads->tid()."|$do_room REJECTED command for $do_address");
|
|
}
|
|
else # else its a status message
|
|
{
|
|
# put the message to the status queue with uuid for identification and the room name
|
|
$status_queue->enqueue("$do_uuid;$do_room;".$return);
|
|
|
|
# if the state changes from present to absence
|
|
if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
|
|
{
|
|
# log the timout change to the log queue
|
|
$log_queue->enqueue(threads->tid()."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
|
|
|
|
$current_state = "absence";
|
|
|
|
# send the new command with the configured absence timeout
|
|
$local_client->send($do_address."|".$values{absence_timeout}."\n");
|
|
}
|
|
elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
|
|
{
|
|
$log_queue->enqueue(threads->tid()."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
|
|
|
|
$current_state = "present";
|
|
|
|
# if the state changes from absence to present, set the presence timeout
|
|
$local_client->send($do_address."|".$values{presence_timeout}."\n");
|
|
}
|
|
|
|
# set the previous state to the current state
|
|
($previous_state, undef) = split(";", lc($return));
|
|
}
|
|
}
|
|
else # the socket is EOF which means the connection was closed
|
|
{
|
|
|
|
$selector->remove($local_client);
|
|
|
|
$local_client->shutdown(2);
|
|
close($local_client);
|
|
$client_socket = undef;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
$log_queue->enqueue(threads->tid()."|exiting thread");
|
|
}
|
|
|
|
|
|
|
|
sub readConfig
|
|
{
|
|
|
|
my ($ini) = @_;
|
|
|
|
my $section;
|
|
my $keyword;
|
|
my $value;
|
|
|
|
my $errorcount = 0;
|
|
|
|
Log 1, "reading configuration file";
|
|
|
|
%config = ();
|
|
|
|
open (INI, "$ini") or (print STDERR timestamp()."Can't open $ini: $!\n" and exit(1));
|
|
while (<INI>) {
|
|
chomp;
|
|
if (/^\s*?\[([^\]\n\r]+?)\]/) {
|
|
$section = $1;
|
|
|
|
}
|
|
if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/ and defined($section)) {
|
|
$keyword = $1;
|
|
$value = $2 ;
|
|
# put them into hash
|
|
$config{$section}{$keyword} = $value;
|
|
}
|
|
}
|
|
close (INI);
|
|
|
|
|
|
# validating config
|
|
foreach my $room (keys %config)
|
|
{
|
|
|
|
if(not exists($config{$room}{address}))
|
|
{
|
|
Log 0, "room $room has no value for address configured";
|
|
$errorcount++;
|
|
}
|
|
else
|
|
{
|
|
if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
|
|
{
|
|
Log 0, "no valid address for room $room found: ".$config{$room}{address};
|
|
$errorcount++;
|
|
}
|
|
|
|
}
|
|
|
|
if(not exists($config{$room}{port}))
|
|
{
|
|
Log 0, "room >>$room<< has no value for >>port<< configured";
|
|
$errorcount++;
|
|
}
|
|
else
|
|
{
|
|
if(not $config{$room}{port} =~ /^\d+$/)
|
|
{
|
|
Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
|
|
$errorcount++;
|
|
}
|
|
}
|
|
|
|
if(not exists($config{$room}{absence_timeout}))
|
|
{
|
|
Log 0, "room >>$room<< has no value for >>absence_timeout<< configured";
|
|
$errorcount++;
|
|
}
|
|
else
|
|
{
|
|
if(not $config{$room}{absence_timeout} =~ /^\d+$/)
|
|
{
|
|
Log 0, "value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout};
|
|
$errorcount++;
|
|
}
|
|
}
|
|
|
|
if(not exists($config{$room}{presence_timeout}))
|
|
{
|
|
Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";
|
|
$errorcount++;
|
|
}
|
|
else
|
|
{
|
|
if(not $config{$room}{presence_timeout} =~ /^\d+$/)
|
|
{
|
|
Log 0, "value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout};
|
|
$errorcount++;
|
|
}
|
|
}
|
|
|
|
foreach my $param (keys %{$config{$room}})
|
|
{
|
|
if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
|
|
{
|
|
Log 0, "invalid parameter $param in room $room";
|
|
$errorcount++;
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
if($errorcount)
|
|
{
|
|
print STDERR timestamp()."found $errorcount config errors. exiting....\n";
|
|
exit 2;
|
|
}
|
|
else
|
|
{
|
|
Log 0, "no config errors found";
|
|
}
|
|
}
|
|
|
|
|
|
sub aggregateRooms
|
|
{
|
|
|
|
my ($hash) = @_;
|
|
|
|
my $previous = "absence";
|
|
my %rssi_results = ();
|
|
my $hroom;
|
|
my @rooms;
|
|
my $key;
|
|
|
|
my $first_key;
|
|
|
|
# get all present rooms
|
|
foreach $key (keys %$hash)
|
|
{
|
|
my $room_hash = $hash->{$key};
|
|
|
|
if(defined($room_hash->{state}) and $room_hash->{state} ne "")
|
|
{
|
|
my ($value, $data) = split(";", $hash->{$key});
|
|
|
|
if($room_hash->{state} eq "present")
|
|
{
|
|
push @rooms, $key;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
# if one room has no result return undef
|
|
return undef;
|
|
}
|
|
}
|
|
|
|
|
|
# if multiple rooms are present, try selection by highest RSSI
|
|
if(@rooms > 0)
|
|
{
|
|
my $rssi_addon_data_key = "rssi";
|
|
my $rssi_available = 1;
|
|
my $highest_value;
|
|
my $highest_key;
|
|
|
|
foreach $key (@rooms)
|
|
{
|
|
my $data = $hash->{$key}{data};
|
|
|
|
if(defined($data))
|
|
{
|
|
my ($a,$h) = parseParams($data,';');
|
|
|
|
if(@{$a} == 1 and keys(%{$h}) == 0) # old presenced device name => convert to new style
|
|
{
|
|
$hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'";
|
|
$rssi_available = 0;
|
|
}
|
|
elsif(@{$a} == 0 and keys(%{$h}) > 0) # new addon data style
|
|
{
|
|
# check rssi
|
|
if(exists($h->{$rssi_addon_data_key}) and $h->{$rssi_addon_data_key} =~ /^-?(?:\d+\.)?\d+$/)
|
|
{
|
|
if(!defined($highest_value) or (defined($highest_value) and $h->{$rssi_addon_data_key} > $highest_value))
|
|
{
|
|
$highest_value = $h->{$rssi_addon_data_key};
|
|
$highest_key = $key;
|
|
}
|
|
|
|
$rssi_results{$key} = $h->{$rssi_addon_data_key};
|
|
}
|
|
else
|
|
{
|
|
$rssi_available = 0;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Log 1, "invalid addon data received from room $key: $data";
|
|
}
|
|
}
|
|
}
|
|
|
|
if($rssi_available and defined($highest_key))
|
|
{
|
|
Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1);
|
|
$hroom = $highest_key;
|
|
}
|
|
}
|
|
|
|
if(@rooms > 0)
|
|
{
|
|
my $rssi_data = join(";", map("rssi_".$_."='".$rssi_results{$_}."'", map {s/\s+/_/rg } keys %rssi_results));
|
|
my $ret = "present".
|
|
(defined($hroom) ? ";room='".$hroom."'" : "").
|
|
";rooms='".join(",",sort @rooms)."'".
|
|
(defined($hroom) ? ";".$hash->{$hroom}{data} : (defined($hash->{$rooms[0]}{data}) ? ";".$hash->{$rooms[0]}{data} : "")).
|
|
(defined($rssi_data) ? ";".$rssi_data : "");
|
|
}
|
|
else
|
|
{
|
|
return "absence;room=;rooms=;rssi=;";
|
|
}
|
|
}
|
|
|
|
|
|
sub generateUUID
|
|
{
|
|
my $uuid = Digest::MD5::md5_hex(rand);
|
|
|
|
while(defined($handle{$uuid}))
|
|
{
|
|
$uuid = Digest::MD5::md5_hex(rand);
|
|
}
|
|
|
|
return $uuid;
|
|
}
|
|
|
|
sub timestamp
|
|
{
|
|
return POSIX::strftime("%Y-%m-%d %H:%M:%S - ",localtime) unless($opt_n);
|
|
return "";
|
|
}
|
|
|
|
|
|
sub Log($$)
|
|
{
|
|
my ($loglevel, $message) = @_;
|
|
my $thread = 0;
|
|
|
|
if($message =~ /^\d+\|/)
|
|
{
|
|
($thread, $message) = split("\\|", $message);
|
|
}
|
|
|
|
if($loglevel <= $opt_v)
|
|
{
|
|
if($opt_l)
|
|
{
|
|
open(LOGFILE, ">>$opt_l") or die ("could not open logfile: $opt_l");
|
|
}
|
|
else
|
|
{
|
|
open (LOGFILE, ">&STDOUT") or die("cannot open STDOUT");
|
|
}
|
|
|
|
print LOGFILE ($opt_l?"":"\r").timestamp().($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n";
|
|
|
|
close(LOGFILE);
|
|
}
|
|
}
|
|
|
|
|
|
#####################################
|
|
# parseParams() from fhem.pl by justme1968
|
|
sub parseParams($;$)
|
|
{
|
|
my($cmd, $separator) = @_;
|
|
$separator = ' ' if( !$separator );
|
|
my(@a, %h);
|
|
|
|
my @params;
|
|
if( ref($cmd) eq 'ARRAY' ) {
|
|
@params = @{$cmd};
|
|
} else {
|
|
@params = split($separator, $cmd);
|
|
}
|
|
|
|
while (@params) {
|
|
my $param = shift(@params);
|
|
my ($key, $value) = split( '=', $param, 2 );
|
|
|
|
if( !defined( $value ) ) {
|
|
$value = $key;
|
|
$key = undef;
|
|
}
|
|
|
|
#collect all parts until the closing ' or "
|
|
while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
|
|
my $next = shift(@params);
|
|
last if( !defined($next) );
|
|
$value .= $separator . $next;
|
|
}
|
|
#remove matching ' or " from the start and end
|
|
if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
|
|
$value =~ s/^.(.*).$/$1/;
|
|
}
|
|
|
|
#collext all parts until opening { and closing } are matched
|
|
if( $value =~ m/^{/ ) { # } for match
|
|
my $count = 0;
|
|
for my $i (0..length($value)-1) {
|
|
my $c = substr($value, $i, 1);
|
|
++$count if( $c eq '{' );
|
|
--$count if( $c eq '}' );
|
|
}
|
|
|
|
while( $param && $count != 0 ) {
|
|
my $next = shift(@params);
|
|
last if( !defined($next) );
|
|
$value .= $separator . $next;
|
|
|
|
for my $i (0..length($next)-1) {
|
|
my $c = substr($next, $i, 1);
|
|
++$count if( $c eq '{' );
|
|
--$count if( $c eq '}' );
|
|
}
|
|
}
|
|
}
|
|
|
|
if( defined($key) ) {
|
|
$h{$key} = $value;
|
|
} else {
|
|
push @a, $value;
|
|
}
|
|
|
|
}
|
|
|
|
return(\@a, \%h);
|
|
}
|
|
|
|
sub closeClientConnection($)
|
|
{
|
|
my ($client) = @_;
|
|
|
|
# make a log entry and remove the socket from the socket selector
|
|
Log 1, "closed connection from ".$client->peerhost();
|
|
$listener->remove($client);
|
|
|
|
# if there is a running command, stop it first and clean up
|
|
stopClientThreads($client);
|
|
|
|
# now close the socket, that's it
|
|
close $client;
|
|
}
|
|
|
|
sub stopClientThreads($)
|
|
{
|
|
my ($client) = @_;
|
|
|
|
# if there is a running command, stop it first and clean up (same as stop command, see above)
|
|
if(defined($socket_to_handle{$client}))
|
|
{
|
|
my $uuid = $socket_to_handle{$client};
|
|
|
|
# get all threads for this socket and send them a termination signal
|
|
foreach my $room (keys %{$handle{$uuid}{threads}})
|
|
{
|
|
Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost();
|
|
$queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop");
|
|
delete($handle{$uuid}{threads}{$room});
|
|
}
|
|
|
|
# when all threads are signaled, delete all relationship entry for this client
|
|
delete($handle{$uuid});
|
|
delete($socket_to_handle{$client});
|
|
|
|
return 1;
|
|
}
|
|
else
|
|
{
|
|
return 0;
|
|
}
|
|
}
|