# $Id$
# collectord
# Connects to several presenced instances to check for multiple bluetooth devices
# for their presence state and report a summary state to the 73_PRESENCE.pm module
# Copyright by Markus Bloch
# e-mail: Notausstieg0309@googlemail.com
# This file is part of fhem.
# Fhem is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# Fhem is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with fhem. If not, see <http://www.gnu.org/licenses/>.
use IO::Socket;
use IO::Select;
use POSIX;
#use Data::Dumper;
use File::Basename;
use Getopt::Long;
use threads;
use Thread::Queue;
use Time::HiRes;
use Time::HiRes qw(gettimeofday);
use warnings;
use strict;
use Digest::MD5;
my $new_client;
my $server;
my $client;
my $buf;
my $opt_d;
my $opt_h;
my $opt_v;
my $opt_p = 5222;
my $opt_P = "/var/run/".basename($0).".pid";
my $opt_l;
my $opt_c;
my %config;
my %state;
my %handle;
my %socket_to_handle;
my $uuid;
"d" => \$opt_d, "daemon" => \$opt_d,
"v" => \$opt_v, "verbose" => \$opt_v,
"l=s" => \$opt_l, "logfile=s" => \$opt_l,
"c=s" => \$opt_c, "configfile=s" => \$opt_c,
"p=i" => \$opt_p, "port=i" => \$opt_p,
"P=s" => \$opt_P, "pid-file=s" => \$opt_P,
"h" => \$opt_h, "help" => \$opt_h);
open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l");
print timestamp()."=================================================\n" if($opt_v);
sub print_usage () {
print "Usage:\n";
print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
print " collectord [-h | --help]\n";
print "\n\nOptions:\n";
print " -c, --configfile <configfile>\n";
print " The config file which contains the room and timeout definitions\n";
print " -p, --port\n";
print " TCP Port which should be used (Default: 5222)\n";
print " -P, --pid-file\n";
print " PID file for storing the local process id (Default: /var/run/".basename($0).".pid)\n";
print " -d, --daemon\n";
print " detach from terminal and run as background daemon\n";
print " -v, --verbose\n";
print " Print detailed log output\n";
print " -l, --logfile <logfile>\n";
print " log to the given logfile\n";
print " -h, --help\n";
print " Print detailed help screen\n";
if(not $opt_c)
print STDERR "no config file provided\n\n";
exit 1;
if(not -e "$opt_c" or not -r "$opt_c")
print STDERR "config-file $opt_c could not be loaded\n";
exit 1;
print timestamp()."started with PID $$\n" if($opt_v);
if(-e "$opt_P")
print STDERR timestamp()."another process already running (PID file found at $opt_P)\n";
print STDERR timestamp()."aborted...\n";
exit 1;
open(PIDFILE, ">$opt_P");
print PIDFILE $$."\n";
close PIDFILE;
$server = new IO::Socket::INET (
LocalPort => $opt_p,
Proto => 'tcp',
Listen => 5,
Reuse => 1,
KeepAlive => 1,
Blocking => 0
) or die "error while creating socket: $!\n";
print timestamp()."created socket on ".$server->sockhost()." with port ".$server->sockport()."\n" if($opt_v);
my $listener = IO::Select->new();
my @new_handles;
my %child_handles;
my %child_config;
my $address;
my $name;
my $timeout;
my $write_handle;
my $server_pid;
my @threads;
my $sig_received = undef;
$SIG{HUP} = sub { $sig_received = "SIGHUP"; };
$SIG{INT} = sub { $sig_received = "SIGINT"; };
$SIG{TERM} = sub { $sig_received = "SIGTERM"; };
$SIG{KILL} = sub { $sig_received = "SIGKILL"; };
$SIG{QUIT} = sub { $sig_received = "SIGQUIT"; };
$SIG{ABRT} = sub { $sig_received = "SIGABRT"; };
$server_pid = $$ unless(defined($server_pid));
my $value;
my $room;
my $rooms_ref;
my $result;
my $status_queue = Thread::Queue->new();
my $log_queue = Thread::Queue->new();
my $logline;
# Cleaning up the status hash for obsolete devices
foreach $uuid (keys %state)
my %handle_to_socket = reverse %socket_to_handle;
print timestamp()."cleaning up status values (UUID: $uuid)\n" if($opt_v);
delete $state{$uuid};
delete $socket_to_handle{$handle_to_socket{$uuid}};
# process all status messages from all threads via status queue
($uuid,$room,$value,$name) = split(";", $status_queue->dequeue);
if(not $value =~ /^(absence|present)$/)
$handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client}));
if($value eq "socket_closed")
$state{$uuid}{rooms}{$room} = $value.(defined($name)?";".$name:"");
$result = aggregateRooms($state{$uuid}{rooms});
if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "present;$result" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "present;$result"))
$state{$uuid}{lastresult}{value} = "present;$result";
$state{$uuid}{lastresult}{timestamp} = time();
if(not defined($state{$uuid}{lastresult}{value}) or (($state{$uuid}{lastresult}{value} eq "absence" and ($state{$uuid}{lastresult}{timestamp} + $handle{$uuid}{timeout}) < time()) or $state{$uuid}{lastresult}{value} ne "absence"))
$state{$uuid}{lastresult}{value} = "absence";
$state{$uuid}{lastresult}{timestamp} = time();
#print Dumper(%state);
# If a thread has something reported via Log Queue, print it out if verbose is activated
$logline = $log_queue->dequeue;
print timestamp().$logline if($opt_v);
$logline = undef;
# If a INET socket has anything to report
if(@new_handles = $listener->can_read(1))
foreach my $client (@new_handles)
# if the socket is the server socket, accept new client and add it to the socket selector
if($client == $server)
$new_client = $server->accept();
print timestamp()."new connection from ".$new_client->peerhost().":".$new_client->peerport()."\n" if($opt_v);
else # else is must be a client, so read the message and process it
$buf = '';
$buf = <$client>;
# if the message is defined, it is a real message, else the connection is closed (EOF)
# replace leading and trailing white spaces
$buf =~ s/(^\s*|\s*$)//g;
# if the message is a new command, accept the command and create threads for all rooms to process the command
if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/)
# send the acknowledgment back to the sender
$client->send("command accepted\n");
print timestamp()."received new command from ".$client->peerhost().":".$client->peerport()." - $buf\n" if($opt_v);
# Split the message into bluetooth address and the timeout value
# (timeout is ignored within the collectord, as it is given by configuration)
($address, $timeout) = split("\\|", $buf);
# remove any containing white spaces
$address =~ s/\s*//g;
$timeout =~ s/\s*//g;
# if the client has already a request running, stop at first the old request
$uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v);
# when all threads are signaled, delete all relationship entry for this client
# create a new uuid if not exist for socket
if(not defined($socket_to_handle{$client}))
$socket_to_handle{$client} = generateUUID();
print timestamp()."generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client}."\n" if($opt_v);
$uuid = $socket_to_handle{$client};
$handle{$uuid}{address} = $address;
$handle{$uuid}{client} = $client;
$handle{$uuid}{timeout} = $timeout;
$state{$uuid}{lastresult}{value} = "absence";
$state{$uuid}{lastresult}{timestamp} = 0;
# create a new reqester thread for each configured room to perform the query
while (($room, $value) = each %config)
my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid));
print timestamp()."created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)\n" if($opt_v);
# detach from the thread, so the thread starts processing independantly
# save the socket/room relationship to know which thread belongs to which client request (for stop command)
$handle{$uuid}{threads}{$room} = $new_thread;
elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped
print timestamp()."received stop command from client ".$client->peerhost()."\n" if($opt_v);
# just to be sure if the client has really a running request
$uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v);
# when all threads are signaled, delete all relationship entry for this client
# if there is no command running, just tell the client he's wrong
$client->send("no command running\n");
{ # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging.
$client->send("command rejected\n");
print timestamp()."received invalid command >>$buf<< from client ".$client->peerhost()."\n" if($opt_v);
else # if the message is not defined (EOF) the connection was closed. Now let's clean up
# make a log entry and remove the socket from the socket selector
print timestamp()."closed connection from ".$client->peerhost()."\n" if($opt_v);
# if there is a running command, stop it first and clean up (same as stop command, see above)
$uuid = $socket_to_handle{$client};
# get all threads for this socket and send them a termination signal
my $temp = $handle{$uuid}{threads};
foreach $room (keys %$temp)
print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v);
# when all threads are signaled, delete all relationship entry for this client
# now close the socket, that's it
close $client;
# in case we have received a process signal, remove the pid file and shutdown
print timestamp()."Caught $sig_received exiting\n" if($opt_v);
print timestamp()."removed PID-File $opt_P\n" if($opt_v);
print timestamp()."server shutdown\n" if($opt_v);
# Subroutine definitions
# to fork the process from the terminal
sub daemonize
POSIX::setsid or die "setsid $!";
my $pid = fork();
if($pid < 0)
print "fork: $!\n";
exit 1;
print timestamp()."forked with PID $pid\n";
exit 0;
chdir "/";
umask 0;
foreach (0 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) { POSIX::close $_ }
# cut off any input and output
open (STDIN, "</dev/null");
open (STDOUT, ">/dev/null");
open (STDERR, ">&STDOUT");
# in case the logging parameter is given, open the logfile
open(STDOUT, ">>$opt_l") or die ("could not open logfile: $opt_l");
# the thread subroutine which performs a request for a specific room
sub doQuery($$$)
my ($do_config, $do_room, $do_address, $do_uuid) = @_;
my $return;
my $socket;
my %values = %$do_config;
my $selector;
my @client_handle;
my $reconnect_count = 0;
my $last_contact = gettimeofday();
my $previous_state = "absence";
my $current_state = "absence";
my $client_socket = new IO::Socket::INET (
PeerHost => $values{address},
PeerPort => $values{port},
Proto => 'tcp',
KeepAlive => 1,
Blocking => 1
) or ( $log_queue->enqueue("$room: could not create socket to ".$values{address}." - $! - \n"));
# if the thread gets a termination signal, the thread must be shutdown by itself
local $SIG{TERM} = sub {
$log_queue->enqueue("$room: terminating thread ".threads->tid()." for $address\n");
$client_socket->send("stop\n") if($client_socket);
close($client_socket) if($client_socket);
$selector = IO::Select->new($client_socket);
# send the given address to the presence daemon
$client_socket = undef;
# thread main loop
if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60))
$log_queue->enqueue("$room: socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket\n");
$client_socket = undef;
if(not defined($client_socket))
# if it's the first occurance
# Tell this the client;
# create a log message
$log_queue->enqueue("$room: socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...\n");
# now try to re-establish the connection
$client_socket = new IO::Socket::INET (
PeerHost => $values{address},
PeerPort => $values{port},
Proto => 'tcp',
KeepAlive => 1,
Blocking => 1
) or ( $reconnect_count++ );
# give a success message
$log_queue->enqueue("$room: reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)\n");
# reset the reconnect counter
$reconnect_count = 0;
# set the last contact date to now
$last_contact = gettimeofday();
# add the new established socket to the IO selector for incoming data monitoring.
# send the given address to the presence daemon
# if the socket has a message available
if(@client_handle = $selector->can_read(1))
# get all socket handles which has a message available
foreach my $local_client (@client_handle)
# get the message from the socket handle
$return = <$local_client>;
# if the message is defined (not EOF) handle the message...
# set the last contact date
$last_contact = gettimeofday();
# remove trailing whitespaces and newlines
# if the message is "command accepted"
if($return =~ /command accepted/)
# log this to the thread log queue
$log_queue->enqueue("$room: accepted command for $do_address\n");
elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue
$log_queue->enqueue("$room: REJECTED command for $do_address\n");
else # else its a status message
# put the message to the status queue with uuid for identification and the room name
# if the state changes from present to absence
if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/)
# log the timout change to the log queue
$log_queue->enqueue("$room: changing to absence timeout (".$values{absence_timeout}.") for device $do_address\n");
$current_state = "absence";
# send the new command with the configured absence timeout
elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/)
$log_queue->enqueue("$room: changing to presence timeout (".$values{presence_timeout}.") for device $do_address\n");
$current_state = "present";
# if the state changes from absence to present, set the presence timeout
# set the previous state to the current state
($previous_state, undef) = split(";", lc($return));
else # the socket is EOF which means the connection was closed
# TODO: reconnect mechanism
$client_socket = undef;
# Sleep for one second to avoid 100% cpu usage
sub timestamp
my ($sec, $min, $hour, $day, $mon, $year, undef, undef, undef) = localtime(time);
$year += 1900;
$sec = ($sec < 10 ? "0".$sec : $sec);
$min = ($min < 10 ? "0".$min : $min);
$hour = ($hour < 10 ? "0".$hour : $hour);
$day = ($day < 10 ? "0".$day : $day);
$mon = ($mon < 10 ? "0".$mon : $mon);
return "$year-$mon-$day $hour:$min:$sec - ";
sub readConfig
my ($ini) = @_;
my $section;
my $keyword;
my $value;
my $errorcount = 0;
print timestamp()."reading configuration file\n" if($opt_v);
%config = ();
open (INI, "$ini") or (print timestamp()."Can't open $ini: $!\n" and exit(1));
while (<INI>) {
if (/^\s*?\[(\w+?)\]/) {
$section = $1;
if (/^\s*(\w+?)=(.+?)\s*(#.*)?$/) {
$keyword = $1;
$value = $2 ;
# put them into hash
$config{$section}{$keyword} = $value;
close (INI);
# validating config
foreach my $room (keys %config)
if(not exists($config{$room}{address}))
print timestamp()."room $room has no value for address configured\n";
if(not $config{$room}{address} =~ /^[a-zA-Z0-9.-]+$/)
print timestamp()."no valid address for room $room found: ".$config{$room}{address}."\n";
if(not exists($config{$room}{port}))
print timestamp()."room >>$room<< has no value for >>port<< configured\n";
if(not $config{$room}{port} =~ /^\d+$/)
print timestamp()."value >>port<< for room >>$room<< is not a number: ".$config{$room}{port}."\n";
if(not exists($config{$room}{absence_timeout}))
print timestamp()."room >>$room<< has no value for >>absence_timeout<< configured\n";
if(not $config{$room}{absence_timeout} =~ /^\d+$/)
print timestamp()."value >>absence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{absence_timeout}."\n";
if(not exists($config{$room}{presence_timeout}))
print timestamp()."room >>$room<< has no value for >>presence_timeout<< configured\n";
if(not $config{$room}{presence_timeout} =~ /^\d+$/)
print timestamp()."value >>presence_timeout<< value for room >>$room<< is not a number: ".$config{$room}{presence_timeout}."\n";
foreach my $param (keys %{$config{$room}})
if(not $param =~ /(address|port|absence_timeout|presence_timeout)/)
print timestamp()."invalid parameter $param in room $room\n";
print timestamp()."found $errorcount config errors. exiting....\n";
exit 2;
print timestamp()."no config errors found\n";
sub aggregateRooms
my ($hash) = @_;
my $previous = "absence";
my @rooms;
my $key;
my $value;
my $temp_name;
foreach $key (keys %$hash)
($value, $name) = split(";", $hash->{$key});
if($value eq "present")
push @rooms, $key;
$temp_name = $name;
if(@rooms > 0)
return "$temp_name;".join(",",sort @rooms);
return undef;
sub generateUUID
my $uuid = Digest::MD5::md5_hex(rand);
$uuid = Digest::MD5::md5_hex(rand);
return $uuid;