From 52c2007313419253b6025bc613769b163e8d2eb5 Mon Sep 17 00:00:00 2001 From: eisler Date: Sat, 26 Aug 2017 22:15:46 +0000 Subject: [PATCH] MQTT: features from hexenmeister (Forum #msg662213) git-svn-id: https://svn.fhem.de/fhem/trunk@14964 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/CHANGED | 3 + fhem/FHEM/00_MQTT.pm | 367 +++++++++++++++++++++++++++++++++--- fhem/FHEM/10_MQTT_BRIDGE.pm | 107 ++++++++--- fhem/FHEM/10_MQTT_DEVICE.pm | 101 +++++++--- 4 files changed, 503 insertions(+), 75 deletions(-) diff --git a/fhem/CHANGED b/fhem/CHANGED index decccfde7..fe6446e0f 100644 --- a/fhem/CHANGED +++ b/fhem/CHANGED @@ -1,5 +1,8 @@ # Add changes at the top of the list. Keep it in ASCII, and 80-char wide. # Do not insert empty lines here, update check depends on it. + - feature: 00_MQTT: OnMessageFn, last-will, onConnect/onDisconnect + - feature: 10_MQTT_DEVICE: improved publishSet, retain, subscribeReadings + - feature: 10_MQTT_BRIDGE: improved retain, subscribeSet - new: 31_Aurora.pm: first release - bugfix: 21_HEOS: fix missing curl - bugfix: 31_PLAYBULB: code cleaning diff --git a/fhem/FHEM/00_MQTT.pm b/fhem/FHEM/00_MQTT.pm index e92a66497..0dc0f1303 100644 --- a/fhem/FHEM/00_MQTT.pm +++ b/fhem/FHEM/00_MQTT.pm @@ -27,6 +27,7 @@ my %sets = ( "connect" => "", "disconnect" => "", + "publish" => "", ); my %gets = ( @@ -50,13 +51,14 @@ sub MQTT_Initialize($) { $hash->{ReadFn} = "MQTT::Read"; # Consumer - $hash->{DefFn} = "MQTT::Define"; - $hash->{UndefFn} = "MQTT::Undef"; - $hash->{DeleteFn} = "MQTT::Delete"; - $hash->{SetFn} = "MQTT::Set"; - $hash->{NotifyFn} = "MQTT::Notify"; + $hash->{DefFn} = "MQTT::Define"; + $hash->{UndefFn} = "MQTT::Undef"; + $hash->{ShutdownFn} = "MQTT::Shutdown"; + $hash->{SetFn} = "MQTT::Set"; + $hash->{NotifyFn} = "MQTT::Notify"; + $hash->{AttrFn} = "MQTT::Attr"; - $hash->{AttrList} = "keep-alive ".$main::readingFnAttributes; + $hash->{AttrList} = "keep-alive "."last-will "."on-connect on-disconnect on-timeout".$main::readingFnAttributes; } package MQTT; @@ -86,10 +88,16 @@ BEGIN {GP_Import(qw( RemoveInternalTimer InternalTimer AttrVal + ReadingsVal Log3 AssignIoPort getKeyValue setKeyValue + CallFn + defs + modules + looks_like_number + fhem ))}; sub Define($$) { @@ -111,6 +119,8 @@ sub Define($$) { setKeyValue($name."_pass",$password) unless(defined($pass)); $hash->{DEF} = $host; + + #readingsSingleUpdate($hash,"connection","disconnected",0); if ($main::init_done) { return Start($hash); @@ -119,19 +129,67 @@ sub Define($$) { } } -sub Undef($$) { - my ($hash, $name) = @_; +sub Undef($) { + my $hash = shift; Stop($hash); - return undef; -} - -sub Delete($$) { - my ($hash, $name) = @_; + my $name = $hash->{NAME}; setKeyValue($name."_user",undef); setKeyValue($name."_pass",undef); return undef; } +sub Shutdown($) { + my $hash = shift; + Stop($hash); + my $name = $hash->{NAME}; + Log3($name,1,"Shutdown executed"); + return undef; +} + +sub onConnect($) { + my $hash = shift; + my $name = $hash->{NAME}; + my $cmdstr = AttrVal($name,"on-connect",undef); + return process_event($hash,$cmdstr); +} + +sub onDisconnect($) { + my $hash = shift; + my $name = $hash->{NAME}; + my $cmdstr = AttrVal($name,"on-disconnect",undef); + return process_event($hash,$cmdstr); +} + +sub onTimeout($) { + my $hash = shift; + my $name = $hash->{NAME}; + my $cmdstr = AttrVal($name,"on-timeout",undef); + if($cmdstr) { + return eval($cmdstr); + } +} + +sub process_event($$) { + my $hash = shift; + my $str = shift; + my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str); + + my $do=1; + if($cmd) { + my $name = $hash->{NAME}; + $do=eval($cmd); + $do=1 if (!defined($do)); + #no strict "refs"; + #my $ret = &{$hash->{WBCallback}}($hash); + #use strict "refs"; + } + + if($do && defined($topic)) { + $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos); + send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain); + } +} + sub Set($@) { my ($hash, @a) = @_; return "Need at least one parameters" if(@a < 2); @@ -149,9 +207,98 @@ sub Set($@) { Stop($hash); last; }; + $command eq "publish" and do { + shift(@a); + shift(@a); + #if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";} + #my $qos=0; + #my $retain=0; + #if(looks_like_number ($a[0])) { + # $qos = int($a[0]); + # $qos = 0 if $qos>1; + # shift(@a); + # if(looks_like_number ($a[0])) { + # $retain = int($a[0]); + # $retain = 0 if $retain>2; + # shift(@a); + # } + #} + #if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";} + #my $topic = shift(@a); + #my $value = join (" ", @a); + + my ($qos, $retain,$topic, $value) = parsePublishCmd(@a); + return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic); + return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+'); + $qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos); + my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain); + last; + } }; } +sub parsePublishCmdStr($) { + my ($str) = @_; + + if(defined($str) && $str=~m/\s*(?:({.*})\s+)?(.*)/) { + my $exp = $1; + my $rest = $2; + if ($rest){ + my @lwa = split("[ \t]+",$rest); + unshift (@lwa,$exp) if($exp); + return parsePublishCmd(@lwa); + } + } + return undef; +} + +sub parsePublishCmd(@) { + my @a = @_; + # [qos:?] [retain:?] topic value + + return undef if(!@a); + return undef if(scalar(@a)<1); + + my $qos = 0; + my $retain = 0; + my $topic = undef; + my $value = "\0"; + my $expression = undef; + + while (scalar(@a)>0) { + my $av = shift(@a); + if($av =~ /\{.*\}/) { + $expression = $av; + next; + } + my ($pn,$pv) = split(":",$av); + if(defined($pv)) { + if($pn eq "qos") { + if($pv >=0 && $pv <=2) { + $qos = $pv; + } + } elsif($pn eq "retain") { + if($pv >=0 && $pv <=1) { + $retain = $pv; + } + } else { + # ignore + next; + } + } else { + $topic = $av; + last; + } + } + + if(scalar(@a)>0) { + $value = join(" ", @a); + } + + return undef unless $topic || $expression; + return ($qos, $retain,$topic, $value, $expression); +} + sub Notify($$) { my ($hash,$dev) = @_; if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) { @@ -177,17 +324,47 @@ sub Attr($$$$) { }; last; }; + $attribute eq "last-will" and do { + if($hash->{STATE} ne "disconnected") { + Stop($hash); + InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0); + } + last; + }; }; } +#sub Reconnect($){ +# my $hash = shift; +# Stop($hash); +# Start($hash); +#} + sub Start($) { my $hash = shift; + my $firsttime = $hash->{".cinitmark"}; + + if(defined($firsttime)) { + my $cstate=ReadingsVal($hash->{NAME},"connection",""); + if($cstate ne "disconnected" && $cstate ne "timed-out") { + return undef; + } + } else { + $hash->{".cinitmark"} = 1; + } + DevIo_CloseDev($hash); return DevIo_OpenDev($hash, 0, "MQTT::Init"); } sub Stop($) { my $hash = shift; + + my $cstate=ReadingsVal($hash->{NAME},"connection",""); + if($cstate eq "disconnected" || $cstate eq "timed-out") { + return undef; + } + send_disconnect($hash); DevIo_CloseDev($hash); RemoveInternalTimer($hash); @@ -221,7 +398,10 @@ sub Init($) { sub Timer($) { my $hash = shift; RemoveInternalTimer($hash); - readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received}; + unless ($hash->{ping_received}) { + onTimeout($hash); + readingsSingleUpdate($hash,"connection","timed-out",1) ;#unless $hash->{ping_received}; + } $hash->{ping_received} = 0; InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0); send_ping($hash); @@ -242,6 +422,7 @@ sub Read { MESSAGE_TYPE: { $message_type == MQTT_CONNACK and do { readingsSingleUpdate($hash,"connection","connected",1); + onConnect($hash); GP_ForallClients($hash,\&client_start); foreach my $message_id (keys %{$hash->{messages}}) { my $msg = $hash->{messages}->{$message_id}->{message}; @@ -258,10 +439,15 @@ sub Read { Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) { readingsSingleUpdate($client,"transmission-state","incoming publish received",1); - if ($client->{TYPE} eq "MQTT_DEVICE") { + my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn}; + if($fn) { + CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message())) + } elsif ($client->{TYPE} eq "MQTT_DEVICE") { MQTT::DEVICE::onmessage($client,$topic,$mqtt->message()); - } else { + } elsif ($client->{TYPE} eq "MQTT_BRIDGE") { MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message()); + } else { + Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE}); } }; },undef); @@ -371,7 +557,11 @@ sub send_connect($) { my $name = $hash->{NAME}; my $user = getKeyValue($name."_user"); my $pass = getKeyValue($name."_pass"); - return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass); + + my $lw = AttrVal($name,"last-will",undef); + my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lw); + + return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass, will_topic => $willtopic, will_message => $willmessage, will_retain => $willretain, will_qos => $willqos); }; sub send_publish($@) { @@ -405,7 +595,9 @@ sub send_ping($) { }; sub send_disconnect($) { - return send_message(shift, message_type => MQTT_DISCONNECT); + my $hash = shift; + onDisconnect($hash); + return send_message($hash, message_type => MQTT_DISCONNECT); }; sub send_message($$$@) { @@ -419,6 +611,7 @@ sub send_message($$$@) { timeout => gettimeofday()+$hash->{timeout}, }; } + DevIo_SimpleWrite($hash,$message->bytes,undef); }; @@ -432,15 +625,17 @@ sub topic_to_regexp($) { return "^$t\$"; } -sub client_subscribe_topic($$) { - my ($client,$topic) = @_; +sub client_subscribe_topic($$;$$) { + my ($client,$topic,$qos,$retain) = @_; push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}}; my $expr = topic_to_regexp($topic); push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}}; if ($main::init_done) { if (my $mqtt = $client->{IODev}) {; + $qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE + $retain = 0 unless defined $retain; # not supported yet my $msgid = send_subscribe($mqtt, - topics => [[$topic => $client->{qos} || MQTT_QOS_AT_MOST_ONCE]], + topics => [[$topic => $qos || MQTT_QOS_AT_MOST_ONCE]], ); $client->{message_ids}->{$msgid}++; readingsSingleUpdate($client,"transmission-state","subscribe sent",1) @@ -468,8 +663,9 @@ sub Client_Define($$) { my ( $client, $def ) = @_; $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; - $client->{qos} = MQTT_QOS_AT_MOST_ONCE; - $client->{retain} = 0; + #$client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT + $client->{".qos"}->{'*'} = 0; + $client->{".retain"}->{'*'} = "0"; $client->{subscribe} = []; $client->{subscribeExpr} = []; AssignIoPort($client); @@ -485,25 +681,104 @@ sub Client_Undefine($) { client_stop(shift); return undef; }; - +#use Data::Dumper; sub client_attr($$$$$) { my ($client,$command,$name,$attribute,$value) = @_; ATTRIBUTE_HANDLER: { $attribute eq "qos" and do { - if ($command eq "set") { - $client->{qos} = $MQTT::qos{$value}; + #if ($command eq "set") { + # $client->{qos} = $MQTT::qos{$value}; ### ALT + #} else { + # $client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT + #} + + delete($client->{".qos"}); + + if ($command ne "set") { + delete($client->{".qos"}); + $client->{".qos"}->{"*"} = "0"; } else { - $client->{qos} = MQTT_QOS_AT_MOST_ONCE; + + my @values = (); + if(!defined($value) || $value=~/^[ \t]*$/) { + return "QOS value may not be empty. Format: [|*:]0|1|2"; + } + @values = split("[ \t]+",$value); + + foreach my $set (@values) { + my($rname,$rvalue) = split(":",$set); + if(!defined($rvalue)) { + $rvalue=$rname; + $rname=""; + $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all + } + #if ($command eq "set") { + # Map constants + #$rvalue = MQTT_QOS_AT_MOST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_MOST_ONCE)); + #$rvalue = MQTT_QOS_AT_LEAST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_LEAST_ONCE)); + #$rvalue = MQTT_QOS_EXACTLY_ONCE if($rvalue eq qos_string(MQTT_QOS_EXACTLY_ONCE)); + $rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue})); + if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") { + return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [|*:]0|1|2"; + } + #$rvalue="1" unless ($rvalue eq "0"); + $client->{".qos"}->{$rname} = $rvalue; + #} else { + # delete($client->{".qos"}->{$rname}); + # $client->{".qos"}->{"*"} = "0" if($rname eq "*"); + #} + } } + + my $showqos = ""; + if(defined($client->{".qos"})) { + foreach my $rname (sort keys %{$client->{".qos"}}) { + my $rvalue = $client->{".qos"}->{$rname}; + $rname="[state]" if ($rname eq ""); + $showqos.=$rname.':'.$rvalue.' '; + } + } + $client->{"qos"} = $showqos; last; }; $attribute eq "retain" and do { - if ($command eq "set") { - $client->{retain} = $value; + delete($client->{".retain"}); + + if ($command ne "set") { + delete($client->{".retain"}); + $client->{".retain"}->{"*"} = "0"; } else { - $client->{retain} = 0; + my @values = (); + + if(!defined($value) || $value=~/^[ \t]*$/) { + return "retain value may not be empty. Format: [|*:]0|1"; + } + @values = split("[ \t]+",$value); + + foreach my $set (@values) { + my($rname,$rvalue) = split(":",$set); + if(!defined($rvalue)) { + $rvalue=$rname; + $rname=""; + $rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all + } + if($rvalue ne "0" && $rvalue ne "1") { + return "unexpected retain value. use 0 or 1. Format: [|*:]0|1"; + } + $client->{".retain"}->{$rname} = $rvalue; + } } + + my $showretain = ""; + if(defined($client->{".retain"})) { + foreach my $rname (sort keys %{$client->{".retain"}}) { + my $rvalue = $client->{".retain"}->{$rname}; + $rname="[state]" if ($rname eq ""); + $showretain.=$rname.':'.$rvalue.' '; + } + } + $client->{"retain"} = $showretain; last; }; $attribute eq "IODev" and do { @@ -529,7 +804,7 @@ sub client_start($) { } if (@{$client->{subscribe}}) { my $msgid = send_subscribe($client->{IODev}, - topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}], + topics => [map { [$_ => $client->{".qos"}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}], ); $client->{message_ids}->{$msgid}++; readingsSingleUpdate($client,"transmission-state","subscribe sent",1); @@ -578,6 +853,10 @@ sub client_stop($) {

set <name> disconnect
disconnects the MQTT-device from the mqtt-broker

+
  • +

    set <name> publish [qos:?] [retain:?] <topic> <message>
    + sends message to the specified topic

    +
  • Attributes

    @@ -586,6 +865,32 @@ sub client_stop($) {

    keep-alive
    sets the keep-alive time (in seconds).

    +
  • +

    attr <name> last-will [qos:?] [retain:?] <topic> <message>
    + Support for MQTT feature "last will" +

    +

    example:
    + attr mqtt last-will /fhem/status crashed +

    +
  • +
  • +

    on-connect, on-disconnect
    + attr <name> on-connect {Perl-expression} <topic> <message>
    + Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression
    + If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.
    + The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message. +

    +

    examples:
    + attr mqtt on-connect /topic/status connected
    + attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected +

    +
  • +
  • +

    on-timeout
    + attr <name> on-timeout {Perl-expression} + evaluate the given Perl expression on timeout
    +

    +
  • diff --git a/fhem/FHEM/10_MQTT_BRIDGE.pm b/fhem/FHEM/10_MQTT_BRIDGE.pm index d7d9f4a08..5ffce2a7c 100644 --- a/fhem/FHEM/10_MQTT_BRIDGE.pm +++ b/fhem/FHEM/10_MQTT_BRIDGE.pm @@ -48,8 +48,9 @@ sub MQTT_BRIDGE_Initialize($) { $hash->{AttrList} = "IODev ". - "qos:".join(",",keys %MQTT::qos)." ". - "retain:0,1 ". + #"qos:".join(",",keys %MQTT::qos)." ". + "qos ". + "retain ". "publish-topic-base ". "publishState ". "publishReading_.* ". @@ -77,6 +78,10 @@ BEGIN { readingsSingleUpdate Log3 DoSet + fhem + defs + AttrVal + ReadingsVal )) }; @@ -99,23 +104,35 @@ sub Get($$@) { }; }; } - +#use Data::Dumper; sub Notify() { my ($hash,$dev) = @_; Log3($hash->{NAME},5,"Notify for $dev->{NAME}"); + #Log3($hash->{NAME},5,">>>>=====".Dumper($dev->{CHANGED})); foreach my $event (@{$dev->{CHANGED}}) { $event =~ /^([^:]+)(: )?(.*)$/; + #Log3($hash->{NAME},5,">>>>>>>>>>>>>>>>>>"); Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'"); my $msgid; if (defined $3 and $3 ne "") { if (defined $hash->{publishReadings}->{$1}) { - $msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}, retain => $hash->{retain}); + my $retain = $hash->{".retain"}->{$1}; + $retain = $hash->{".retain"}->{'*'} unless defined($retain); + my $qos = $hash->{".qos"}->{$1}; + $qos = $hash->{".qos"}->{'*'} unless defined($qos); + #Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $qos, retain => $retain); readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); } } else { if (defined $hash->{publishState}) { - $msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}, retain => $hash->{retain}); + my $retain = $hash->{".retain"}->{""}; + $retain = $hash->{".retain"}->{'*'} unless defined($retain); + my $qos = $hash->{".qos"}->{""}; + $qos = $hash->{".qos"}->{'*'} unless defined($qos); + #Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $qos, retain => $retain); readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); } } @@ -130,15 +147,18 @@ sub Attr($$$$) { ATTRIBUTE_HANDLER: { $attribute =~ /^subscribeSet(_?)(.*)/ and do { if ($command eq "set") { - unless (defined $hash->{subscribeSets}->{$value} and $hash->{subscribeSets}->{$value} eq $2) { - unless (defined $hash->{subscribeSets}->{$value}) { - client_subscribe_topic($hash,$value); + my ($mqos, $mretain,$mtopic, $mvalue, $mcmd)=MQTT::parsePublishCmdStr($value); + if(!defined($mtopic)) { return "topic may not be empty";} + unless (defined $hash->{subscribeSets}->{$mtopic}->{name} and $hash->{subscribeSets}->{$mtopic}->{name} eq $2) { + unless (defined $hash->{subscribeSets}->{$mtopic}->{name}) { + client_subscribe_topic($hash,$mtopic,$mqos,$mretain); } - $hash->{subscribeSets}->{$value} = $2; + $hash->{subscribeSets}->{$mtopic}->{name} = $2; + $hash->{subscribeSets}->{$mtopic}->{cmd} = $mcmd; } } else { foreach my $topic (keys %{$hash->{subscribeSets}}) { - if ($hash->{subscribeSets}->{$topic} eq $2) { + if ($hash->{subscribeSets}->{$topic}->{name} eq $2) { client_unsubscribe_topic($hash,$topic); delete $hash->{subscribeSets}->{$topic}; last; @@ -163,20 +183,31 @@ sub Attr($$$$) { } last; }; - client_attr($hash,$command,$name,$attribute,$value); + return client_attr($hash,$command,$name,$attribute,$value); } } sub onmessage($$$) { my ($hash,$topic,$message) = @_; - if (defined (my $command = $hash->{subscribeSets}->{$topic})) { - my @args = split ("[ \t]+",$message); - if ($command eq "") { - Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : "")); - DoSet($hash->{DEF},@args); - } else { - Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : "")); - DoSet($hash->{DEF},$command,@args); + if (defined (my $command = $hash->{subscribeSets}->{$topic}->{name})) { + my $do=1; + if(defined (my $cmd = $hash->{subscribeSets}->{$topic}->{cmd})) { + Log3($hash->{NAME},5,"evaluating cmd: $cmd"); + my $name = $hash->{NAME}; + my $device = $hash->{DEF}; + $do=eval($cmd); + Log3($hash->{NAME},1,"ERROR evaluating $cmd: $@") if($@); + $do=1 if (!defined($do)); + } + if($do) { + my @args = split ("[ \t]+",$message); + if ($command eq "") { + Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : "")); + DoSet($hash->{DEF},@args); + } else { + Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : "")); + DoSet($hash->{DEF},$command,@args); + } } } } @@ -213,12 +244,22 @@ sub onmessage($$$) {

    Attributes

    • -

      attr <name> subscribeSet <topic>
      - configures a topic that will issue a 'set <message> whenever a message is received

      +

      attr <name> subscribeSet [{Perl-expression}] [qos:?] [retain:?] <topic>
      + configures a topic that will issue a 'set <message> whenever a message is received
      + QOS and ratain can be optionally defined for this topic.
      + Furthermore, a Perl statement can be provided which is executed when the message is received. The following variables are available for the expression: $hash, $name, $topic, $message, $devname (linked device). Return value decides whether reading is set (true (e.g., 1) or undef) or discarded (false (e.g., 0)). +

      +

      Example:
      + attr mqttest subscribeSet {fhem("do somethin")} /topic/cmd +

    • -

      attr <name> subscribeSet_<reading> <topic>
      - configures a topic that will issue a 'set <reading> <message> whenever a message is received

      +

      attr <name> subscribeSet_<reading> [{Perl-expression}] [qos:?] [retain:?] <topic>
      + configures a topic that will issue a 'set <reading> <message> whenever a message is received. see above +for Perl-Expression/QOS/retain

      +

      Example:
      + attr mqttest subscribeSet_cmd {if ($message eq "config") fhem("set $devname getconfig");; 0} /topic/cmd +

    • attr <name> publishState <topic>
      @@ -232,6 +273,26 @@ sub onmessage($$$) {

      attr <name> publish-topic-base <topic>
      this is used as base path when issueing 'get <device> readings' to construct topics to publish to based on the devices existing readings

    • +
    • +

      attr <name> retain <flags> ...
      + Specifies the retain flag for all or specific readings. Possible values are 0, 1

      +

      Examples:
      + attr mqttest retain 0
      + defines retain 0 for all readings/topics (due to downward compatibility)
      + retain *:0 1 test:1
      + defines retain 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1
      +

      +
    • +
    • +

      attr <name> qos <flags> ...
      + Specifies the QOS flag for all or specific readings. Possible values are 0, 1 or 2. Constants may be also used: at-most-once = 0, at-least-once = 1, exactly-once = 2

      +

      Examples:
      + attr mqttest qos 0
      + defines QOS 0 for all readings/topics (due to downward compatibility)
      + retain *:0 1 test:1
      + defines QOS 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1
      +

      +
    diff --git a/fhem/FHEM/10_MQTT_DEVICE.pm b/fhem/FHEM/10_MQTT_DEVICE.pm index 8f3aaf329..0ec27242a 100644 --- a/fhem/FHEM/10_MQTT_DEVICE.pm +++ b/fhem/FHEM/10_MQTT_DEVICE.pm @@ -41,10 +41,13 @@ sub MQTT_DEVICE_Initialize($) { $hash->{SetFn} = "MQTT::DEVICE::Set"; $hash->{AttrFn} = "MQTT::DEVICE::Attr"; + #$hash->{OnMessageFn} = "MQTT::DEVICE::onmessage"; + $hash->{AttrList} = "IODev ". - "qos:".join(",",keys %MQTT::qos)." ". - "retain:0,1 ". + #"qos:".join(",",keys %MQTT::qos)." ". + "qos ". + "retain ". "publishSet ". "publishSet_.* ". "subscribeReading_.* ". @@ -70,6 +73,10 @@ BEGIN { CommandAttr readingsSingleUpdate Log3 + fhem + defs + AttrVal + ReadingsVal )) }; @@ -87,11 +94,21 @@ sub Set($$$@) { if($command ne '?') { if(defined($hash->{publishSets}->{$command})) { my $value = join " ",@values; - $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}, retain => $hash->{retain}); + my $retain = $hash->{".retain"}->{$command}; + $retain = $hash->{".retain"}->{'*'} unless defined($retain); + my $qos = $hash->{".qos"}->{$command}; + $qos = $hash->{".qos"}->{'*'} unless defined($qos); + #Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $qos, retain => $retain); readingsSingleUpdate($hash,$command,$value,1); $mark=1; } elsif(defined($hash->{publishSets}->{""})) { - $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}, retain => $hash->{retain}); + my $retain = $hash->{".retain"}->{""}; + $retain = $hash->{".retain"}->{'*'} unless defined($retain); + my $qos = $hash->{".qos"}->{""}; + $qos = $hash->{".qos"}->{'*'} unless defined($qos); + #Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $qos, retain => $retain); readingsSingleUpdate($hash,"state",$command,1); $mark=1; } @@ -111,15 +128,18 @@ sub Attr($$$$) { ATTRIBUTE_HANDLER: { $attribute =~ /^subscribeReading_(.+)/ and do { if ($command eq "set") { - unless (defined $hash->{subscribeReadings}->{$value} and $hash->{subscribeReadings}->{$value} eq $1) { - unless (defined $hash->{subscribeReadings}->{$value}) { - client_subscribe_topic($hash,$value); + my ($mqos, $mretain,$mtopic, $mvalue, $mcmd)=MQTT::parsePublishCmdStr($value); + if(!defined($mtopic)) {return "topic may not be empty";} + unless (defined $hash->{subscribeReadings}->{$mtopic}->{name} and $hash->{subscribeReadings}->{$mtopic}->{name} eq $1) { + unless (defined $hash->{subscribeReadings}->{$mtopic}->{name}) { + client_subscribe_topic($hash,$mtopic,$mqos,$mretain); } - $hash->{subscribeReadings}->{$value} = $1; + $hash->{subscribeReadings}->{$mtopic}->{name} = $1; + $hash->{subscribeReadings}->{$mtopic}->{cmd} = $mcmd; } } else { foreach my $topic (keys %{$hash->{subscribeReadings}}) { - if ($hash->{subscribeReadings}->{$topic} eq $1) { + if ($hash->{subscribeReadings}->{$topic}->{name} eq $1) { client_unsubscribe_topic($hash,$topic); delete $hash->{subscribeReadings}->{$topic}; CommandDeleteReading(undef,"$hash->{NAME} $1"); @@ -187,17 +207,27 @@ sub Attr($$$$) { } last; }; - client_attr($hash,$command,$name,$attribute,$value); + return client_attr($hash,$command,$name,$attribute,$value); } } sub onmessage($$$) { my ($hash,$topic,$message) = @_; - if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { - Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); - readingsSingleUpdate($hash,$reading,$message,1); + if (defined (my $reading = $hash->{subscribeReadings}->{$topic}->{name})) { + my $do=1; + if(defined (my $cmd = $hash->{subscribeReadings}->{$topic}->{cmd})) { + Log3($hash->{NAME},5,"evaluating cmd: $cmd"); + my $name = $hash->{NAME}; + $do=eval($cmd); + Log3($hash->{NAME},1,"ERROR evaluating $cmd: $@") if($@); + $do=1 if (!defined($do)); + } + if($do) { + Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1)"); + readingsSingleUpdate($hash,$reading,$message,1); + } } elsif ($topic =~ $hash->{'.autoSubscribeExpr'}) { - Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1"); + Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1)"); CommandAttr(undef,"$hash->{NAME} subscribeReading_$1 $topic"); readingsSingleUpdate($hash,$1,$message,1); } @@ -230,19 +260,22 @@ sub onmessage($$$) { sets reading 'state' and publishes the command to topic configured via attr publishSet

  • -

    set <name> <h;reading> <value>
    - sets reading <h;reading> and publishes the command to topic configured via attr publishSet_<h;reading>

    +

    set <name> <reading> <value>
    + sets reading <reading> and publishes the command to topic configured via attr publishSet_<reading>

  • Attributes

    • -

      attr <name> publishSet [<commands>] <topic>
      - configures set commands that may be used to both set reading 'state' and publish to configured topic

      +

      attr <name> publishSet [[<reading>:]<commands_or_options>] <topic>
      + configures set commands and UI-options e.g. 'slider' that may be used to both set given reading ('state' if not defined) and publish to configured topic

      +

      example:
      + attr mqttest publishSet on off switch:on,off level:slider,0,1,100 /topic/123 +

    • -

      attr <name> publishSet_<reading> [<values>] <topic>
      +

      attr <name> publishSet_<reading> [<values>]* <topic>
      configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic

    • @@ -251,8 +284,34 @@ sub onmessage($$$) { e.g a message received with topic 'myhouse/kitchen/temperature' would create and update a reading 'temperature'

    • -

      attr <name> subscribeReading_<reading> <topic>
      - mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives

      +

      attr <name> subscribeReading_<reading> [{Perl-expression}] [qos:?] [retain:?] <topic>
      + mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives.
      + QOS and ratain can be optionally defined for this topic.
      + Furthermore, a Perl statement can be provided which is executed when the message is received. The following variables are available for the expression: $hash, $name, $topic, $message. Return value decides whether reading is set (true (e.g., 1) or undef) or discarded (false (e.g., 0)). +

      +

      Example:
      + attr mqttest subscribeReading_cmd {fhem("set something off")} /topic/cmd +

      +
    • +
    • +

      attr <name> retain <flags> ...
      + Specifies the retain flag for all or specific readings. Possible values are 0, 1

      +

      Examples:
      + attr mqttest retain 0
      + defines retain 0 for all readings/topics (due to downward compatibility)
      + retain *:0 1 test:1
      + defines retain 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1
      +

      +
    • +
    • +

      attr <name> qos <flags> ...
      + Specifies the QOS flag for all or specific readings. Possible values are 0, 1 or 2. Constants may be also used: at-most-once = 0, at-least-once = 1, exactly-once = 2

      +

      Examples:
      + attr mqttest qos 0
      + defines QOS 0 for all readings/topics (due to downward compatibility)
      + retain *:0 1 test:1
      + defines QOS 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1
      +