2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-02-01 01:09:47 +00:00

00_MQTT2_SERVER.pm: Fixing bugs (Forum #90145)

git-svn-id: https://svn.fhem.de/fhem/trunk@17126 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
rudolfkoenig 2018-08-12 11:56:13 +00:00
parent 76c54ca4cf
commit ac3067c14a
3 changed files with 89 additions and 41 deletions

View File

@ -9,11 +9,10 @@ use warnings;
use TcpServerUtils;
use MIME::Base64;
sub MQTT2_SERVER_Parse($$$);
sub MQTT2_SERVER_Read($@);
sub MQTT2_SERVER_Write($$$);
sub MQTT2_SERVER_Undef($@);
sub MQTT2_SERVER_doPublish($$$;$$);
sub MQTT2_SERVER_doPublish($$$$;$);
# See also:
@ -25,6 +24,7 @@ MQTT2_SERVER_Initialize($)
my ($hash) = @_;
$hash->{Clients} = ":MQTT2_DEVICE:";
$hash->{MatchList}= { "1:MQTT2_DEVICE" => "^.*" },
$hash->{ReadFn} = "MQTT2_SERVER_Read";
$hash->{DefFn} = "MQTT2_SERVER_Define";
$hash->{AttrFn} = "MQTT2_SERVER_Attr";
@ -61,7 +61,7 @@ MQTT2_SERVER_Define($$)
Log3 $hash, 1, "$ret. Exiting.";
exit(1);
}
$hash->{NRCLIENTS} = 0;
readingsSingleUpdate($hash, "nrclients", 0, 0);
$hash->{clients} = {};
$hash->{retain} = {};
InternalTimer(1, "MQTT2_SERVER_keepaliveChecker", $hash, 0);
@ -73,12 +73,15 @@ MQTT2_SERVER_keepaliveChecker($)
{
my ($hash) = @_;
my $now = gettimeofday();
foreach my $clName (keys %{$hash->{clients}}) {
my $cHash = $defs{$clName};
next if(!$cHash || !$cHash->{keepalive} ||
$now < $cHash->{lastMsgTime}+$cHash->{keepalive}*1.5 );
Log3 $hash, 3, "$hash->{NAME}: $clName left us (keepalive check)";
CommandDelete(undef, $clName);
my $multiplier = AttrVal($hash, "keepaliveFactor", 1.5);
if($multiplier) {
foreach my $clName (keys %{$hash->{clients}}) {
my $cHash = $defs{$clName};
next if(!$cHash || !$cHash->{keepalive} ||
$now < $cHash->{lastMsgTime}+$cHash->{keepalive}*$multiplier );
Log3 $hash, 3, "$hash->{NAME}: $clName left us (keepalive check)";
CommandDelete(undef, $clName);
}
}
InternalTimer($now+10, "MQTT2_SERVER_keepaliveChecker", $hash, 0);
}
@ -93,11 +96,12 @@ MQTT2_SERVER_Undef($@)
my $shash = $defs{$sname};
delete($shash->{clients}{$hash->{NAME}});
$shash->{NRCLIENTS}--;
readingsSingleUpdate($shash, "nrclients",
ReadingsVal($sname, "nrclients", 1)-1, 1);
if($hash->{lwt}) { # Last will
my ($tp, $val) = split(':', $hash->{lwt}, 2);
MQTT2_SERVER_doPublish($shash, $tp, $val, undef, $hash->{cflags} & 0x20);
MQTT2_SERVER_doPublish($shash, $shash, $tp, $val, $hash->{cflags} & 0x20);
}
return $ret;
}
@ -133,7 +137,7 @@ MQTT2_SERVER_Set($@)
return "Usage: publish -r topic [value]" if(@a < 1);
my $tp = shift(@a);
my $val = join(" ", @a);
MQTT2_SERVER_doPublish($hash, $tp, $val, undef, $retain);
MQTT2_SERVER_doPublish($hash->{CL}, $hash, $tp, $val, $retain);
}
}
@ -167,7 +171,8 @@ MQTT2_SERVER_Read($@)
my $nhash = TcpServer_Accept($hash, "MQTT2_SERVER");
return if(!$nhash);
$nhash->{CD}->blocking(0);
$hash->{NRCLIENTS}++;
readingsSingleUpdate($hash, "nrclients",
ReadingsVal($hash->{NAME}, "nrclients", 0)+1, 1);
return;
}
@ -280,7 +285,7 @@ MQTT2_SERVER_Read($@)
$val = substr($pl, $off);
Log3 $sname, 4, "$cname $hash->{cid} $cpt $tp:$val";
addToWritebuffer($hash, pack("CCnC*", 0x40, 2, $pid)) if($qos); # PUBACK
MQTT2_SERVER_doPublish($defs{$sname}, $tp, $val, $cname, $cf & 0x01);
MQTT2_SERVER_doPublish($hash, $defs{$sname}, $tp, $val, $cf & 0x01);
####################################
} elsif($cpt eq "SUBSCRIBE") {
@ -328,25 +333,29 @@ MQTT2_SERVER_Read($@)
######################################
# Call sendto for all clients + Dispatch + dotrigger if rawEvents is set
sub
MQTT2_SERVER_doPublish($$$;$$)
MQTT2_SERVER_doPublish($$$$;$)
{
my ($hash, $tp, $val, $src, $retain) = @_;
my ($src, $tgt, $tp, $val, $retain) = @_;
$val = "" if(!defined($val));
$src = $tgt if(!defined($src));
if($retain) {
my $now = gettimeofday();
my %h = ( ts=>$now, val=>$val );
$hash->{retain}{$tp} = \%h;
$tgt->{retain}{$tp} = \%h;
}
foreach my $clName (keys %{$hash->{clients}}) {
MQTT2_SERVER_sendto($defs{$clName}, $tp, $val) if(!$src || $src ne $clName);
foreach my $clName (keys %{$tgt->{clients}}) {
MQTT2_SERVER_sendto($defs{$clName}, $tp, $val) if($src->{NAME} ne $clName);
}
Dispatch($hash, "$tp:$val", undef, 1);
my $re = AttrVal($hash->{NAME}, "rawEvents", undef);
DoTrigger($hash->{NAME}, "$tp:$val") if($re && $tp =~ m/$re/);
if($src->{cid}) { # "real" MQTT client
my $cid = $src->{cid};
$cid =~ s,[^a-z0-9._],_,gi;
Dispatch($tgt, "$cid:$tp:$val", undef, 1);
my $re = AttrVal($tgt->{NAME}, "rawEvents", undef);
DoTrigger($tgt->{NAME}, "$tp:$val") if($re && $tp =~ m/$re/);
}
}
######################################
@ -382,7 +391,7 @@ sub
MQTT2_SERVER_Write($$$)
{
my ($hash,$topic,$msg) = @_;
MQTT2_SERVER_doPublish($hash, $topic, $msg);
MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg);
}
sub
@ -490,6 +499,18 @@ MQTT2_SERVER_getStr($$)
Should only be used, if there is no MQTT2_DEVICE to process the topic.
</li><br>
<a name="keepaliveFactor"></a>
<li>keepaliveFactor<br>
the oasis spec requires a disconnect, if after 1.5 times the client
supplied keepalive no data or PINGREQ is sent. With this attribute you
can modify this factor, 0 disables the check.
Notes:
<ul>
<li>dont complain if you set this attribute to less or equal to 1.</li>
<li>MQTT2_SERVER checks the keepalive only every 10 second.</li>
</ul>
</li>
<a name="SSL"></a>
<li>SSL<br>
Enable SSL (i.e. TLS)

View File

@ -2,8 +2,6 @@
# $Id$
package main;
# TODO: autocreate
use strict;
use warnings;
use SetExtensions;
@ -46,6 +44,7 @@ MQTT2_DEVICE_Define($$)
my @a = split("[ \t][ \t]*", $def);
my $name = shift @a;
my $type = shift @a; # always MQTT2_DEVICE
shift(@a) if(@a && $a[0] eq "autocreated");
return "wrong syntax for $name: define <name> MQTT2_DEVICE" if(int(@a));
@ -72,10 +71,11 @@ MQTT2_DEVICE_Parse($$)
}
}
my ($topic, $value) = split(":", $msg, 2);
my ($cid, $topic, $value) = split(":", $msg, 3);
my $dp = $modules{MQTT2_DEVICE}{defptr};
foreach my $re (keys %{$dp}) {
next if($msg !~ m/^$re$/s);
next if(!("$topic:$value" =~ m/^$re$/s ||
"$cid:$topic:$value" =~ m/^$re$/s));
foreach my $dev (keys %{$dp->{$re}}) {
next if(IsDisabled($dev));
my @retData;
@ -86,13 +86,15 @@ MQTT2_DEVICE_Parse($$)
if($code =~ m/^{.*}$/s) {
$code = EvalSpecials($code, ("%TOPIC"=>$topic, "%EVENT"=>$value));
my $ret = AnalyzePerlCommand(undef, $code);
readingsBeginUpdate($hash);
foreach my $k (keys %{$ret}) {
readingsBulkUpdate($hash, $k, $ret->{$k});
push(@retData, "$k $ret->{$k}");
checkForGet($hash, $k, $ret->{$k});
if($ret && ref $ret eq "HASH") {
readingsBeginUpdate($hash);
foreach my $k (keys %{$ret}) {
readingsBulkUpdate($hash, $k, $ret->{$k});
push(@retData, "$k $ret->{$k}");
checkForGet($hash, $k, $ret->{$k});
}
readingsEndUpdate($hash, 1);
}
readingsEndUpdate($hash, 1);
} else {
readingsSingleUpdate($hash, $code, $value, 1);
@ -104,6 +106,27 @@ MQTT2_DEVICE_Parse($$)
}
}
# autocreate, init readingList if message is a json string
# deactivated, as there are a lot of messages to be catched
# if(!@ret) {
# my $nn = "MQTT2_$cid";
# if(!$defs{$nn} && $cid !~ m/mosqpub.*/) {
# PrioQueue_add(sub{
# return if(!$defs{$nn});
# if($value =~ m/^{.*}$/) {
# my %ret = MQTT2_JSON($msg);
# if(keys %ret) {
# CommandAttr(undef,
# "$nn readingList $cid:$topic:.* { MQTT2_JSON(\$EVENT) }");
# }
# }
# $defs{$nn}{autocreated_on} = $msg;
# }, undef);
# return "UNDEFINED $nn MQTT2_DEVICE autocreated"
# }
# return "";
# }
return @ret;
}
@ -274,6 +297,7 @@ MQTT2_DEVICE_Attr($$)
return undef;
}
return "$dev attr $attrName: more parameters needed" if(!$param); #90145
foreach my $el (split("\n", $param)) {
my ($par1, $par2) = split(" ", $el, 2);
next if(!$par1);
@ -394,11 +418,13 @@ MQTT2_DEVICE_Undef($$)
<a name="readingList"></a>
<li>readingList &lt;regexp&gt; [readingName|perl-Expression] ...
<br>
If the regexp matches topic:message either set readingName to
the published message, or evaluate the perl expression, which has to
return a hash consisting of readingName=>readingValue entries.
If the regexp matches topic:message or cid:topic:message either set
readingName to the published message, or evaluate the perl expression,
which has to return a hash consisting of readingName=>readingValue
entries.
You can define multiple such tuples, separated by newline, the newline
does not have to be entered in the FHEMWEB frontend.<br>
does not have to be entered in the FHEMWEB frontend. cid is the client-id
of the sending device.<br>
Example:<br>
<code>
&nbsp;&nbsp;attr dev readingList\<br>

View File

@ -59,9 +59,10 @@ FHEM2FHEM_Define($$)
my $iodev = $defs{$rdev};
return "Undefined local device $rdev" if(!$iodev);
$hash->{rawDevice} = $rdev;
$hash->{Clients} = $iodev->{Clients};
$hash->{Clients} = $modules{$iodev->{TYPE}}{Clients}
if(!$hash->{Clients});
my $iomod = $modules{$iodev->{TYPE}};
$hash->{Clients} = $iodev->{Clients} ? $iodev->{Clients} :$iomod->{Clients};
$hash->{MatchList} = $iomod->{MatchList} if($iomod->{MatchList});
}