From f4a82eea921833355a90595f99716995b226e084 Mon Sep 17 00:00:00 2001 From: hexenmeister Date: Sat, 11 Sep 2021 16:21:40 +0000 Subject: [PATCH] bugfix: disconnects with new mosquitto version (wrong use of DUP flag) fix commandref git-svn-id: https://svn.fhem.de/fhem/trunk@24953 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/FHEM/00_MQTT.pm | 231 +++++++++++++++++++++---------------------- 1 file changed, 110 insertions(+), 121 deletions(-) diff --git a/fhem/FHEM/00_MQTT.pm b/fhem/FHEM/00_MQTT.pm index 42a30b20b..1b37d5c8f 100644 --- a/fhem/FHEM/00_MQTT.pm +++ b/fhem/FHEM/00_MQTT.pm @@ -25,6 +25,54 @@ # ############################################## +package MQTT; + +use Exporter ('import'); +@EXPORT = (); +@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams); +%EXPORT_TAGS = (all => [@EXPORT_OK]); + +use strict; +use warnings; + +use GPUtils qw(GP_Import GP_ForallClients); +use Scalar::Util qw(looks_like_number); +use Carp qw(carp); + +use Net::MQTT::Constants; +use Net::MQTT::Message; + +our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE); + +BEGIN {GP_Import(qw( + gettimeofday + readingsSingleUpdate + readingsBeginUpdate + readingsBulkUpdate + readingsEndUpdate + DevIo_OpenDev + DevIo_SimpleWrite + DevIo_SimpleRead + DevIo_CloseDev + DevIo_IsOpen + DevIo_getState + IsDisabled + RemoveInternalTimer + InternalTimer + AttrVal + ReadingsVal + Log3 + AssignIoPort + getKeyValue + setKeyValue + CallFn + defs + modules + init_done + ))}; + +sub ::MQTT_Initialize { goto &Initialize }; + my %sets = ( "connect" => "", "disconnect" => "", @@ -42,66 +90,29 @@ my @clients = qw( use DevIo; -sub MQTT_Initialize($) { +sub Initialize { - my $hash = shift @_; + my $hash = shift // return; # Provider - $hash->{Clients} = join (':',@clients); - $hash->{ReadyFn} = "MQTT::Ready"; - $hash->{ReadFn} = "MQTT::Read"; + $hash->{Clients} = join q{:}, @clients; + $hash->{ReadyFn} = \&Ready; + $hash->{ReadFn} = \&Read; # Consumer - $hash->{DefFn} = "MQTT::Define"; - $hash->{UndefFn} = "MQTT::Undef"; - $hash->{DeleteFn} = "MQTT::Delete"; - $hash->{RenameFn} = "MQTT::Rename"; - $hash->{ShutdownFn} = "MQTT::Shutdown"; - $hash->{SetFn} = "MQTT::Set"; - $hash->{NotifyFn} = "MQTT::Notify"; - $hash->{AttrFn} = "MQTT::Attr"; + $hash->{DefFn} = \&Define; + $hash->{UndefFn} = \&Undef; + $hash->{DeleteFn} = \&Delete; + $hash->{RenameFn} = \&Rename; + $hash->{ShutdownFn} = \&Shutdown; + $hash->{SetFn} = \&Set; + $hash->{NotifyFn} = \&Notify; + $hash->{AttrFn} = \&Attr; - $hash->{AttrList} = "keep-alive "."last-will client-id "."on-connect on-disconnect on-timeout privacy:1,0 ".$main::readingFnAttributes; + $hash->{AttrList} = "keep-alive last-will client-id on-connect on-disconnect on-timeout privacy:1,0 ".$::readingFnAttributes; } -package MQTT; -use Exporter ('import'); -@EXPORT = (); -@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams); -%EXPORT_TAGS = (all => [@EXPORT_OK]); - -use strict; -use warnings; - -use GPUtils qw(:all); - -use Net::MQTT::Constants; -use Net::MQTT::Message; - -our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE); - -BEGIN {GP_Import(qw( - gettimeofday - readingsSingleUpdate - DevIo_OpenDev - DevIo_SimpleWrite - DevIo_SimpleRead - DevIo_CloseDev - RemoveInternalTimer - InternalTimer - AttrVal - ReadingsVal - Log3 - AssignIoPort - getKeyValue - setKeyValue - CallFn - defs - modules - looks_like_number - fhem - ))}; sub Define($$) { my ( $hash, $def ) = @_; @@ -115,21 +126,15 @@ sub Define($$) { $hash->{DeviceName} = $host; my $name = $hash->{NAME}; - my $user = getKeyValue($name."_user"); - my $pass = getKeyValue($name."_pass"); - - setKeyValue($name."_user",$username) unless(defined($user)); - setKeyValue($name."_pass",$password) unless(defined($pass)); + setKeyValue($name."_user",$username) if defined $username; + setKeyValue($name."_pass",$password) if defined $password; $hash->{DEF} = $host; #readingsSingleUpdate($hash,"connection","disconnected",0); - if ($main::init_done) { - return Start($hash); - } else { - return undef; - } + return Start($hash) if $init_done; + return; } sub Undef($) { @@ -206,49 +211,22 @@ sub process_event($$) { sub Set($@) { my ($hash, @a) = @_; - return "Need at least one parameters" if(@a < 2); + return 'Need at least one parameter!' if @a < 2; return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets) if(!defined($sets{$a[1]})); my $command = $a[1]; - my $value = $a[2]; + #my $value = $a[2]; - COMMAND_HANDLER: { - $command eq "connect" and do { - Start($hash); - last; - }; - $command eq "disconnect" and do { - Stop($hash); - last; - }; - $command eq "publish" and do { - shift(@a); - shift(@a); - #if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";} - #my $qos=0; - #my $retain=0; - #if(looks_like_number ($a[0])) { - # $qos = int($a[0]); - # $qos = 0 if $qos>1; - # shift(@a); - # if(looks_like_number ($a[0])) { - # $retain = int($a[0]); - # $retain = 0 if $retain>2; - # shift(@a); - # } - #} - #if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";} - #my $topic = shift(@a); - #my $value = join (" ", @a); - - my ($qos, $retain,$topic, $value) = parsePublishCmd(@a); - return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic); - return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+'); - $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos); - my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain); - last; - } - }; + return Start($hash) if $command eq 'connect'; + return Stop($hash) if $command eq 'disconnect'; + + return "unknown command $command" if $command ne 'publish'; + my ($qos, $retain,$topic, $value) = parsePublishCmd(@a); + return 'missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]...' if !$topic; + return "wrong parameter. topic may not be '#' or '+'" if ($topic eq '#' or $topic eq '+'); + $qos = MQTT_QOS_AT_MOST_ONCE if !defined $qos; + send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain); + return; } sub parseParams($;$$$$) { @@ -408,7 +386,7 @@ sub Attr($$$$) { } else { $hash->{timeout} = 60; } - if ($main::init_done) { + if ($init_done) { $hash->{ping_received}=1; Timer($hash); }; @@ -417,7 +395,7 @@ sub Attr($$$$) { $attribute eq "last-will" and do { if($hash->{STATE} ne "disconnected") { Stop($hash); - InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0); + InternalTimer(gettimeofday()+1, \&Start, $hash, 0); } last; }; @@ -445,7 +423,7 @@ sub Start($) { } DevIo_CloseDev($hash); - return DevIo_OpenDev($hash, 0, "MQTT::Init"); + return DevIo_OpenDev($hash, 0, \&Init); } sub Stop($) { @@ -461,20 +439,24 @@ sub Stop($) { send_disconnect($hash); DevIo_CloseDev($hash); RemoveInternalTimer($hash); - readingsSingleUpdate($hash,"connection","disconnected",1); - readingsSingleUpdate($hash,"state","disconnected",1); + readingsBeginUpdate($hash); + readingsBulkUpdate($hash,'connection','disconnected'); + readingsBulkUpdate($hash,'state','disconnected'); + readingsEndUpdate($hash,1); } sub Ready($) { my $hash = shift; - return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected"); + return if DevIo_IsOpen($hash) || IsDisabled($hash->{NAME}); + return DevIo_OpenDev($hash, 1, \&Init, sub(){}) if DevIo_getState($hash) eq 'disconnected'; + return; } sub Rename() { my ($new,$old) = @_; setKeyValue($new."_user",getKeyValue($old."_user")); setKeyValue($new."_pass",getKeyValue($old."_pass")); - + setKeyValue($old."_user",undef); setKeyValue($old."_pass",undef); return undef; @@ -741,7 +723,7 @@ sub client_subscribe_topic($$;$$) { $client->{subscribeQos}->{$topic}=$qos; my $expr = topic_to_regexp($topic); push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}}; - if ($main::init_done) { + if ($init_done) { if (my $mqtt = $client->{IODev}) {; $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE $retain = 0 unless defined $retain; # not supported yet @@ -760,7 +742,7 @@ sub client_unsubscribe_topic($$) { delete $client->{subscribeQos}->{$topic}; my $expr = topic_to_regexp($topic); $client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}]; - if ($main::init_done) { + if ($init_done) { if (my $mqtt = $client->{IODev}) {; my $msgid = send_unsubscribe($mqtt, topics => [$topic], @@ -783,12 +765,9 @@ sub Client_Define($$) { $client->{subscribeExpr} = []; AssignIoPort($client); - if ($main::init_done) { - return client_start($client); - } else { - return undef; - } -}; + return client_start($client) if $init_done; + return; +} sub Client_Undefine($) { client_stop(shift); @@ -895,7 +874,7 @@ sub client_attr($$$$$) { last; }; $attribute eq "IODev" and do { - if ($main::init_done) { + if ($init_done) { if ($command eq "set") { client_stop($client); $main::attr{$name}{IODev} = $value; @@ -971,48 +950,55 @@ sub client_stop($) { }; 1; +__END__ =pod =item [device] =item summary connects fhem to MQTT + =begin html - +

MQTT