2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-03-10 09:16:53 +00:00

MQTT: features from hexenmeister (Forum #msg662213)

git-svn-id: https://svn.fhem.de/fhem/trunk@14964 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
eisler 2017-08-26 22:15:46 +00:00
parent 0c8cc529d5
commit 52c2007313
4 changed files with 503 additions and 75 deletions

View File

@ -1,5 +1,8 @@
# Add changes at the top of the list. Keep it in ASCII, and 80-char wide. # Add changes at the top of the list. Keep it in ASCII, and 80-char wide.
# Do not insert empty lines here, update check depends on it. # Do not insert empty lines here, update check depends on it.
- feature: 00_MQTT: OnMessageFn, last-will, onConnect/onDisconnect
- feature: 10_MQTT_DEVICE: improved publishSet, retain, subscribeReadings
- feature: 10_MQTT_BRIDGE: improved retain, subscribeSet
- new: 31_Aurora.pm: first release - new: 31_Aurora.pm: first release
- bugfix: 21_HEOS: fix missing curl - bugfix: 21_HEOS: fix missing curl
- bugfix: 31_PLAYBULB: code cleaning - bugfix: 31_PLAYBULB: code cleaning

View File

@ -27,6 +27,7 @@
my %sets = ( my %sets = (
"connect" => "", "connect" => "",
"disconnect" => "", "disconnect" => "",
"publish" => "",
); );
my %gets = ( my %gets = (
@ -52,11 +53,12 @@ sub MQTT_Initialize($) {
# Consumer # Consumer
$hash->{DefFn} = "MQTT::Define"; $hash->{DefFn} = "MQTT::Define";
$hash->{UndefFn} = "MQTT::Undef"; $hash->{UndefFn} = "MQTT::Undef";
$hash->{DeleteFn} = "MQTT::Delete"; $hash->{ShutdownFn} = "MQTT::Shutdown";
$hash->{SetFn} = "MQTT::Set"; $hash->{SetFn} = "MQTT::Set";
$hash->{NotifyFn} = "MQTT::Notify"; $hash->{NotifyFn} = "MQTT::Notify";
$hash->{AttrFn} = "MQTT::Attr";
$hash->{AttrList} = "keep-alive ".$main::readingFnAttributes; $hash->{AttrList} = "keep-alive "."last-will "."on-connect on-disconnect on-timeout".$main::readingFnAttributes;
} }
package MQTT; package MQTT;
@ -86,10 +88,16 @@ BEGIN {GP_Import(qw(
RemoveInternalTimer RemoveInternalTimer
InternalTimer InternalTimer
AttrVal AttrVal
ReadingsVal
Log3 Log3
AssignIoPort AssignIoPort
getKeyValue getKeyValue
setKeyValue setKeyValue
CallFn
defs
modules
looks_like_number
fhem
))}; ))};
sub Define($$) { sub Define($$) {
@ -112,6 +120,8 @@ sub Define($$) {
$hash->{DEF} = $host; $hash->{DEF} = $host;
#readingsSingleUpdate($hash,"connection","disconnected",0);
if ($main::init_done) { if ($main::init_done) {
return Start($hash); return Start($hash);
} else { } else {
@ -119,19 +129,67 @@ sub Define($$) {
} }
} }
sub Undef($$) { sub Undef($) {
my ($hash, $name) = @_; my $hash = shift;
Stop($hash); Stop($hash);
return undef; my $name = $hash->{NAME};
}
sub Delete($$) {
my ($hash, $name) = @_;
setKeyValue($name."_user",undef); setKeyValue($name."_user",undef);
setKeyValue($name."_pass",undef); setKeyValue($name."_pass",undef);
return undef; return undef;
} }
sub Shutdown($) {
my $hash = shift;
Stop($hash);
my $name = $hash->{NAME};
Log3($name,1,"Shutdown executed");
return undef;
}
sub onConnect($) {
my $hash = shift;
my $name = $hash->{NAME};
my $cmdstr = AttrVal($name,"on-connect",undef);
return process_event($hash,$cmdstr);
}
sub onDisconnect($) {
my $hash = shift;
my $name = $hash->{NAME};
my $cmdstr = AttrVal($name,"on-disconnect",undef);
return process_event($hash,$cmdstr);
}
sub onTimeout($) {
my $hash = shift;
my $name = $hash->{NAME};
my $cmdstr = AttrVal($name,"on-timeout",undef);
if($cmdstr) {
return eval($cmdstr);
}
}
sub process_event($$) {
my $hash = shift;
my $str = shift;
my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str);
my $do=1;
if($cmd) {
my $name = $hash->{NAME};
$do=eval($cmd);
$do=1 if (!defined($do));
#no strict "refs";
#my $ret = &{$hash->{WBCallback}}($hash);
#use strict "refs";
}
if($do && defined($topic)) {
$qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain);
}
}
sub Set($@) { sub Set($@) {
my ($hash, @a) = @_; my ($hash, @a) = @_;
return "Need at least one parameters" if(@a < 2); return "Need at least one parameters" if(@a < 2);
@ -149,9 +207,98 @@ sub Set($@) {
Stop($hash); Stop($hash);
last; last;
}; };
$command eq "publish" and do {
shift(@a);
shift(@a);
#if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";}
#my $qos=0;
#my $retain=0;
#if(looks_like_number ($a[0])) {
# $qos = int($a[0]);
# $qos = 0 if $qos>1;
# shift(@a);
# if(looks_like_number ($a[0])) {
# $retain = int($a[0]);
# $retain = 0 if $retain>2;
# shift(@a);
# }
#}
#if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";}
#my $topic = shift(@a);
#my $value = join (" ", @a);
my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+');
$qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
last;
}
}; };
} }
sub parsePublishCmdStr($) {
my ($str) = @_;
if(defined($str) && $str=~m/\s*(?:({.*})\s+)?(.*)/) {
my $exp = $1;
my $rest = $2;
if ($rest){
my @lwa = split("[ \t]+",$rest);
unshift (@lwa,$exp) if($exp);
return parsePublishCmd(@lwa);
}
}
return undef;
}
sub parsePublishCmd(@) {
my @a = @_;
# [qos:?] [retain:?] topic value
return undef if(!@a);
return undef if(scalar(@a)<1);
my $qos = 0;
my $retain = 0;
my $topic = undef;
my $value = "\0";
my $expression = undef;
while (scalar(@a)>0) {
my $av = shift(@a);
if($av =~ /\{.*\}/) {
$expression = $av;
next;
}
my ($pn,$pv) = split(":",$av);
if(defined($pv)) {
if($pn eq "qos") {
if($pv >=0 && $pv <=2) {
$qos = $pv;
}
} elsif($pn eq "retain") {
if($pv >=0 && $pv <=1) {
$retain = $pv;
}
} else {
# ignore
next;
}
} else {
$topic = $av;
last;
}
}
if(scalar(@a)>0) {
$value = join(" ", @a);
}
return undef unless $topic || $expression;
return ($qos, $retain,$topic, $value, $expression);
}
sub Notify($$) { sub Notify($$) {
my ($hash,$dev) = @_; my ($hash,$dev) = @_;
if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) { if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
@ -177,17 +324,47 @@ sub Attr($$$$) {
}; };
last; last;
}; };
$attribute eq "last-will" and do {
if($hash->{STATE} ne "disconnected") {
Stop($hash);
InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
}
last;
};
}; };
} }
#sub Reconnect($){
# my $hash = shift;
# Stop($hash);
# Start($hash);
#}
sub Start($) { sub Start($) {
my $hash = shift; my $hash = shift;
my $firsttime = $hash->{".cinitmark"};
if(defined($firsttime)) {
my $cstate=ReadingsVal($hash->{NAME},"connection","");
if($cstate ne "disconnected" && $cstate ne "timed-out") {
return undef;
}
} else {
$hash->{".cinitmark"} = 1;
}
DevIo_CloseDev($hash); DevIo_CloseDev($hash);
return DevIo_OpenDev($hash, 0, "MQTT::Init"); return DevIo_OpenDev($hash, 0, "MQTT::Init");
} }
sub Stop($) { sub Stop($) {
my $hash = shift; my $hash = shift;
my $cstate=ReadingsVal($hash->{NAME},"connection","");
if($cstate eq "disconnected" || $cstate eq "timed-out") {
return undef;
}
send_disconnect($hash); send_disconnect($hash);
DevIo_CloseDev($hash); DevIo_CloseDev($hash);
RemoveInternalTimer($hash); RemoveInternalTimer($hash);
@ -221,7 +398,10 @@ sub Init($) {
sub Timer($) { sub Timer($) {
my $hash = shift; my $hash = shift;
RemoveInternalTimer($hash); RemoveInternalTimer($hash);
readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received}; unless ($hash->{ping_received}) {
onTimeout($hash);
readingsSingleUpdate($hash,"connection","timed-out",1) ;#unless $hash->{ping_received};
}
$hash->{ping_received} = 0; $hash->{ping_received} = 0;
InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0); InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
send_ping($hash); send_ping($hash);
@ -242,6 +422,7 @@ sub Read {
MESSAGE_TYPE: { MESSAGE_TYPE: {
$message_type == MQTT_CONNACK and do { $message_type == MQTT_CONNACK and do {
readingsSingleUpdate($hash,"connection","connected",1); readingsSingleUpdate($hash,"connection","connected",1);
onConnect($hash);
GP_ForallClients($hash,\&client_start); GP_ForallClients($hash,\&client_start);
foreach my $message_id (keys %{$hash->{messages}}) { foreach my $message_id (keys %{$hash->{messages}}) {
my $msg = $hash->{messages}->{$message_id}->{message}; my $msg = $hash->{messages}->{$message_id}->{message};
@ -258,10 +439,15 @@ sub Read {
Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) { if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
readingsSingleUpdate($client,"transmission-state","incoming publish received",1); readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
if ($client->{TYPE} eq "MQTT_DEVICE") { my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
if($fn) {
CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
} elsif ($client->{TYPE} eq "MQTT_DEVICE") {
MQTT::DEVICE::onmessage($client,$topic,$mqtt->message()); MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
} else { } elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message()); MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
} else {
Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
} }
}; };
},undef); },undef);
@ -371,7 +557,11 @@ sub send_connect($) {
my $name = $hash->{NAME}; my $name = $hash->{NAME};
my $user = getKeyValue($name."_user"); my $user = getKeyValue($name."_user");
my $pass = getKeyValue($name."_pass"); my $pass = getKeyValue($name."_pass");
return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass);
my $lw = AttrVal($name,"last-will",undef);
my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lw);
return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass, will_topic => $willtopic, will_message => $willmessage, will_retain => $willretain, will_qos => $willqos);
}; };
sub send_publish($@) { sub send_publish($@) {
@ -405,7 +595,9 @@ sub send_ping($) {
}; };
sub send_disconnect($) { sub send_disconnect($) {
return send_message(shift, message_type => MQTT_DISCONNECT); my $hash = shift;
onDisconnect($hash);
return send_message($hash, message_type => MQTT_DISCONNECT);
}; };
sub send_message($$$@) { sub send_message($$$@) {
@ -419,6 +611,7 @@ sub send_message($$$@) {
timeout => gettimeofday()+$hash->{timeout}, timeout => gettimeofday()+$hash->{timeout},
}; };
} }
DevIo_SimpleWrite($hash,$message->bytes,undef); DevIo_SimpleWrite($hash,$message->bytes,undef);
}; };
@ -432,15 +625,17 @@ sub topic_to_regexp($) {
return "^$t\$"; return "^$t\$";
} }
sub client_subscribe_topic($$) { sub client_subscribe_topic($$;$$) {
my ($client,$topic) = @_; my ($client,$topic,$qos,$retain) = @_;
push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}}; push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
my $expr = topic_to_regexp($topic); my $expr = topic_to_regexp($topic);
push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}}; push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
if ($main::init_done) { if ($main::init_done) {
if (my $mqtt = $client->{IODev}) {; if (my $mqtt = $client->{IODev}) {;
$qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
$retain = 0 unless defined $retain; # not supported yet
my $msgid = send_subscribe($mqtt, my $msgid = send_subscribe($mqtt,
topics => [[$topic => $client->{qos} || MQTT_QOS_AT_MOST_ONCE]], topics => [[$topic => $qos || MQTT_QOS_AT_MOST_ONCE]],
); );
$client->{message_ids}->{$msgid}++; $client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","subscribe sent",1) readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
@ -468,8 +663,9 @@ sub Client_Define($$) {
my ( $client, $def ) = @_; my ( $client, $def ) = @_;
$client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
$client->{qos} = MQTT_QOS_AT_MOST_ONCE; #$client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
$client->{retain} = 0; $client->{".qos"}->{'*'} = 0;
$client->{".retain"}->{'*'} = "0";
$client->{subscribe} = []; $client->{subscribe} = [];
$client->{subscribeExpr} = []; $client->{subscribeExpr} = [];
AssignIoPort($client); AssignIoPort($client);
@ -485,25 +681,104 @@ sub Client_Undefine($) {
client_stop(shift); client_stop(shift);
return undef; return undef;
}; };
#use Data::Dumper;
sub client_attr($$$$$) { sub client_attr($$$$$) {
my ($client,$command,$name,$attribute,$value) = @_; my ($client,$command,$name,$attribute,$value) = @_;
ATTRIBUTE_HANDLER: { ATTRIBUTE_HANDLER: {
$attribute eq "qos" and do { $attribute eq "qos" and do {
if ($command eq "set") { #if ($command eq "set") {
$client->{qos} = $MQTT::qos{$value}; # $client->{qos} = $MQTT::qos{$value}; ### ALT
#} else {
# $client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
#}
delete($client->{".qos"});
if ($command ne "set") {
delete($client->{".qos"});
$client->{".qos"}->{"*"} = "0";
} else { } else {
$client->{qos} = MQTT_QOS_AT_MOST_ONCE;
my @values = ();
if(!defined($value) || $value=~/^[ \t]*$/) {
return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
} }
@values = split("[ \t]+",$value);
foreach my $set (@values) {
my($rname,$rvalue) = split(":",$set);
if(!defined($rvalue)) {
$rvalue=$rname;
$rname="";
$rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
}
#if ($command eq "set") {
# Map constants
#$rvalue = MQTT_QOS_AT_MOST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_MOST_ONCE));
#$rvalue = MQTT_QOS_AT_LEAST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_LEAST_ONCE));
#$rvalue = MQTT_QOS_EXACTLY_ONCE if($rvalue eq qos_string(MQTT_QOS_EXACTLY_ONCE));
$rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue}));
if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") {
return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
}
#$rvalue="1" unless ($rvalue eq "0");
$client->{".qos"}->{$rname} = $rvalue;
#} else {
# delete($client->{".qos"}->{$rname});
# $client->{".qos"}->{"*"} = "0" if($rname eq "*");
#}
}
}
my $showqos = "";
if(defined($client->{".qos"})) {
foreach my $rname (sort keys %{$client->{".qos"}}) {
my $rvalue = $client->{".qos"}->{$rname};
$rname="[state]" if ($rname eq "");
$showqos.=$rname.':'.$rvalue.' ';
}
}
$client->{"qos"} = $showqos;
last; last;
}; };
$attribute eq "retain" and do { $attribute eq "retain" and do {
if ($command eq "set") { delete($client->{".retain"});
$client->{retain} = $value;
if ($command ne "set") {
delete($client->{".retain"});
$client->{".retain"}->{"*"} = "0";
} else { } else {
$client->{retain} = 0; my @values = ();
if(!defined($value) || $value=~/^[ \t]*$/) {
return "retain value may not be empty. Format: [<reading>|*:]0|1";
} }
@values = split("[ \t]+",$value);
foreach my $set (@values) {
my($rname,$rvalue) = split(":",$set);
if(!defined($rvalue)) {
$rvalue=$rname;
$rname="";
$rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
}
if($rvalue ne "0" && $rvalue ne "1") {
return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
}
$client->{".retain"}->{$rname} = $rvalue;
}
}
my $showretain = "";
if(defined($client->{".retain"})) {
foreach my $rname (sort keys %{$client->{".retain"}}) {
my $rvalue = $client->{".retain"}->{$rname};
$rname="[state]" if ($rname eq "");
$showretain.=$rname.':'.$rvalue.' ';
}
}
$client->{"retain"} = $showretain;
last; last;
}; };
$attribute eq "IODev" and do { $attribute eq "IODev" and do {
@ -529,7 +804,7 @@ sub client_start($) {
} }
if (@{$client->{subscribe}}) { if (@{$client->{subscribe}}) {
my $msgid = send_subscribe($client->{IODev}, my $msgid = send_subscribe($client->{IODev},
topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}], topics => [map { [$_ => $client->{".qos"}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
); );
$client->{message_ids}->{$msgid}++; $client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","subscribe sent",1); readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
@ -578,6 +853,10 @@ sub client_stop($) {
<p><code>set &lt;name&gt; disconnect</code><br/> <p><code>set &lt;name&gt; disconnect</code><br/>
disconnects the MQTT-device from the mqtt-broker</p> disconnects the MQTT-device from the mqtt-broker</p>
</li> </li>
<li>
<p><code>set &lt;name&gt; publish [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
sends message to the specified topic</p>
</li>
</ul> </ul>
<a name="MQTTattr"></a> <a name="MQTTattr"></a>
<p><b>Attributes</b></p> <p><b>Attributes</b></p>
@ -586,6 +865,32 @@ sub client_stop($) {
<p>keep-alive<br/> <p>keep-alive<br/>
sets the keep-alive time (in seconds).</p> sets the keep-alive time (in seconds).</p>
</li> </li>
<li>
<p><code>attr &lt;name&gt; last-will [qos:?] [retain:?] &lt;topic&gt; &lt;message&gt;</code><br/>
Support for MQTT feature "last will"
</p>
<p>example:<br/>
<code>attr mqtt last-will /fhem/status crashed</code>
</p>
</li>
<li>
<p>on-connect, on-disconnect<br/>
<code>attr &lt;name&gt; on-connect {Perl-expression} &lt;topic&gt; &lt;message&gt;</code><br/>
Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression<br/>
If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
</p>
<p>examples:<br/>
<code>attr mqtt on-connect /topic/status connected</code><br/>
<code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
</p>
</li>
<li>
<p>on-timeout<br/>
<code>attr &lt;name&gt; on-timeout {Perl-expression}</code>
evaluate the given Perl expression on timeout<br/>
</p>
</li>
</ul> </ul>
</ul> </ul>

View File

@ -48,8 +48,9 @@ sub MQTT_BRIDGE_Initialize($) {
$hash->{AttrList} = $hash->{AttrList} =
"IODev ". "IODev ".
"qos:".join(",",keys %MQTT::qos)." ". #"qos:".join(",",keys %MQTT::qos)." ".
"retain:0,1 ". "qos ".
"retain ".
"publish-topic-base ". "publish-topic-base ".
"publishState ". "publishState ".
"publishReading_.* ". "publishReading_.* ".
@ -77,6 +78,10 @@ BEGIN {
readingsSingleUpdate readingsSingleUpdate
Log3 Log3
DoSet DoSet
fhem
defs
AttrVal
ReadingsVal
)) ))
}; };
@ -99,23 +104,35 @@ sub Get($$@) {
}; };
}; };
} }
#use Data::Dumper;
sub Notify() { sub Notify() {
my ($hash,$dev) = @_; my ($hash,$dev) = @_;
Log3($hash->{NAME},5,"Notify for $dev->{NAME}"); Log3($hash->{NAME},5,"Notify for $dev->{NAME}");
#Log3($hash->{NAME},5,">>>>=====".Dumper($dev->{CHANGED}));
foreach my $event (@{$dev->{CHANGED}}) { foreach my $event (@{$dev->{CHANGED}}) {
$event =~ /^([^:]+)(: )?(.*)$/; $event =~ /^([^:]+)(: )?(.*)$/;
#Log3($hash->{NAME},5,">>>>>>>>>>>>>>>>>>");
Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'"); Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'");
my $msgid; my $msgid;
if (defined $3 and $3 ne "") { if (defined $3 and $3 ne "") {
if (defined $hash->{publishReadings}->{$1}) { if (defined $hash->{publishReadings}->{$1}) {
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}, retain => $hash->{retain}); my $retain = $hash->{".retain"}->{$1};
$retain = $hash->{".retain"}->{'*'} unless defined($retain);
my $qos = $hash->{".qos"}->{$1};
$qos = $hash->{".qos"}->{'*'} unless defined($qos);
#Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $qos, retain => $retain);
readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1);
} }
} else { } else {
if (defined $hash->{publishState}) { if (defined $hash->{publishState}) {
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}, retain => $hash->{retain}); my $retain = $hash->{".retain"}->{""};
$retain = $hash->{".retain"}->{'*'} unless defined($retain);
my $qos = $hash->{".qos"}->{""};
$qos = $hash->{".qos"}->{'*'} unless defined($qos);
#Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $qos, retain => $retain);
readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1);
} }
} }
@ -130,15 +147,18 @@ sub Attr($$$$) {
ATTRIBUTE_HANDLER: { ATTRIBUTE_HANDLER: {
$attribute =~ /^subscribeSet(_?)(.*)/ and do { $attribute =~ /^subscribeSet(_?)(.*)/ and do {
if ($command eq "set") { if ($command eq "set") {
unless (defined $hash->{subscribeSets}->{$value} and $hash->{subscribeSets}->{$value} eq $2) { my ($mqos, $mretain,$mtopic, $mvalue, $mcmd)=MQTT::parsePublishCmdStr($value);
unless (defined $hash->{subscribeSets}->{$value}) { if(!defined($mtopic)) { return "topic may not be empty";}
client_subscribe_topic($hash,$value); unless (defined $hash->{subscribeSets}->{$mtopic}->{name} and $hash->{subscribeSets}->{$mtopic}->{name} eq $2) {
unless (defined $hash->{subscribeSets}->{$mtopic}->{name}) {
client_subscribe_topic($hash,$mtopic,$mqos,$mretain);
} }
$hash->{subscribeSets}->{$value} = $2; $hash->{subscribeSets}->{$mtopic}->{name} = $2;
$hash->{subscribeSets}->{$mtopic}->{cmd} = $mcmd;
} }
} else { } else {
foreach my $topic (keys %{$hash->{subscribeSets}}) { foreach my $topic (keys %{$hash->{subscribeSets}}) {
if ($hash->{subscribeSets}->{$topic} eq $2) { if ($hash->{subscribeSets}->{$topic}->{name} eq $2) {
client_unsubscribe_topic($hash,$topic); client_unsubscribe_topic($hash,$topic);
delete $hash->{subscribeSets}->{$topic}; delete $hash->{subscribeSets}->{$topic};
last; last;
@ -163,13 +183,23 @@ sub Attr($$$$) {
} }
last; last;
}; };
client_attr($hash,$command,$name,$attribute,$value); return client_attr($hash,$command,$name,$attribute,$value);
} }
} }
sub onmessage($$$) { sub onmessage($$$) {
my ($hash,$topic,$message) = @_; my ($hash,$topic,$message) = @_;
if (defined (my $command = $hash->{subscribeSets}->{$topic})) { if (defined (my $command = $hash->{subscribeSets}->{$topic}->{name})) {
my $do=1;
if(defined (my $cmd = $hash->{subscribeSets}->{$topic}->{cmd})) {
Log3($hash->{NAME},5,"evaluating cmd: $cmd");
my $name = $hash->{NAME};
my $device = $hash->{DEF};
$do=eval($cmd);
Log3($hash->{NAME},1,"ERROR evaluating $cmd: $@") if($@);
$do=1 if (!defined($do));
}
if($do) {
my @args = split ("[ \t]+",$message); my @args = split ("[ \t]+",$message);
if ($command eq "") { if ($command eq "") {
Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : "")); Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : ""));
@ -179,6 +209,7 @@ sub onmessage($$$) {
DoSet($hash->{DEF},$command,@args); DoSet($hash->{DEF},$command,@args);
} }
} }
}
} }
1; 1;
@ -213,12 +244,22 @@ sub onmessage($$$) {
<p><b>Attributes</b></p> <p><b>Attributes</b></p>
<ul> <ul>
<li> <li>
<p><code>attr &lt;name&gt; subscribeSet &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; subscribeSet [{Perl-expression}] [qos:?] [retain:?] &lt;topic&gt;</code><br/>
configures a topic that will issue a 'set &lt;message&gt; whenever a message is received</p> configures a topic that will issue a 'set &lt;message&gt; whenever a message is received<br/>
QOS and ratain can be optionally defined for this topic. <br/>
Furthermore, a Perl statement can be provided which is executed when the message is received. The following variables are available for the expression: $hash, $name, $topic, $message, $devname (linked device). Return value decides whether reading is set (true (e.g., 1) or undef) or discarded (false (e.g., 0)).
</p>
<p>Example:<br/>
<code>attr mqttest subscribeSet {fhem("do somethin")} /topic/cmd</code>
</p>
</li> </li>
<li> <li>
<p><code>attr &lt;name&gt; subscribeSet_&lt;reading&gt; &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; subscribeSet_&lt;reading&gt; [{Perl-expression}] [qos:?] [retain:?] &lt;topic&gt;</code><br/>
configures a topic that will issue a 'set &lt;reading&gt; &lt;message&gt; whenever a message is received</p> configures a topic that will issue a 'set &lt;reading&gt; &lt;message&gt; whenever a message is received. see above
for Perl-Expression/QOS/retain</p>
<p>Example:<br/>
<code>attr mqttest subscribeSet_cmd {if ($message eq "config") fhem("set $devname getconfig");; 0} /topic/cmd</code>
</p>
</li> </li>
<li> <li>
<p><code>attr &lt;name&gt; publishState &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; publishState &lt;topic&gt;</code><br/>
@ -232,6 +273,26 @@ sub onmessage($$$) {
<p><code>attr &lt;name&gt; publish-topic-base &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; publish-topic-base &lt;topic&gt;</code><br/>
this is used as base path when issueing 'get &lt;device&gt; readings' to construct topics to publish to based on the devices existing readings</p> this is used as base path when issueing 'get &lt;device&gt; readings' to construct topics to publish to based on the devices existing readings</p>
</li> </li>
<li>
<p><code>attr &lt;name&gt; retain &lt;flags&gt; ...</code><br/>
Specifies the retain flag for all or specific readings. Possible values are 0, 1</p>
<p>Examples:<br/>
<code>attr mqttest retain 0</code><br/>
defines retain 0 for all readings/topics (due to downward compatibility)<br>
<code> retain *:0 1 test:1</code><br/>
defines retain 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1<br>
</p>
</li>
<li>
<p><code>attr &lt;name&gt; qos &lt;flags&gt; ...</code><br/>
Specifies the QOS flag for all or specific readings. Possible values are 0, 1 or 2. Constants may be also used: at-most-once = 0, at-least-once = 1, exactly-once = 2</p>
<p>Examples:<br/>
<code>attr mqttest qos 0</code><br/>
defines QOS 0 for all readings/topics (due to downward compatibility)<br>
<code> retain *:0 1 test:1</code><br/>
defines QOS 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1<br>
</p>
</li>
</ul> </ul>
</ul> </ul>

View File

@ -41,10 +41,13 @@ sub MQTT_DEVICE_Initialize($) {
$hash->{SetFn} = "MQTT::DEVICE::Set"; $hash->{SetFn} = "MQTT::DEVICE::Set";
$hash->{AttrFn} = "MQTT::DEVICE::Attr"; $hash->{AttrFn} = "MQTT::DEVICE::Attr";
#$hash->{OnMessageFn} = "MQTT::DEVICE::onmessage";
$hash->{AttrList} = $hash->{AttrList} =
"IODev ". "IODev ".
"qos:".join(",",keys %MQTT::qos)." ". #"qos:".join(",",keys %MQTT::qos)." ".
"retain:0,1 ". "qos ".
"retain ".
"publishSet ". "publishSet ".
"publishSet_.* ". "publishSet_.* ".
"subscribeReading_.* ". "subscribeReading_.* ".
@ -70,6 +73,10 @@ BEGIN {
CommandAttr CommandAttr
readingsSingleUpdate readingsSingleUpdate
Log3 Log3
fhem
defs
AttrVal
ReadingsVal
)) ))
}; };
@ -87,11 +94,21 @@ sub Set($$$@) {
if($command ne '?') { if($command ne '?') {
if(defined($hash->{publishSets}->{$command})) { if(defined($hash->{publishSets}->{$command})) {
my $value = join " ",@values; my $value = join " ",@values;
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}, retain => $hash->{retain}); my $retain = $hash->{".retain"}->{$command};
$retain = $hash->{".retain"}->{'*'} unless defined($retain);
my $qos = $hash->{".qos"}->{$command};
$qos = $hash->{".qos"}->{'*'} unless defined($qos);
#Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $qos, retain => $retain);
readingsSingleUpdate($hash,$command,$value,1); readingsSingleUpdate($hash,$command,$value,1);
$mark=1; $mark=1;
} elsif(defined($hash->{publishSets}->{""})) { } elsif(defined($hash->{publishSets}->{""})) {
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}, retain => $hash->{retain}); my $retain = $hash->{".retain"}->{""};
$retain = $hash->{".retain"}->{'*'} unless defined($retain);
my $qos = $hash->{".qos"}->{""};
$qos = $hash->{".qos"}->{'*'} unless defined($qos);
#Log3($hash->{NAME},1,">>>>>>>>>>>>>>>>>> RETAIN: ".$retain); $retain=0; ### TEST
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $qos, retain => $retain);
readingsSingleUpdate($hash,"state",$command,1); readingsSingleUpdate($hash,"state",$command,1);
$mark=1; $mark=1;
} }
@ -111,15 +128,18 @@ sub Attr($$$$) {
ATTRIBUTE_HANDLER: { ATTRIBUTE_HANDLER: {
$attribute =~ /^subscribeReading_(.+)/ and do { $attribute =~ /^subscribeReading_(.+)/ and do {
if ($command eq "set") { if ($command eq "set") {
unless (defined $hash->{subscribeReadings}->{$value} and $hash->{subscribeReadings}->{$value} eq $1) { my ($mqos, $mretain,$mtopic, $mvalue, $mcmd)=MQTT::parsePublishCmdStr($value);
unless (defined $hash->{subscribeReadings}->{$value}) { if(!defined($mtopic)) {return "topic may not be empty";}
client_subscribe_topic($hash,$value); unless (defined $hash->{subscribeReadings}->{$mtopic}->{name} and $hash->{subscribeReadings}->{$mtopic}->{name} eq $1) {
unless (defined $hash->{subscribeReadings}->{$mtopic}->{name}) {
client_subscribe_topic($hash,$mtopic,$mqos,$mretain);
} }
$hash->{subscribeReadings}->{$value} = $1; $hash->{subscribeReadings}->{$mtopic}->{name} = $1;
$hash->{subscribeReadings}->{$mtopic}->{cmd} = $mcmd;
} }
} else { } else {
foreach my $topic (keys %{$hash->{subscribeReadings}}) { foreach my $topic (keys %{$hash->{subscribeReadings}}) {
if ($hash->{subscribeReadings}->{$topic} eq $1) { if ($hash->{subscribeReadings}->{$topic}->{name} eq $1) {
client_unsubscribe_topic($hash,$topic); client_unsubscribe_topic($hash,$topic);
delete $hash->{subscribeReadings}->{$topic}; delete $hash->{subscribeReadings}->{$topic};
CommandDeleteReading(undef,"$hash->{NAME} $1"); CommandDeleteReading(undef,"$hash->{NAME} $1");
@ -187,17 +207,27 @@ sub Attr($$$$) {
} }
last; last;
}; };
client_attr($hash,$command,$name,$attribute,$value); return client_attr($hash,$command,$name,$attribute,$value);
} }
} }
sub onmessage($$$) { sub onmessage($$$) {
my ($hash,$topic,$message) = @_; my ($hash,$topic,$message) = @_;
if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { if (defined (my $reading = $hash->{subscribeReadings}->{$topic}->{name})) {
Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); my $do=1;
if(defined (my $cmd = $hash->{subscribeReadings}->{$topic}->{cmd})) {
Log3($hash->{NAME},5,"evaluating cmd: $cmd");
my $name = $hash->{NAME};
$do=eval($cmd);
Log3($hash->{NAME},1,"ERROR evaluating $cmd: $@") if($@);
$do=1 if (!defined($do));
}
if($do) {
Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1)");
readingsSingleUpdate($hash,$reading,$message,1); readingsSingleUpdate($hash,$reading,$message,1);
}
} elsif ($topic =~ $hash->{'.autoSubscribeExpr'}) { } elsif ($topic =~ $hash->{'.autoSubscribeExpr'}) {
Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1"); Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1)");
CommandAttr(undef,"$hash->{NAME} subscribeReading_$1 $topic"); CommandAttr(undef,"$hash->{NAME} subscribeReading_$1 $topic");
readingsSingleUpdate($hash,$1,$message,1); readingsSingleUpdate($hash,$1,$message,1);
} }
@ -230,19 +260,22 @@ sub onmessage($$$) {
sets reading 'state' and publishes the command to topic configured via attr publishSet</p> sets reading 'state' and publishes the command to topic configured via attr publishSet</p>
</li> </li>
<li> <li>
<p><code>set &lt;name&gt; &lt;h;reading&gt; &lt;value&gt;</code><br/> <p><code>set &lt;name&gt; &lt;reading&gt; &lt;value&gt;</code><br/>
sets reading &lt;h;reading&gt; and publishes the command to topic configured via attr publishSet_&lt;h;reading&gt;</p> sets reading &lt;reading&gt; and publishes the command to topic configured via attr publishSet_&lt;reading&gt;</p>
</li> </li>
</ul> </ul>
<a name="MQTT_DEVICEattr"></a> <a name="MQTT_DEVICEattr"></a>
<p><b>Attributes</b></p> <p><b>Attributes</b></p>
<ul> <ul>
<li> <li>
<p><code>attr &lt;name&gt; publishSet [&lt;commands&gt;] &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; publishSet [[&lt;reading&gt;:]&lt;commands_or_options&gt;] &lt;topic&gt;</code><br/>
configures set commands that may be used to both set reading 'state' and publish to configured topic</p> configures set commands and UI-options e.g. 'slider' that may be used to both set given reading ('state' if not defined) and publish to configured topic</p>
<p>example:<br/>
<code>attr mqttest publishSet on off switch:on,off level:slider,0,1,100 /topic/123</code>
</p>
</li> </li>
<li> <li>
<p><code>attr &lt;name&gt; publishSet_&lt;reading&gt; [&lt;values&gt;] &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; publishSet_&lt;reading&gt; [&lt;values&gt;]* &lt;topic&gt;</code><br/>
configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic</p> configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic</p>
</li> </li>
<li> <li>
@ -251,8 +284,34 @@ sub onmessage($$$) {
e.g a message received with topic 'myhouse/kitchen/temperature' would create and update a reading 'temperature'</p> e.g a message received with topic 'myhouse/kitchen/temperature' would create and update a reading 'temperature'</p>
</li> </li>
<li> <li>
<p><code>attr &lt;name&gt; subscribeReading_&lt;reading&gt; &lt;topic&gt;</code><br/> <p><code>attr &lt;name&gt; subscribeReading_&lt;reading&gt; [{Perl-expression}] [qos:?] [retain:?] &lt;topic&gt;</code><br/>
mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives</p> mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives.<br/>
QOS and ratain can be optionally defined for this topic. <br/>
Furthermore, a Perl statement can be provided which is executed when the message is received. The following variables are available for the expression: $hash, $name, $topic, $message. Return value decides whether reading is set (true (e.g., 1) or undef) or discarded (false (e.g., 0)).
</p>
<p>Example:<br/>
<code>attr mqttest subscribeReading_cmd {fhem("set something off")} /topic/cmd</code>
</p>
</li>
<li>
<p><code>attr &lt;name&gt; retain &lt;flags&gt; ...</code><br/>
Specifies the retain flag for all or specific readings. Possible values are 0, 1</p>
<p>Examples:<br/>
<code>attr mqttest retain 0</code><br/>
defines retain 0 for all readings/topics (due to downward compatibility)<br>
<code> retain *:0 1 test:1</code><br/>
defines retain 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1<br>
</p>
</li>
<li>
<p><code>attr &lt;name&gt; qos &lt;flags&gt; ...</code><br/>
Specifies the QOS flag for all or specific readings. Possible values are 0, 1 or 2. Constants may be also used: at-most-once = 0, at-least-once = 1, exactly-once = 2</p>
<p>Examples:<br/>
<code>attr mqttest qos 0</code><br/>
defines QOS 0 for all readings/topics (due to downward compatibility)<br>
<code> retain *:0 1 test:1</code><br/>
defines QOS 0 for all readings/topics except the reading 'test'. Retain for 'test' is 1<br>
</p>
</li> </li>
</ul> </ul>
</ul> </ul>