#!/usr/bin/perl
##############################################################################
# $Id$
##############################################################################
#
#     collectord
#     Connects to several presenced instances to check for multiple bluetooth
#     devices for their presence state and report a summary state to the
#     73_PRESENCE.pm module
#
#     Copyright by Markus Bloch
#     e-mail: Notausstieg0309@googlemail.com
#
#     This file is part of fhem.
#
#     Fhem is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 2 of the License, or
#     (at your option) any later version.
#
#     Fhem is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with fhem.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

use strict;
use warnings;

use IO::Socket;
use IO::Select;
use POSIX;
#use Data::Dumper;
use File::Basename;
use Getopt::Long;
use threads;
use Thread::Queue;
use Time::HiRes;
use Time::HiRes qw(gettimeofday);
use Digest::MD5;

# Forward declarations: these subs are called before their definition and
# (in the case of Log) without parentheses.
sub Log($$);
sub parseParams($;$);
sub closeClientConnection($);
sub stopClientThreads($);

# command line options (with their defaults)
my $opt_d;
my $opt_h;
my $opt_v = 0;
my $opt_p = 5222;
my $opt_P = "/var/run/".basename($0).".pid";
my $opt_l;
my $opt_c;
my $opt_n;

my %config;              # room name => { address, port, absence_timeout, presence_timeout }
my %queues;              # thread id => command queue of that worker thread
my %state;               # uuid => { rooms => { room => { state, data } }, lastresult => { value, timestamp } }
my %handle;              # uuid => { address, client, timeout, threads => { room => thread } }
my %socket_to_handle;    # stringified client socket => uuid

$SIG{__DIE__} = sub {
    my ($msg) = @_;
    Log 1, "PERL ERROR: $msg";
};

$SIG{__WARN__} = sub {
    my ($msg) = @_;
    Log 1, "PERL WARN: $msg";
};

# A peer which vanished must not kill the whole daemon when we send() to it;
# with SIGPIPE ignored send() simply fails and the EOF handling cleans up.
$SIG{PIPE} = 'IGNORE';

Getopt::Long::Configure('bundling');
GetOptions(
    "d"             => \$opt_d,
    "daemon"        => \$opt_d,
    "n"             => \$opt_n,
    "no-timestamps" => \$opt_n,
    "v+"            => \$opt_v,
    "verbose+"      => \$opt_v,
    "l=s"           => \$opt_l,
    "logfile=s"     => \$opt_l,
    "c=s"           => \$opt_c,
    "configfile=s"  => \$opt_c,
    "p=i"           => \$opt_p,
    "port=i"        => \$opt_p,
    "P=s"           => \$opt_P,
    "pid-file=s"    => \$opt_P,
    "h"             => \$opt_h,
    "help"          => \$opt_h
);

Log 0, "=================================================" if($opt_l);

# Prints the command line help to STDOUT.
sub print_usage () {
    print "Usage:\n";
    print " collectord -c <configfile> [-d] [-p <port>] [-P <pidfile>] \n";
    print " collectord [-h | --help]\n";
    print "\n\nOptions:\n";
    print " -c, --configfile <configfile>\n";
    print " The config file which contains the room and timeout definitions\n";
    print " -p, --port\n";
    print " TCP Port which should be used (Default: 5222)\n";
    print " -P, --pid-file\n";
    print " PID file for storing the local process id (Default: /var/run/".basename($0).".pid)\n";
    print " -d, --daemon\n";
    print " detach from terminal and run as background daemon\n";
    print " -n, --no-timestamps\n";
    print " do not output timestamps in log messages\n";
    print " -v, --verbose\n";
    print " Print detailed log output (can be used multiple times to increase the loglevel, max. 2 times)\n";
    print " -l, --logfile <logfile>\n";
    print " log to the given logfile\n";
    print " -h, --help\n";
    print " Print detailed help screen\n";
}

if($opt_h) {
    print_usage();
    exit;
}

if(-e "$opt_P") {
    print STDERR timestamp()."another process already running (PID file found at $opt_P)\n";
    print STDERR timestamp()."aborted...\n";
    exit 1;
}

if(not $opt_c) {
    print STDERR "no config file provided\n\n";
    print_usage();
    exit 1;
}

if(not -e "$opt_c" or not -r "$opt_c") {
    print STDERR "config-file $opt_c could not be loaded\n";
    exit 1;
}

Log 0, "started with PID $$";

readConfig($opt_c);

if($opt_d) {
    daemonize();
}

# Write PID file. A failure is reported but does not stop the daemon.
if(open(my $pid_fh, '>', $opt_P)) {
    print {$pid_fh} $$."\n";
    close($pid_fh);
}
else {
    Log 0, "could not write PID file $opt_P: $!";
}

my $server = IO::Socket::INET->new(
    LocalPort => $opt_p,
    Proto     => 'tcp',
    Listen    => 5,
    Reuse     => 1,
    Type      => SOCK_STREAM,
    KeepAlive => 1,
    Blocking  => 0
) or die "error while creating socket: $!\n";

Log 1, "created socket on ".$server->sockhost()." with port ".$server->sockport();

my $listener = IO::Select->new();
$listener->add($server);

# The signal handlers only remember the signal name; the main loop performs
# the actual shutdown. (SIGKILL is not listed because it cannot be caught.)
my $sig_received = undef;
foreach my $signame (qw(HUP INT TERM QUIT ABRT)) {
    $SIG{$signame} = sub { $sig_received = "SIG".$signame; };
}

my $status_queue = Thread::Queue->new();    # worker threads -> main: "uuid;room;value[;data]"
my $log_queue    = Thread::Queue->new();    # worker threads -> main: "tid|message"

Log 2, "finished initialization. entering main loop";

while(1) {

    # Cleaning up the status hash for obsolete devices
    cleanupObsoleteStates();

    # process all status messages from all threads via status queue
    processStatusQueue();

    # If a thread has something reported via Log Queue, print it out if verbose is activated
    while($log_queue->pending) {
        Log 2, $log_queue->dequeue;
    }

    # If a INET socket has anything to report (waits at most one second)
    foreach my $ready ($listener->can_read(1)) {
        if($ready == $server) {
            # the server socket is readable: a new client wants to connect
            acceptClient();
        }
        else {
            # otherwise it must be a client, so read the message and process it
            handleClientInput($ready);
        }
    }

    # in case we have received a process signal, remove the pid file and shutdown
    if(defined($sig_received)) {
        Log 1, "Caught $sig_received exiting";
        unlink($opt_P);
        Log 1, "removed PID-File $opt_P";
        Log 1, "server shutdown";
        exit;
    }
}

Log 2, "leaving main loop";

########################################################################################################################
#
#   Subroutine definitions
#
########################################################################################################################

# Removes all entries of %state whose UUID does not belong to a connected
# client with a running request anymore.
sub cleanupObsoleteStates {
    my %socket_for_uuid = reverse %socket_to_handle;

    foreach my $uuid (keys %state) {
        unless(exists($socket_for_uuid{$uuid})) {
            Log 2, "cleaning up status values (UUID: $uuid)";
            delete $state{$uuid};
        }
    }
}

# Drains the status queue filled by the worker threads.
#
# Messages have the form "uuid;room;value[;data]". "absence"/"present" values
# update the room state and may trigger a new summary for the client; every
# other value (e.g. "socket_closed") is forwarded to the client as it is.
sub processStatusQueue {
    while($status_queue->pending) {
        my ($uuid, $room, $value, $data) = split(";", $status_queue->dequeue, 4);

        Log 2, "processing state message for device in room $room (UUID: $uuid) - value: $value".(defined($data) ? " - data: $data" : "");

        # Threads of an already stopped or closed request may still report
        # something. Drop such messages instead of autovivifying entries in
        # %handle and %state for a request which does not exist anymore.
        unless(exists($handle{$uuid})) {
            Log 2, "ignoring state message for obsolete request (UUID: $uuid)";
            next;
        }

        if($value !~ /^(absence|present)$/) {
            $handle{$uuid}{client}->send("$value;room=$room\n") if(defined($handle{$uuid}{client}));

            # a room without connection must not block the aggregation
            if($value eq "socket_closed") {
                delete($state{$uuid}{rooms}{$room});
            }
            next;
        }

        $state{$uuid}{rooms}{$room}{state} = $value;

        if(defined($data)) {
            $state{$uuid}{rooms}{$room}{data} = $data;
        }
        else {
            delete $state{$uuid}{rooms}{$room}{data};
        }

        my $result = aggregateRooms($state{$uuid}{rooms});
        next unless(defined($result));

        # report the summary if it is the first one, if it changed or if the
        # last report is older than the timeout requested by the client
        my $last_value = $state{$uuid}{lastresult}{value};
        my $last_time  = $state{$uuid}{lastresult}{timestamp};

        my $report_needed = (not defined($last_value))
                         || $last_value ne "$result"
                         || (($last_time // 0) + $handle{$uuid}{timeout}) < time();

        if($report_needed and defined($handle{$uuid}{client})) {
            $handle{$uuid}{client}->send("$result\n");
            $state{$uuid}{lastresult}{value}     = "$result";
            $state{$uuid}{lastresult}{timestamp} = time();
        }

        #print Dumper(\%state);
    }
}

# Accepts a pending connection on the server socket and adds it to the
# socket selector of the main loop.
sub acceptClient {
    my $new_client = $server->accept();

    # accept() on the non-blocking server socket may fail (e.g. the peer
    # already went away); nothing to register in that case
    unless(defined($new_client)) {
        Log 1, "could not accept new connection: $!";
        return;
    }

    setsockopt($new_client, SOL_SOCKET, SO_KEEPALIVE, 1);    # activate keep-alive
    $listener->add($new_client);
    Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport();
}

# Reads one line from a readable client socket and dispatches the command
# ("<address>|<timeout>", "now", "stop" or "ping"). EOF closes the connection.
sub handleClientInput {
    my ($client) = @_;

    my $line = <$client>;

    # if the message is not defined (EOF) the connection was closed. Now let's clean up
    unless(defined($line)) {
        closeClientConnection($client);
        return;
    }

    # remove leading and trailing white spaces
    $line =~ s/(^\s*|\s*$)//g;

    if($line =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/) {
        # a new command: (re)start the query for the given bluetooth address
        handleAddressCommand($client, $line);
    }
    elsif(lc($line) =~ /^\s*now\s*$/) {
        # all threads need to be signaled to send a now command to the presenced server
        handleNowCommand($client);
    }
    elsif(lc($line) =~ /^\s*stop\s*$/) {
        # the running request threads must be stopped
        Log 1, "received stop command from client ".$client->peerhost();

        if(stopClientThreads($client)) {
            $client->send("command accepted\n");
        }
        else {
            # if there is no command running, just tell the client he's wrong
            $client->send("no command running\n");
        }
    }
    elsif(lc($line) =~ /^\s*ping\s*$/) {
        Log 1, "received ping command from client ".$client->peerhost();
        $client->send("pong\n");
        closeClientConnection($client);
    }
    else {
        # the message does not match any known command: tell the client and log it
        $client->send("command rejected\n");
        Log 1, "received invalid command >>$line<< from client ".$client->peerhost();
    }
}

# Handles a "<bluetooth address>|<timeout>" command of a client.
#
# If the client already has a running request, its threads just get the new
# address. Otherwise a new UUID is generated and one worker thread per
# configured room is started.
sub handleAddressCommand {
    my ($client, $line) = @_;

    # send the acknowledgment back to the sender
    $client->send("command accepted\n");
    Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $line";

    # Split the message into bluetooth address and the timeout value
    # (the timeout is only used as re-send interval for unchanged results,
    #  the query timeouts of the rooms are given by configuration)
    my ($address, $timeout) = split("\\|", $line);

    # remove any containing white spaces
    $address =~ s/\s*//g;
    $timeout =~ s/\s*//g;

    # if the client has already a request running, reuse its threads
    if(defined($socket_to_handle{$client})) {
        my $uuid = $socket_to_handle{$client};

        foreach my $room (keys %{$handle{$uuid}{threads}}) {
            my $tid = $handle{$uuid}{threads}{$room}->tid();
            Log 2, "sending thread ".$tid." new address $address for room $room";
            $queues{$tid}->enqueue("new|$address") if(exists($queues{$tid}));
        }

        resetRoomStates($uuid);

        $handle{$uuid}{address} = $address;
        $handle{$uuid}{timeout} = $timeout;
        $state{$uuid}{lastresult}{timestamp} = 0;    # forces a report with the next result
        return;
    }

    # no request running yet, so create a new uuid for this socket
    my $uuid = generateUUID();
    $socket_to_handle{$client} = $uuid;
    Log 2, "generating new UUID for client ".$client->peerhost()." - ".$uuid;

    $handle{$uuid}{address} = $address;
    $handle{$uuid}{client}  = $client;
    $handle{$uuid}{timeout} = $timeout;

    $state{$uuid}{lastresult}{value}     = "absence";
    $state{$uuid}{lastresult}{timestamp} = 0;

    # create a new requester thread for each configured room to perform the query
    foreach my $room (keys %config) {

        # The command queue is handed over to the thread as argument and
        # registered under the real thread id afterwards.
        my $queue      = Thread::Queue->new();
        my $new_thread = threads->new(\&doQuery, $config{$room}, $room, $address, $uuid, $queue);

        unless(defined($new_thread)) {
            Log 0, "could not create thread for processing device $address in room $room";
            next;
        }

        $queues{$new_thread->tid()} = $queue;

        Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)";

        # detach from the thread, so the thread starts processing independently
        $new_thread->detach();

        # save the socket/room relationship to know which thread belongs to which client request (for stop command)
        $handle{$uuid}{threads}{$room} = $new_thread;

        $state{$uuid}{rooms}{$room}{state} = "";
        delete($state{$uuid}{rooms}{$room}{data});
    }
}

# Handles the "now" command of a client: all threads of its running request
# are told to send a "now" request to their presenced instance.
sub handleNowCommand {
    my ($client) = @_;

    Log 2, "received now command from client ".$client->peerhost();

    # just to be sure if the client has really a running request
    unless(defined($socket_to_handle{$client})) {
        # if there is no command running, just tell the client he's wrong
        $client->send("no command running\n");
        return;
    }

    my $uuid = $socket_to_handle{$client};

    foreach my $room (keys %{$handle{$uuid}{threads}}) {
        my $tid = $handle{$uuid}{threads}{$room}->tid();
        Log 2, "signalling thread ".$tid." to send \"now\"-request for room $room for client ".$client->peerhost();
        $queues{$tid}->enqueue("now") if(exists($queues{$tid}));
    }

    # delete state and room data to get a fresh state
    resetRoomStates($uuid);
    delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult}));

    $client->send("command accepted\n");
}

# Clears state and addon data of all rooms which are currently known for the
# given UUID. Rooms which were removed (after "socket_closed") are not
# re-created, as an empty room entry would block the aggregation.
sub resetRoomStates {
    my ($uuid) = @_;

    foreach my $room (keys %{$handle{$uuid}{threads}}) {
        next unless(exists($state{$uuid}{rooms}{$room}));

        $state{$uuid}{rooms}{$room}{state} = "";
        delete($state{$uuid}{rooms}{$room}{data});
    }
}

# Detaches the process from the terminal and continues as background daemon.
# The parent process exits, only the forked child returns from this sub.
sub daemonize {
    # fork first: setsid() fails for a process group leader, which the
    # original process is when it gets started from an interactive shell
    my $pid = fork();

    if(not defined($pid)) {
        print STDERR "cannot fork: $!\n";
        exit 1;
    }
    elsif($pid) {
        Log 0, "forked with PID $pid";
        exit 0;
    }

    POSIX::setsid() or die "setsid $!";

    chdir "/";
    umask 0;

    foreach my $fd (0 .. (POSIX::sysconf(POSIX::_SC_OPEN_MAX()) || 1024)) {
        POSIX::close($fd);
    }

    # cut off any input and output
    open(STDIN,  '<',  '/dev/null');
    open(STDOUT, '>',  '/dev/null');
    open(STDERR, '>&', \*STDOUT);
}

# The thread subroutine which performs the request for one specific room.
#
# Parameters:
#   $do_config  - hash reference with address, port, absence_timeout and
#                 presence_timeout of the presenced instance of the room
#   $do_room    - name of the room
#   $do_address - bluetooth address of the device to check
#   $do_uuid    - UUID of the client request this thread works for
#   $cmd_queue  - (optional) Thread::Queue delivering the commands "now",
#                 "stop" and "new|<address>"; defaults to the entry of
#                 %queues for the own thread id
#
# Results are reported via $status_queue ("uuid;room;value[;data]"), log
# messages via $log_queue ("tid|message"). The sub returns after "stop".
sub doQuery {
    my ($do_config, $do_room, $do_address, $do_uuid, $cmd_queue) = @_;

    my $tid = threads->tid();
    $cmd_queue //= $queues{$tid};

    my %values          = %{$do_config};
    my $selector        = IO::Select->new();
    my $reconnect_count = 0;
    my $last_contact    = gettimeofday();
    my $previous_state  = "absence";
    my $current_state   = "absence";

    # opens a new connection to the presenced instance (undef on failure)
    my $open_socket = sub {
        return IO::Socket::INET->new(
            PeerHost  => $values{address},
            PeerPort  => $values{port},
            Proto     => 'tcp',
            Type      => SOCK_STREAM,
            KeepAlive => 1,
            Blocking  => 1
        );
    };

    # the check interval presenced has to use for the current state
    my $current_timeout = sub {
        return ($current_state eq "present" ? $values{presence_timeout} : $values{absence_timeout});
    };

    # removes a socket from the selector and closes it
    my $drop_socket = sub {
        my ($sock) = @_;
        $selector->remove($sock);
        $sock->shutdown(2);
        close($sock);
    };

    my $client_socket = $open_socket->();

    if(defined($client_socket)) {
        $selector->add($client_socket);

        # send the given address to the presence daemon
        $client_socket->send($do_address."|".$values{absence_timeout}."\n");
    }
    else {
        $log_queue->enqueue($tid."|$do_room : could not create socket to ".$values{address}." - $! -");
    }

    # thread main loop
    THREADLOOP:
    while(1) {

        # watchdog: presenced must report at least once per timeout (plus 60 seconds tolerance)
        if(defined($client_socket)) {
            my $now = gettimeofday();

            if($last_contact <= ($now - $current_timeout->() - 60)) {
                $log_queue->enqueue($tid."|$do_room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")");
                $drop_socket->($client_socket);
                $client_socket = undef;
            }
        }

        # process a pending command of the main thread
        if(defined($cmd_queue) and $cmd_queue->pending) {
            my $cmd = $cmd_queue->dequeue;
            $log_queue->enqueue($tid."|received command: $cmd");

            if($cmd eq "now") {
                $log_queue->enqueue($tid."|sending \"now\" command to ".$values{address}.":".$values{port});
                $client_socket->send("now\n") if(defined($client_socket));
            }
            elsif($cmd eq "stop") {
                $log_queue->enqueue($tid."|$do_room terminating thread ".$tid." for ".$values{address});
                $drop_socket->($client_socket) if(defined($client_socket));
                $client_socket = undef;
                last THREADLOOP;
            }
            elsif($cmd =~ /^new\|/) {
                ($cmd, $do_address) = split("\\|", $cmd);
                $log_queue->enqueue($tid."|sending new address $do_address to ".$values{address}.":".$values{port});
                $client_socket->send($do_address."|".$current_timeout->()."\n") if(defined($client_socket));
            }
        }

        if(not defined($client_socket)) {

            # if it's the first occurrence
            if(!$reconnect_count) {
                # tell this the client
                $status_queue->enqueue("$do_uuid;$do_room;socket_closed");

                # create a log message
                $log_queue->enqueue($tid."|$do_room socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...");
            }

            # now try to re-establish the connection
            $client_socket = $open_socket->();

            if(defined($client_socket)) {
                # give a success message
                $log_queue->enqueue($tid."|$do_room reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)");
                $status_queue->enqueue("$do_uuid;$do_room;socket_reconnected");

                # reset the reconnect counter
                $reconnect_count = 0;

                # set the last contact date to now
                $last_contact = gettimeofday();

                # add the new established socket to the IO selector for incoming data monitoring.
                $selector->add($client_socket);

                # send the given address to the presence daemon, using the
                # timeout of the current state so it matches the watchdog above
                $client_socket->send($do_address."|".$current_timeout->()."\n");
            }
            else {
                $reconnect_count++;
                sleep(9);
            }
        }

        # get all socket handles which have a message available (waits at most one second)
        foreach my $local_client ($selector->can_read(1)) {

            # get the message from the socket handle
            my $return = <$local_client>;

            # the socket is EOF which means the connection was closed
            unless(defined($return)) {
                $drop_socket->($local_client);
                $client_socket = undef;
                next;
            }

            # set the last contact date
            $last_contact = gettimeofday();

            # remove the trailing newline
            chomp($return);

            if($return =~ /command accepted/) {
                $log_queue->enqueue($tid."|$do_room accepted command for $do_address");
            }
            elsif($return =~ /command rejected/) {
                $log_queue->enqueue($tid."|$do_room REJECTED command for $do_address");
            }
            else {
                # it's a status message: put it to the status queue with uuid for identification and the room name
                $status_queue->enqueue("$do_uuid;$do_room;".$return);

                if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/) {
                    # the state changes from present to absence: switch to the absence timeout
                    $log_queue->enqueue($tid."|$do_room changing to absence timeout (".$values{absence_timeout}.") for device $do_address");
                    $current_state = "absence";
                    $local_client->send($do_address."|".$values{absence_timeout}."\n");
                }
                elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/) {
                    # the state changes from absence to present: switch to the presence timeout
                    $log_queue->enqueue($tid."|$do_room changing to presence timeout (".$values{presence_timeout}.") for device $do_address");
                    $current_state = "present";
                    $local_client->send($do_address."|".$values{presence_timeout}."\n");
                }

                # remember the reported state (the part before the first ";")
                ($previous_state) = split(";", lc($return));
            }
        }
    }

    $log_queue->enqueue($tid."|exiting thread");
}

# Reads the given ini-style configuration file into %config and validates it.
#
# Every [section] is a room which needs the parameters address, port,
# absence_timeout and presence_timeout. The process exits with status 1 if
# the file can not be opened and with status 2 if the validation fails.
sub readConfig {
    my ($ini) = @_;

    my $section;
    my $errorcount = 0;

    Log 1, "reading configuration file";

    %config = ();

    my $ini_fh;
    unless(open($ini_fh, '<', $ini)) {
        print STDERR timestamp()."Can't open $ini: $!\n";
        exit(1);
    }

    while(my $line = <$ini_fh>) {
        chomp($line);

        if($line =~ /^\s*?\[([^\]\n\r]+?)\]/) {
            $section = $1;
        }

        # key=value pairs (optionally followed by a # comment) of the current section
        if($line =~ /^\s*(\w+?)=(.+?)\s*(#.*)?$/ and defined($section)) {
            $config{$section}{$1} = $2;
        }
    }

    close($ini_fh);

    # validating config
    foreach my $room (keys %config) {

        if(not exists($config{$room}{address})) {
            Log 0, "room $room has no value for address configured";
            $errorcount++;
        }
        elsif($config{$room}{address} !~ /^[a-zA-Z0-9.-]+$/) {
            Log 0, "no valid address for room $room found: ".$config{$room}{address};
            $errorcount++;
        }

        if(not exists($config{$room}{port})) {
            Log 0, "room >>$room<< has no value for >>port<< configured";
            $errorcount++;
        }
        elsif($config{$room}{port} !~ /^\d+$/) {
            Log 0, "value >>port<< for room >>$room<< is not a number: ".$config{$room}{port};
            $errorcount++;
        }

        foreach my $timeout_key (qw(absence_timeout presence_timeout)) {
            if(not exists($config{$room}{$timeout_key})) {
                Log 0, "room >>$room<< has no value for >>$timeout_key<< configured";
                $errorcount++;
            }
            elsif($config{$room}{$timeout_key} !~ /^\d+$/) {
                Log 0, "value >>$timeout_key<< value for room >>$room<< is not a number: ".$config{$room}{$timeout_key};
                $errorcount++;
            }
        }

        # only the four known parameters are allowed (whole name must match)
        foreach my $param (keys %{$config{$room}}) {
            if($param !~ /^(address|port|absence_timeout|presence_timeout)$/) {
                Log 0, "invalid parameter $param in room $room";
                $errorcount++;
            }
        }
    }

    if($errorcount) {
        print STDERR timestamp()."found $errorcount config errors. exiting....\n";
        exit 2;
    }
    else {
        Log 0, "no config errors found";
    }
}

# Builds the summary state out of the states of all rooms of one request.
#
# Parameter: hash reference room name => { state => ..., data => ... }
#
# Returns undef as long as at least one room has not reported a state yet.
# Otherwise a "present;rooms='...'..." string is returned if the device is
# present in at least one room, else the fixed absence string. Old style
# addon data (plain device name) gets converted in place to the new
# key='value' style.
sub aggregateRooms {
    my ($hash) = @_;

    my %rssi_results = ();
    my @rooms;
    my $hroom;

    # get all present rooms
    foreach my $key (keys %$hash) {
        my $room_state = $hash->{$key}{state};

        # if one room has no result, no summary can be made
        return unless(defined($room_state) and $room_state ne "");

        push @rooms, $key if($room_state eq "present");
    }

    if(@rooms == 0) {
        return "absence;room=;rooms=;rssi=;";
    }

    # try to select the nearest room by the highest RSSI value
    my $rssi_addon_data_key = "rssi";
    my $rssi_available      = 1;
    my $highest_value;
    my $highest_key;

    foreach my $key (@rooms) {
        my $data = $hash->{$key}{data};
        next unless(defined($data));

        my ($positional, $named) = parseParams($data, ';');

        if(@{$positional} == 1 and keys(%{$named}) == 0) {
            # old presenced device name => convert to new style
            $hash->{$key}{data} = "device_name='".$hash->{$key}{data}."'";
            $rssi_available = 0;
        }
        elsif(@{$positional} == 0 and keys(%{$named}) > 0) {
            # new addon data style => check rssi
            my $rssi = $named->{$rssi_addon_data_key};

            if(defined($rssi) and $rssi =~ /^-?(?:\d+\.)?\d+$/) {
                if(!defined($highest_value) or $rssi > $highest_value) {
                    $highest_value = $rssi;
                    $highest_key   = $key;
                }

                $rssi_results{$key} = $rssi;
            }
            else {
                $rssi_available = 0;
            }
        }
        else {
            Log 1, "invalid addon data received from room $key: $data";
        }
    }

    # the RSSI selection is only valid if every present room delivered a value
    if($rssi_available and defined($highest_key)) {
        Log 2, "successful RSSI comparisation (highest $rssi_addon_data_key value $highest_value found in room $highest_key" if(@rooms > 1);
        $hroom = $highest_key;
    }

    # sorted keys keep the result string stable, so an unchanged state is
    # not reported as a change just because of a different hash order
    my @rssi_parts = map { "rssi_".($_ =~ s/\s+/_/gr)."='".$rssi_results{$_}."'" } sort keys %rssi_results;

    # addon data of the nearest room, otherwise of the first present room
    my $data_room = (defined($hroom) ? $hroom : $rooms[0]);
    my $addon     = $hash->{$data_room}{data};

    return "present".
           ";rooms='".join(",", sort @rooms)."'".
           (defined($hroom) ? ";room='".$hroom."'" : "").
           (@rssi_parts ? ";".join(";", @rssi_parts) : "").
           (defined($addon) ? ";".$addon : "");
}

# Returns a new random UUID (MD5 hex string) which is not in use in %handle.
sub generateUUID {
    my $uuid = Digest::MD5::md5_hex(rand);

    while(defined($handle{$uuid})) {
        $uuid = Digest::MD5::md5_hex(rand);
    }

    return $uuid;
}

# Returns the timestamp prefix for log messages, or an empty string if
# timestamps are disabled (-n).
sub timestamp {
    return "" if($opt_n);
    return POSIX::strftime("%Y-%m-%d %H:%M:%S - ", localtime);
}

# Writes a log message to the logfile (-l) or to STDOUT if the given
# loglevel does not exceed the verbosity level (-v).
#
# Messages of worker threads are prefixed with "<thread id>|". The prefix is
# stripped and shown as thread information with verbosity level 2 or higher.
sub Log($$) {
    my ($loglevel, $message) = @_;
    my $thread = 0;

    if($message =~ /^\d+\|/) {
        # limit 2: the message itself may contain further "|" characters
        ($thread, $message) = split("\\|", $message, 2);
    }

    return if($loglevel > $opt_v);

    my $log_fh;

    if($opt_l) {
        open($log_fh, '>>', $opt_l) or die ("could not open logfile: $opt_l");
    }
    else {
        open($log_fh, '>&', \*STDOUT) or die("cannot open STDOUT");
    }

    print {$log_fh} ($opt_l?"":"\r").timestamp().($opt_v >= 2 ? ($thread > 0 ? "(Thread $thread)" : "(Main Thread)")." - ":"").$message."\n";

    close($log_fh);
}

#####################################
# parseParams() from fhem.pl by justme1968
#
# Splits $cmd (a string, split at $separator which defaults to a space, or
# an array reference of already split parts) into positional values and
# key=value pairs. Values in quotes or in { } may contain the separator.
#
# Returns a reference to the array of positional values and a reference to
# the hash of the key=value pairs.
sub parseParams($;$) {
    my ($cmd, $separator) = @_;

    $separator = ' ' if( !$separator );

    my (@a, %h);

    my @params;
    if( ref($cmd) eq 'ARRAY' ) {
        @params = @{$cmd};
    }
    else {
        @params = split($separator, $cmd);
    }

    while (@params) {
        my $param = shift(@params);

        # empty parts (e.g. from ";;") carry no information
        next if( !defined($param) || $param eq "" );

        my ($key, $value) = split( '=', $param, 2 );

        if( !defined( $value ) ) {
            $value = $key;
            $key   = undef;
        }

        #collect all parts until the closing ' or "
        while( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
            my $next = shift(@params);
            last if( !defined($next) );
            $value .= $separator . $next;
        }

        #remove matching ' or " from the start and end
        if( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
            $value =~ s/^.(.*).$/$1/;
        }

        #collect all parts until opening { and closing } are matched
        if( $value =~ m/^\{/ ) {
            my $count = 0;

            for my $i (0..length($value)-1) {
                my $c = substr($value, $i, 1);
                ++$count if( $c eq '{' );
                --$count if( $c eq '}' );
            }

            while( $param && $count != 0 ) {
                my $next = shift(@params);
                last if( !defined($next) );
                $value .= $separator . $next;

                for my $i (0..length($next)-1) {
                    my $c = substr($next, $i, 1);
                    ++$count if( $c eq '{' );
                    --$count if( $c eq '}' );
                }
            }
        }

        if( defined($key) ) {
            $h{$key} = $value;
        }
        else {
            push @a, $value;
        }
    }

    return(\@a, \%h);
}

# Closes the connection to the given client socket. A running request of
# the client gets stopped before.
sub closeClientConnection($) {
    my ($client) = @_;

    # make a log entry and remove the socket from the socket selector
    # (the peer address may not be available anymore for a broken connection)
    Log 1, "closed connection from ".($client->peerhost() // "unknown");
    $listener->remove($client);

    # if there is a running command, stop it first and clean up
    stopClientThreads($client);

    # now close the socket, that's it
    close $client;
}

# Stops all threads of the running request of the given client socket and
# removes all relationship entries of the request.
#
# Returns 1 if a request was running, otherwise 0.
sub stopClientThreads($) {
    my ($client) = @_;

    # nothing to do if the client has no running request
    return 0 unless(defined($socket_to_handle{$client}));

    my $uuid = $socket_to_handle{$client};
    my $peer = $client->peerhost() // "unknown";

    # get all threads for this socket and send them a termination signal
    foreach my $room (keys %{$handle{$uuid}{threads}}) {
        my $tid = $handle{$uuid}{threads}{$room}->tid();

        Log 2, "killing thread ".$tid." for room $room for client ".$peer;

        # the thread keeps its own reference to the queue, so the entry of
        # the main thread can be released right after the stop command
        if(exists($queues{$tid})) {
            $queues{$tid}->enqueue("stop");
            delete($queues{$tid});
        }

        delete($handle{$uuid}{threads}{$room});
    }

    # when all threads are signaled, delete all relationship entries for this client
    delete($handle{$uuid});
    delete($socket_to_handle{$client});

    return 1;
}