From 56d6122a6ff868d62f723baf70783038631fc0e7 Mon Sep 17 00:00:00 2001 From: rudolfkoenig <> Date: Fri, 16 Nov 2018 08:59:25 +0000 Subject: [PATCH] 00_MQTT2_CLIENT.pm: "MQTT_GENERiC_BRIDGE" changes (Forum #93255) git-svn-id: https://svn.fhem.de/fhem/trunk@17757 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/FHEM/00_MQTT2_CLIENT.pm | 116 +++++++++++++++++++++++------------ fhem/FHEM/00_MQTT2_SERVER.pm | 21 +++++-- fhem/FHEM/10_MQTT2_DEVICE.pm | 4 +- 3 files changed, 93 insertions(+), 48 deletions(-) diff --git a/fhem/FHEM/00_MQTT2_CLIENT.pm b/fhem/FHEM/00_MQTT2_CLIENT.pm index c1c952f6d..57fee2614 100644 --- a/fhem/FHEM/00_MQTT2_CLIENT.pm +++ b/fhem/FHEM/00_MQTT2_CLIENT.pm @@ -9,6 +9,7 @@ use DevIo; sub MQTT2_CLIENT_Read($@); sub MQTT2_CLIENT_Write($$$); sub MQTT2_CLIENT_Undef($@); +sub MQTT2_CLIENT_doPublish($@); my $keepalive = 30; @@ -25,14 +26,15 @@ MQTT2_CLIENT_Initialize($) "1:MQTT2_DEVICE" => "^.*", "2:MQTT_GENERIC_BRIDGE" => "^.*" }; - $hash->{ReadFn} = "MQTT2_CLIENT_Read"; - $hash->{DefFn} = "MQTT2_CLIENT_Define"; - $hash->{AttrFn} = "MQTT2_CLIENT_Attr"; - $hash->{SetFn} = "MQTT2_CLIENT_Set"; - $hash->{UndefFn} = "MQTT2_CLIENT_Undef"; - $hash->{DeleteFn}= "MQTT2_CLIENT_Delete"; - $hash->{WriteFn} = "MQTT2_CLIENT_Write"; - $hash->{ReadyFn} = "MQTT2_CLIENT_connect"; + $hash->{ReadFn} = "MQTT2_CLIENT_Read"; + $hash->{DefFn} = "MQTT2_CLIENT_Define"; + $hash->{AttrFn} = "MQTT2_CLIENT_Attr"; + $hash->{SetFn} = "MQTT2_CLIENT_Set"; + $hash->{UndefFn} = "MQTT2_CLIENT_Undef"; + $hash->{ShutdownFn} = "MQTT2_CLIENT_Undef"; + $hash->{DeleteFn} = "MQTT2_CLIENT_Delete"; + $hash->{WriteFn} = "MQTT2_CLIENT_Write"; + $hash->{ReadyFn} = "MQTT2_CLIENT_connect"; no warnings 'qw'; my @attrList = qw( @@ -42,8 +44,9 @@ MQTT2_CLIENT_Initialize($) disabledForIntervals lwt lwtRetain + msgAfterConnect + msgBeforeDisconnect mqttVersion:3.1.1,3.1 - onConnect rawEvents subscriptions SSL @@ -124,10 +127,14 @@ MQTT2_CLIENT_doinit($) ############################## SUBSCRIBE } elsif($hash->{connecting} == 2) { + my $s = AttrVal($name, "subscriptions", "#"); + if($s eq "setByTheProgram") { + $s = ($hash->{".subscribe"} ? $hash->{".subscribe"} : "#"); + } my $msg = pack("n", $hash->{FD}). # packed Identifier join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0 - split(" ", AttrVal($name, "subscriptions", "#"))); + split(" ", $s)); addToWritebuffer($hash, pack("C",0x80). MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg); @@ -144,10 +151,7 @@ MQTT2_CLIENT_keepalive($) my $name = $hash->{NAME}; return if(ReadingsVal($name, "state", "") ne "opened"); Log3 $name, 5, "$name: keepalive $keepalive"; - my $msg = join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0 - split(" ", AttrVal($name, "subscriptions", "#"))); - addToWritebuffer($hash, - pack("C",0xC0).pack("C",0)); + addToWritebuffer($hash, pack("C",0xC0).pack("C",0)); # PINGREQ InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive", $hash, 0); } @@ -155,10 +159,27 @@ sub MQTT2_CLIENT_Undef($@) { my ($hash, $arg) = @_; + RemoveInternalTimer($hash); + my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", ""); + MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 1) if($ond); + DevIo_SimpleWrite($hash, pack("C",0xE0).pack("C",0), 0); # DISCONNECT DevIo_CloseDev($hash); return undef; } +sub +MQTT2_CLIENT_Disco($) +{ + my ($hash) = @_; + RemoveInternalTimer($hash); + $hash->{connecting} = 1; + my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", ""); + MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 0) if($ond); + addToWritebuffer($hash, pack("C",0xE0).pack("C",0)); # DISCONNECT + DevIo_Disconnected($hash); +} + + sub MQTT2_CLIENT_Delete($@) { @@ -190,15 +211,6 @@ MQTT2_CLIENT_Attr(@) return undef; } -sub -MQTT2_CLIENT_Disco($) -{ - my ($hash) = @_; - RemoveInternalTimer($hash); - $hash->{connecting} = 1; - DevIo_Disconnected($hash); -} - sub MQTT2_CLIENT_Set($@) { @@ -291,7 +303,7 @@ MQTT2_CLIENT_Read($@) if($cpt eq "CONNACK") { my $rc = ord(substr($pl,1,1)); if($rc == 0) { - my $onc = AttrVal($name, "onConnect", ""); + my $onc = AttrVal($name, "msgAfterConnect", ""); MQTT2_CLIENT_doPublish($hash, split(" ", $onc, 2)) if($onc); MQTT2_CLIENT_doinit($hash); @@ -340,30 +352,47 @@ MQTT2_CLIENT_Read($@) ###################################### # send topic to client if its subscription matches the topic sub -MQTT2_CLIENT_doPublish($$$$) +MQTT2_CLIENT_doPublish($@) { - my ($hash, $topic, $val, $retain) = @_; + my ($hash, $topic, $val, $retain, $immediate) = @_; my $name = $hash->{NAME}; return if(IsDisabled($name)); $val = "" if(!defined($val)); Log3 $name, 5, "$name: sending PUBLISH $topic $val"; - addToWritebuffer($hash, - pack("C",0x30). - MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)). - pack("n", length($topic)). - $topic.$val); + + my $msg = pack("C",0x30). + MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)). + pack("n", length($topic)). + $topic.$val; + if($immediate) { + DevIo_SimpleWrite($hash, $msg, 0); + } else { + addToWritebuffer($hash, $msg); + } } sub MQTT2_CLIENT_Write($$$) { - my ($hash,$topic,$msg) = @_; - my $retain; - if($topic =~ m/^(.*):r$/) { - $topic = $1; - $retain = 1; + my ($hash, $function, $topicMsg) = @_; + + if($function eq "publish") { + my ($topic, $msg) = split(" ", $topicMsg, 2); + my $retain; + if($topic =~ m/^(.*):r$/) { + $topic = $1; + $retain = 1; + } + MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain); + + } elsif($function eq "subscribe") { + $hash->{".subscribtion"} = $topicMsg; + MQTT2_CLIENT_Disco($hash); + + } else { + my $name = $hash->{NAME}; + Log3 $name, 1, "$name: ERROR: Ignoring function $function"; } - MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain); } sub @@ -491,11 +520,16 @@ MQTT2_CLIENT_getStr($$) set the MQTT protocol version in the CONNECT header, default is 3.1 - -