mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-02-01 01:09:47 +00:00
PRESENCE/collectord: allow spaces for room names in config file (Forum: #19169), code make-up
git-svn-id: https://svn.fhem.de/fhem/trunk@8558 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
parent
76db86933e
commit
e7ff7770f0
@ -65,46 +65,32 @@ my %queues;
|
|||||||
my $thread_counter = 0;
|
my $thread_counter = 0;
|
||||||
|
|
||||||
my %state;
|
my %state;
|
||||||
|
|
||||||
my %handle;
|
my %handle;
|
||||||
|
|
||||||
my %socket_to_handle;
|
my %socket_to_handle;
|
||||||
|
|
||||||
my $uuid;
|
my $uuid;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Getopt::Long::Configure('bundling');
|
Getopt::Long::Configure('bundling');
|
||||||
GetOptions(
|
GetOptions(
|
||||||
"d" => \$opt_d, "daemon" => \$opt_d,
|
"d" => \$opt_d, "daemon" => \$opt_d,
|
||||||
"v+" => \$opt_v, "verbose+" => \$opt_v,
|
"v+" => \$opt_v, "verbose+" => \$opt_v,
|
||||||
"l=s" => \$opt_l, "logfile=s" => \$opt_l,
|
"l=s" => \$opt_l, "logfile=s" => \$opt_l,
|
||||||
"c=s" => \$opt_c, "configfile=s" => \$opt_c,
|
"c=s" => \$opt_c, "configfile=s" => \$opt_c,
|
||||||
"p=i" => \$opt_p, "port=i" => \$opt_p,
|
"p=i" => \$opt_p, "port=i" => \$opt_p,
|
||||||
"P=s" => \$opt_P, "pid-file=s" => \$opt_P,
|
"P=s" => \$opt_P, "pid-file=s" => \$opt_P,
|
||||||
"h" => \$opt_h, "help" => \$opt_h);
|
"h" => \$opt_h, "help" => \$opt_h
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Log 0, "=================================================" if($opt_l);
|
Log 0, "=================================================" if($opt_l);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
sub print_usage () {
|
sub print_usage () {
|
||||||
print "Usage:\n";
|
print "Usage:\n";
|
||||||
print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
|
print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
|
||||||
print " collectord [-h | --help]\n";
|
print " collectord [-h | --help]\n";
|
||||||
print "\n\nOptions:\n";
|
print "\n\nOptions:\n";
|
||||||
print " -c, --configfile <configfile>\n";
|
print " -c, --configfile <configfile>\n";
|
||||||
print " The config file which contains the room and timeout definitions\n";
|
print " The config file which contains the room and timeout definitions\n";
|
||||||
print " -p, --port\n";
|
print " -p, --port\n";
|
||||||
print " TCP Port which should be used (Default: 5222)\n";
|
print " TCP Port which should be used (Default: 5222)\n";
|
||||||
print " -P, --pid-file\n";
|
print " -P, --pid-file\n";
|
||||||
@ -117,11 +103,8 @@ sub print_usage () {
|
|||||||
print " log to the given logfile\n";
|
print " log to the given logfile\n";
|
||||||
print " -h, --help\n";
|
print " -h, --help\n";
|
||||||
print " Print detailed help screen\n";
|
print " Print detailed help screen\n";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if($opt_h)
|
if($opt_h)
|
||||||
{
|
{
|
||||||
print_usage();
|
print_usage();
|
||||||
@ -135,7 +118,6 @@ if(-e "$opt_P")
|
|||||||
exit 1;
|
exit 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if(not $opt_c)
|
if(not $opt_c)
|
||||||
{
|
{
|
||||||
print STDERR "no config file provided\n\n";
|
print STDERR "no config file provided\n\n";
|
||||||
@ -153,7 +135,6 @@ if(not -e "$opt_c" or not -r "$opt_c")
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Log 0, "started with PID $$";
|
Log 0, "started with PID $$";
|
||||||
|
|
||||||
readConfig($opt_c);
|
readConfig($opt_c);
|
||||||
@ -163,21 +144,19 @@ if($opt_d)
|
|||||||
daemonize();
|
daemonize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Write PID file
|
||||||
|
|
||||||
open(PIDFILE, ">$opt_P");
|
open(PIDFILE, ">$opt_P");
|
||||||
print PIDFILE $$."\n";
|
print PIDFILE $$."\n";
|
||||||
close PIDFILE;
|
close PIDFILE;
|
||||||
|
|
||||||
|
|
||||||
$server = new IO::Socket::INET (
|
$server = new IO::Socket::INET (
|
||||||
LocalPort => $opt_p,
|
LocalPort => $opt_p,
|
||||||
Proto => 'tcp',
|
Proto => 'tcp',
|
||||||
Listen => 5,
|
Listen => 5,
|
||||||
Reuse => 1,
|
Reuse => 1,
|
||||||
Type => SOCK_STREAM,
|
Type => SOCK_STREAM,
|
||||||
KeepAlive => 1,
|
KeepAlive => 1,
|
||||||
Blocking => 0
|
Blocking => 0
|
||||||
) or die "error while creating socket: $!\n";
|
) or die "error while creating socket: $!\n";
|
||||||
|
|
||||||
Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();
|
Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();
|
||||||
@ -246,31 +225,31 @@ while(1)
|
|||||||
|
|
||||||
if(not $value =~ /^(absence|present)$/)
|
if(not $value =~ /^(absence|present)$/)
|
||||||
{
|
{
|
||||||
$handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client}));
|
$handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client}));
|
||||||
|
|
||||||
if($value eq "socket_closed")
|
if($value eq "socket_closed")
|
||||||
{
|
{
|
||||||
delete($state{$uuid}{rooms}{$room});
|
delete($state{$uuid}{rooms}{$room});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
$state{$uuid}{rooms}{$room} = $value.(defined($name)?";".$name:"");
|
$state{$uuid}{rooms}{$room} = $value.(defined($name)?";".$name:"");
|
||||||
|
|
||||||
$result = aggregateRooms($state{$uuid}{rooms});
|
$result = aggregateRooms($state{$uuid}{rooms});
|
||||||
|
|
||||||
if(defined($result))
|
if(defined($result))
|
||||||
{
|
{
|
||||||
if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "$result"))
|
if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "$result"))
|
||||||
{
|
{
|
||||||
if(defined($handle{$uuid}{client}))
|
if(defined($handle{$uuid}{client}))
|
||||||
{
|
{
|
||||||
$handle{$uuid}{client}->send("$result\n");
|
$handle{$uuid}{client}->send("$result\n");
|
||||||
$state{$uuid}{lastresult}{value} = "$result";
|
$state{$uuid}{lastresult}{value} = "$result";
|
||||||
$state{$uuid}{lastresult}{timestamp} = time();
|
$state{$uuid}{lastresult}{timestamp} = time();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,7 +290,7 @@ while(1)
|
|||||||
if($buf)
|
if($buf)
|
||||||
{
|
{
|
||||||
# replace leading and trailing white spaces
|
# replace leading and trailing white spaces
|
||||||
$buf =~ s/(^\s*|\s*$)//g;
|
$buf =~ s/(^\s*|\s*$)//g;
|
||||||
|
|
||||||
# if the message is a new command, accept the command and create threads for all rooms to process the command
|
# if the message is a new command, accept the command and create threads for all rooms to process the command
|
||||||
if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
|
if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
|
||||||
@ -349,8 +328,8 @@ while(1)
|
|||||||
# create a new uuid if not exist for socket
|
# create a new uuid if not exist for socket
|
||||||
if(not defined($socket_to_handle{$client}))
|
if(not defined($socket_to_handle{$client}))
|
||||||
{
|
{
|
||||||
$socket_to_handle{$client} = generateUUID();
|
$socket_to_handle{$client} = generateUUID();
|
||||||
Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
|
Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client};
|
||||||
}
|
}
|
||||||
|
|
||||||
$uuid = $socket_to_handle{$client};
|
$uuid = $socket_to_handle{$client};
|
||||||
@ -366,17 +345,17 @@ while(1)
|
|||||||
while (($room, $value) = each %config)
|
while (($room, $value) = each %config)
|
||||||
{
|
{
|
||||||
|
|
||||||
$thread_counter++;
|
$thread_counter++;
|
||||||
$queues{$thread_counter} = Thread::Queue->new();
|
$queues{$thread_counter} = Thread::Queue->new();
|
||||||
my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
|
my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
|
||||||
Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";
|
Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";
|
||||||
|
|
||||||
# detach from the thread, so the thread starts processing independantly
|
# detach from the thread, so the thread starts processing independantly
|
||||||
$new_thread->detach();
|
$new_thread->detach();
|
||||||
|
|
||||||
# save the socket/room relationship to know which thread belongs to which client request (for stop command)
|
# save the socket/room relationship to know which thread belongs to which client request (for stop command)
|
||||||
$handle{$uuid}{threads}{$room} = $new_thread;
|
$handle{$uuid}{threads}{$room} = $new_thread;
|
||||||
$state{$uuid}{rooms}{$room} = "";
|
$state{$uuid}{rooms}{$room} = "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -472,7 +451,6 @@ while(1)
|
|||||||
|
|
||||||
# now close the socket, that's it
|
# now close the socket, that's it
|
||||||
close $client;
|
close $client;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -487,9 +465,6 @@ while(1)
|
|||||||
Log 1, "server shutdown";
|
Log 1, "server shutdown";
|
||||||
exit;
|
exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Log 2, "leaving main loop";
|
Log 2, "leaving main loop";
|
||||||
@ -504,20 +479,19 @@ Log 2, "leaving main loop";
|
|||||||
# to fork the process from the terminal
|
# to fork the process from the terminal
|
||||||
sub daemonize
|
sub daemonize
|
||||||
{
|
{
|
||||||
|
|
||||||
POSIX::setsid or die "setsid $!";
|
POSIX::setsid or die "setsid $!";
|
||||||
|
|
||||||
my $pid = fork();
|
my $pid = fork();
|
||||||
|
|
||||||
if($pid < 0)
|
if($pid < 0)
|
||||||
{
|
{
|
||||||
print STDERR "cannot fork: $!\n";
|
print STDERR "cannot fork: $!\n";
|
||||||
exit 1;
|
exit 1;
|
||||||
}
|
}
|
||||||
elsif($pid)
|
elsif($pid)
|
||||||
{
|
{
|
||||||
Log 0, "forked with PID $pid";
|
Log 0, "forked with PID $pid";
|
||||||
exit 0;
|
exit 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
chdir "/";
|
chdir "/";
|
||||||
@ -529,16 +503,12 @@ sub daemonize
|
|||||||
open (STDIN, "</dev/null");
|
open (STDIN, "</dev/null");
|
||||||
open (STDOUT, ">/dev/null");
|
open (STDOUT, ">/dev/null");
|
||||||
open (STDERR, ">&STDOUT");
|
open (STDERR, ">&STDOUT");
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# the thread subroutine which performs a request for a specific room
|
# the thread subroutine which performs a request for a specific room
|
||||||
sub doQuery($$$)
|
sub doQuery($$$)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
my ($do_config, $do_room, $do_address, $do_uuid) = @_;
|
my ($do_config, $do_room, $do_address, $do_uuid) = @_;
|
||||||
my $return;
|
my $return;
|
||||||
my $socket;
|
my $socket;
|
||||||
@ -548,8 +518,6 @@ sub doQuery($$$)
|
|||||||
my @client_handle;
|
my @client_handle;
|
||||||
my $reconnect_count = 0;
|
my $reconnect_count = 0;
|
||||||
my $client_socket = undef;
|
my $client_socket = undef;
|
||||||
# if the thread gets a termination signal, the thread must be shutdown by itself
|
|
||||||
|
|
||||||
|
|
||||||
my $last_contact = gettimeofday();
|
my $last_contact = gettimeofday();
|
||||||
|
|
||||||
@ -558,206 +526,195 @@ sub doQuery($$$)
|
|||||||
my $current_state = "absence";
|
my $current_state = "absence";
|
||||||
|
|
||||||
$client_socket = new IO::Socket::INET (
|
$client_socket = new IO::Socket::INET (
|
||||||
PeerHost => $values{address},
|
PeerHost => $values{address},
|
||||||
PeerPort => $values{port},
|
PeerPort => $values{port},
|
||||||
Proto => 'tcp',
|
Proto => 'tcp',
|
||||||
Type => SOCK_STREAM,
|
Type => SOCK_STREAM,
|
||||||
KeepAlive => 1,
|
KeepAlive => 1,
|
||||||
Blocking => 1
|
Blocking => 1
|
||||||
) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -"));
|
) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -"));
|
||||||
|
|
||||||
$selector = IO::Select->new($client_socket);
|
$selector = IO::Select->new($client_socket);
|
||||||
|
|
||||||
if(defined($client_socket))
|
if(defined($client_socket))
|
||||||
{
|
{
|
||||||
|
# send the given address to the presence daemon
|
||||||
# send the given address to the presence daemon
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
||||||
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
$selector->remove($client_socket);
|
$selector->remove($client_socket);
|
||||||
$client_socket = undef;
|
$client_socket = undef;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# thread main loop
|
# thread main loop
|
||||||
THREADLOOP: while($run)
|
THREADLOOP: while($run)
|
||||||
{
|
{
|
||||||
|
|
||||||
if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
|
if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
|
||||||
{
|
{
|
||||||
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
|
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
|
||||||
|
|
||||||
$selector->remove($client_socket);
|
$selector->remove($client_socket);
|
||||||
shutdown($client_socket, 2);
|
shutdown($client_socket, 2);
|
||||||
close($client_socket);
|
close($client_socket);
|
||||||
$client_socket = undef;
|
$client_socket = undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
|
if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending)
|
||||||
{
|
{
|
||||||
$cmd = $queues{threads->tid()}->dequeue;
|
$cmd = $queues{threads->tid()}->dequeue;
|
||||||
$log_queue->enqueue(threads->tid()."|received command: $cmd");
|
$log_queue->enqueue(threads->tid()."|received command: $cmd");
|
||||||
|
|
||||||
if($cmd eq "now")
|
if($cmd eq "now")
|
||||||
{
|
{
|
||||||
$log_queue->enqueue(threads->tid()."|sending \"now\" command to ".$values{address}.":".$values{port});
|
$log_queue->enqueue(threads->tid()."|sending \"now\" command to ".$values{address}.":".$values{port});
|
||||||
$client_socket->send("now\n") if(defined($client_socket));
|
$client_socket->send("now\n") if(defined($client_socket));
|
||||||
}
|
}
|
||||||
elsif($cmd eq "stop")
|
elsif($cmd eq "stop")
|
||||||
{
|
{
|
||||||
$log_queue->enqueue(threads->tid()."|$room terminating thread ".threads->tid()." for $address");
|
$log_queue->enqueue(threads->tid()."|$room terminating thread ".threads->tid()." for $address");
|
||||||
$client_socket->shutdown() if(defined($client_socket));
|
$client_socket->shutdown() if(defined($client_socket));
|
||||||
$selector->remove($client_socket) if(defined($selector));
|
$selector->remove($client_socket) if(defined($selector));
|
||||||
close($client_socket) if(defined($client_socket));
|
close($client_socket) if(defined($client_socket));
|
||||||
$client_socket = undef;
|
$client_socket = undef;
|
||||||
delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
|
delete($queues{threads->tid()}) if(exists($queues{threads->tid()}));
|
||||||
$run = 0;
|
$run = 0;
|
||||||
last THREADLOOP;
|
last THREADLOOP;
|
||||||
}
|
}
|
||||||
elsif($cmd =~ /^new\|/)
|
elsif($cmd =~ /^new\|/)
|
||||||
{
|
{
|
||||||
($cmd, $do_address) = split("\\|", $cmd);
|
($cmd, $do_address) = split("\\|", $cmd);
|
||||||
|
|
||||||
$log_queue->enqueue(threads->tid()."|sending new address $do_address to ".$values{address}.":".$values{port});
|
$log_queue->enqueue(threads->tid()."|sending new address $do_address to ".$values{address}.":".$values{port});
|
||||||
|
|
||||||
if($current_state eq "present")
|
if($current_state eq "present")
|
||||||
{
|
{
|
||||||
$client_socket->send($do_address."|".$values{presence_timeout}."\n") if(defined($client_socket));
|
$client_socket->send($do_address."|".$values{presence_timeout}."\n") if(defined($client_socket));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
$client_socket->send($do_address."|".$values{absence_timeout}."\n") if(defined($client_socket));
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n") if(defined($client_socket));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(not defined($client_socket))
|
if(not defined($client_socket))
|
||||||
{
|
{
|
||||||
# if it's the first occurance
|
# if it's the first occurance
|
||||||
if(!$reconnect_count)
|
if(!$reconnect_count)
|
||||||
{
|
{
|
||||||
# Tell this the client;
|
# Tell this the client;
|
||||||
$status_queue->enqueue("$do_uuid;$room;socket_closed");
|
$status_queue->enqueue("$do_uuid;$room;socket_closed");
|
||||||
|
|
||||||
# create a log message
|
# create a log message
|
||||||
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
|
$log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
|
||||||
}
|
}
|
||||||
|
|
||||||
# now try to re-establish the connection
|
# now try to re-establish the connection
|
||||||
$client_socket = new IO::Socket::INET (
|
$client_socket = new IO::Socket::INET (
|
||||||
PeerHost => $values{address},
|
PeerHost => $values{address},
|
||||||
PeerPort => $values{port},
|
PeerPort => $values{port},
|
||||||
Proto => 'tcp',
|
Proto => 'tcp',
|
||||||
Type => SOCK_STREAM,
|
Type => SOCK_STREAM,
|
||||||
KeepAlive => 1,
|
KeepAlive => 1,
|
||||||
Blocking => 1
|
Blocking => 1
|
||||||
) or ( $reconnect_count++ );
|
) or ( $reconnect_count++ );
|
||||||
|
|
||||||
if(defined($client_socket))
|
if(defined($client_socket))
|
||||||
{
|
{
|
||||||
# give a success message
|
# give a success message
|
||||||
$log_queue->enqueue(threads->tid()."|$room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
|
$log_queue->enqueue(threads->tid()."|$room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
|
||||||
$status_queue->enqueue("$do_uuid;$room;socket_reconnected");
|
$status_queue->enqueue("$do_uuid;$room;socket_reconnected");
|
||||||
|
|
||||||
# reset the reconnect counter
|
# reset the reconnect counter
|
||||||
$reconnect_count = 0;
|
$reconnect_count = 0;
|
||||||
|
|
||||||
# set the last contact date to now
|
# set the last contact date to now
|
||||||
$last_contact = gettimeofday();
|
$last_contact = gettimeofday();
|
||||||
|
|
||||||
# add the new established socket to the IO selector for incoming data monitoring.
|
# add the new established socket to the IO selector for incoming data monitoring.
|
||||||
$selector->add($client_socket);
|
$selector->add($client_socket);
|
||||||
# send the given address to the presence daemon
|
# send the given address to the presence daemon
|
||||||
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
$client_socket->send($do_address."|".$values{absence_timeout}."\n");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
sleep(9);
|
sleep(9);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# if the socket has a message available
|
# if the socket has a message available
|
||||||
if(@client_handle = $selector->can_read(1))
|
if(@client_handle = $selector->can_read(1))
|
||||||
{
|
{
|
||||||
|
|
||||||
# get all socket handles which has a message available
|
# get all socket handles which has a message available
|
||||||
foreach my $local_client (@client_handle)
|
foreach my $local_client (@client_handle)
|
||||||
{
|
{
|
||||||
# get the message from the socket handle
|
# get the message from the socket handle
|
||||||
$return = <$local_client>;
|
$return = <$local_client>;
|
||||||
|
|
||||||
# if the message is defined (not EOF) handle the message...
|
# if the message is defined (not EOF) handle the message...
|
||||||
if($return)
|
if($return)
|
||||||
{
|
{
|
||||||
|
|
||||||
# set the last contact date
|
# set the last contact date
|
||||||
$last_contact = gettimeofday();
|
$last_contact = gettimeofday();
|
||||||
|
|
||||||
# remove trailing whitespaces and newlines
|
# remove trailing whitespaces and newlines
|
||||||
chomp($return);
|
chomp($return);
|
||||||
|
|
||||||
# if the message is "command accepted"
|
# if the message is "command accepted"
|
||||||
if($return =~ /command accepted/)
|
if($return =~ /command accepted/)
|
||||||
{
|
{
|
||||||
# log this to the thread log queue
|
# log this to the thread log queue
|
||||||
$log_queue->enqueue(threads->tid()."|$room accepted command for $do_address");
|
$log_queue->enqueue(threads->tid()."|$room accepted command for $do_address");
|
||||||
}
|
}
|
||||||
elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
|
elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
|
||||||
{
|
{
|
||||||
$log_queue->enqueue(threads->tid()."|$room REJECTED command for $do_address");
|
$log_queue->enqueue(threads->tid()."|$room REJECTED command for $do_address");
|
||||||
}
|
}
|
||||||
else # else its a status message
|
else # else its a status message
|
||||||
{
|
{
|
||||||
# put the message to the status queue with uuid for identification and the room name
|
# put the message to the status queue with uuid for identification and the room name
|
||||||
$status_queue->enqueue("$do_uuid;$room;".$return);
|
$status_queue->enqueue("$do_uuid;$room;".$return);
|
||||||
|
|
||||||
# if the state changes from present to absence
|
# if the state changes from present to absence
|
||||||
if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
|
if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
|
||||||
{
|
{
|
||||||
# log the timout change to the log queue
|
# log the timout change to the log queue
|
||||||
$log_queue->enqueue(threads->tid()."|$room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
|
$log_queue->enqueue(threads->tid()."|$room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
|
||||||
|
|
||||||
$current_state = "absence";
|
$current_state = "absence";
|
||||||
|
|
||||||
# send the new command with the configured absence timeout
|
# send the new command with the configured absence timeout
|
||||||
$local_client->send($do_address."|".$values{absence_timeout}."\n");
|
$local_client->send($do_address."|".$values{absence_timeout}."\n");
|
||||||
}
|
}
|
||||||
elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
|
elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
|
||||||
{
|
{
|
||||||
$log_queue->enqueue(threads->tid()."|$room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
|
$log_queue->enqueue(threads->tid()."|$room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
|
||||||
|
|
||||||
$current_state = "present";
|
$current_state = "present";
|
||||||
|
|
||||||
# if the state changes from absence to present, set the presence timeout
|
# if the state changes from absence to present, set the presence timeout
|
||||||
$local_client->send($do_address."|".$values{presence_timeout}."\n");
|
$local_client->send($do_address."|".$values{presence_timeout}."\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
# set the previous state to the current state
|
# set the previous state to the current state
|
||||||
($previous_state, undef) = split(";", lc($return));
|
($previous_state, undef) = split(";", lc($return));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
else # the socket is EOF which means the connection was closed
|
||||||
|
{
|
||||||
}
|
|
||||||
else # the socket is EOF which means the connection was closed
|
|
||||||
{
|
|
||||||
|
|
||||||
$selector->remove($local_client);
|
|
||||||
|
|
||||||
shutdown($local_client, 2);
|
|
||||||
close($local_client);
|
|
||||||
$client_socket = undef;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
$selector->remove($local_client);
|
||||||
|
|
||||||
|
shutdown($local_client, 2);
|
||||||
|
close($local_client);
|
||||||
|
$client_socket = undef;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$log_queue->enqueue(threads->tid()."|exiting thread");
|
$log_queue->enqueue(threads->tid()."|exiting thread");
|
||||||
@ -768,14 +725,13 @@ sub doQuery($$$)
|
|||||||
sub readConfig
|
sub readConfig
|
||||||
{
|
{
|
||||||
|
|
||||||
my ($ini) = @_;
|
my ($ini) = @_;
|
||||||
|
|
||||||
my $section;
|
my $section;
|
||||||
my $keyword;
|
my $keyword;
|
||||||
my $value;
|
my $value;
|
||||||
|
|
||||||
my $errorcount = 0;
|
|
||||||
|
|
||||||
|
my $errorcount = 0;
|
||||||
|
|
||||||
Log 1, "reading configuration file";
|
Log 1, "reading configuration file";
|
||||||
|
|
||||||
@ -784,11 +740,11 @@ my $errorcount = 0;
|
|||||||
open (INI, "$ini") or (print STDERR timestamp()."Can't open $ini: $!\n" and exit(1));
|
open (INI, "$ini") or (print STDERR timestamp()."Can't open $ini: $!\n" and exit(1));
|
||||||
while (<INI>) {
|
while (<INI>) {
|
||||||
chomp;
|
chomp;
|
||||||
if (/^\s*?\[(\w+?)\]/) {
|
if (/^\s*?\[([^\]\n\r]+?)\]/) {
|
||||||
$section = $1;
|
$section = $1;
|
||||||
|
|
||||||
}
|
}
|
||||||
if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/) {
|
if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/ and defined($section)) {
|
||||||
$keyword = $1;
|
$keyword = $1;
|
||||||
$value = $2 ;
|
$value = $2 ;
|
||||||
# put them into hash
|
# put them into hash
|
||||||
@ -802,130 +758,124 @@ my $errorcount = 0;
|
|||||||
foreach my $room (keys %config)
|
foreach my $room (keys %config)
|
||||||
{
|
{
|
||||||
|
|
||||||
if(not exists($config{$room}{address}))
|
if(not exists($config{$room}{address}))
|
||||||
{
|
{
|
||||||
Log 0, "room $room has no value for address configured";
|
Log 0, "room $room has no value for address configured";
|
||||||
$errorcount++;
|
$errorcount++;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
|
if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
|
||||||
{
|
{
|
||||||
Log 0, "no valid address for room $room found: ".$config{$room}{address};
|
Log 0, "no valid address for room $room found: ".$config{$room}{address};
|
||||||
$errorcount++;
|
$errorcount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(not exists($config{$room}{port}))
|
||||||
|
{
|
||||||
|
Log 0, "room >>$room<< has no value for >>port<< configured";
|
||||||
|
$errorcount++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(not $config{$room}{port} =~ /^\d+$/)
|
||||||
|
{
|
||||||
|
Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
|
||||||
|
$errorcount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(not exists($config{$room}{absence_timeout}))
|
||||||
|
{
|
||||||
|
Log 0, "room >>$room<< has no value for >>absence_timeout<< configured";
|
||||||
|
$errorcount++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(not $config{$room}{absence_timeout} =~ /^\d+$/)
|
||||||
|
{
|
||||||
|
Log 0, "value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout};
|
||||||
|
$errorcount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if(not exists($config{$room}{port}))
|
if(not exists($config{$room}{presence_timeout}))
|
||||||
{
|
{
|
||||||
Log 0, "room >>$room<< has no value for >>port<< configured";
|
Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";
|
||||||
$errorcount++;
|
$errorcount++;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if(not $config{$room}{port} =~ /^\d+$/)
|
if(not $config{$room}{presence_timeout} =~ /^\d+$/)
|
||||||
{
|
{
|
||||||
Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
|
Log 0, "value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout};
|
||||||
$errorcount++;
|
$errorcount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(not exists($config{$room}{absence_timeout}))
|
foreach my $param (keys %{$config{$room}})
|
||||||
{
|
{
|
||||||
Log 0, "room >>$room<< has no value for >>absence_timeout<< configured";
|
if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
|
||||||
$errorcount++;
|
{
|
||||||
}
|
Log 0, "invalid parameter $param in room $room";
|
||||||
else
|
$errorcount++;
|
||||||
{
|
}
|
||||||
if(not $config{$room}{absence_timeout} =~ /^\d+$/)
|
|
||||||
{
|
|
||||||
Log 0, "value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout};
|
|
||||||
$errorcount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(not exists($config{$room}{presence_timeout}))
|
}
|
||||||
{
|
|
||||||
Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";
|
|
||||||
$errorcount++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if(not $config{$room}{presence_timeout} =~ /^\d+$/)
|
|
||||||
{
|
|
||||||
Log 0, "value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout};
|
|
||||||
$errorcount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach my $param (keys %{$config{$room}})
|
|
||||||
{
|
|
||||||
if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
|
|
||||||
{
|
|
||||||
Log 0, "invalid parameter $param in room $room";
|
|
||||||
$errorcount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if($errorcount)
|
if($errorcount)
|
||||||
{
|
{
|
||||||
print STDERR timestamp()." found $errorcount config errors. exiting....\n";
|
print STDERR timestamp()." found $errorcount config errors. exiting....\n";
|
||||||
exit 2;
|
exit 2;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Log 0, "no config errors found";
|
Log 0, "no config errors found";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sub aggregateRooms
|
sub aggregateRooms
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
||||||
my ($hash) = @_;
|
my ($hash) = @_;
|
||||||
|
|
||||||
my $previous = "absence";
|
my $previous = "absence";
|
||||||
|
|
||||||
my @rooms;
|
my @rooms;
|
||||||
my $key;
|
my $key;
|
||||||
my $value;
|
my $value;
|
||||||
my $temp_name;
|
my $temp_name;
|
||||||
foreach $key (keys %$hash)
|
|
||||||
{
|
|
||||||
if($hash->{$key} ne "")
|
|
||||||
{
|
|
||||||
|
|
||||||
($value, $name) = split(";", $hash->{$key});
|
foreach $key (keys %$hash)
|
||||||
if($value eq "present")
|
{
|
||||||
{
|
if($hash->{$key} ne "")
|
||||||
push @rooms, $key;
|
{
|
||||||
$temp_name = $name;
|
($value, $name) = split(";", $hash->{$key});
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return undef;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if(@rooms > 0)
|
|
||||||
{
|
|
||||||
return "present;$temp_name;".join(",",sort @rooms);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
|
|
||||||
return "absence";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if($value eq "present")
|
||||||
|
{
|
||||||
|
push @rooms, $key;
|
||||||
|
$temp_name = $name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(@rooms > 0)
|
||||||
|
{
|
||||||
|
return "present;$temp_name;".join(",",sort @rooms);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return "absence";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -935,17 +885,15 @@ sub generateUUID
|
|||||||
|
|
||||||
while(defined($handle{$uuid}))
|
while(defined($handle{$uuid}))
|
||||||
{
|
{
|
||||||
$uuid = Digest::MD5::md5_hex(rand);
|
$uuid = Digest::MD5::md5_hex(rand);
|
||||||
}
|
}
|
||||||
|
|
||||||
return $uuid;
|
return $uuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sub timestamp
|
sub timestamp
|
||||||
{
|
{
|
||||||
|
return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime);
|
||||||
return POSIX::strftime("%Y-%m-%d %H:%M:%S",localtime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -956,8 +904,9 @@ sub Log($$)
|
|||||||
|
|
||||||
if($message =~ /^\d+\|/)
|
if($message =~ /^\d+\|/)
|
||||||
{
|
{
|
||||||
($thread, $message) = split("\\|", $message);
|
($thread, $message) = split("\\|", $message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if($loglevel <= $opt_v)
|
if($loglevel <= $opt_v)
|
||||||
{
|
{
|
||||||
if($opt_l)
|
if($opt_l)
|
||||||
@ -973,6 +922,5 @@ sub Log($$)
|
|||||||
|
|
||||||
close(LOGFILE);
|
close(LOGFILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user