diff --git a/fhem/FHEM/00_MQTT.pm b/fhem/FHEM/00_MQTT.pm index 904abc5c9..86789cd84 100644 --- a/fhem/FHEM/00_MQTT.pm +++ b/fhem/FHEM/00_MQTT.pm @@ -61,7 +61,7 @@ package MQTT; use Exporter ('import'); @EXPORT = (); -@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr); +@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp); %EXPORT_TAGS = (all => [@EXPORT_OK]); use strict; @@ -195,7 +195,7 @@ sub Read { GP_ForallClients($hash,sub { my $client = shift; Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); - if (grep { $_ eq $topic } @{$client->{subscribe}}) { + if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) { readingsSingleUpdate($client,"transmission-state","publish received",1); if ($client->{TYPE} eq "MQTT_DEVICE") { MQTT::DEVICE::onmessage($client,$topic,$mqtt->message()); @@ -327,12 +327,55 @@ sub send_message($$$@) { return $msgid; }; +sub topic_to_regexp($) { + my $t = shift; + $t =~ s|#$|.\*|; + $t =~ s|\/\.\*$|.\*|; + $t =~ s|\/|\\\/|g; + $t =~ s|(\+)([^+]*$)|(+)$2|; + $t =~ s|\+|[^\/]+|g; + return "^$t\$"; +} + +sub client_subscribe_topic($$) { + my ($client,$topic) = @_; + push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}}; + my $expr = topic_to_regexp($topic); + push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}}; + if ($main::init_done) { + if (my $mqtt = $client->{IODev}) {; + my $msgid = send_subscribe($mqtt, + topics => [[$topic => $client->{qos} || MQTT_QOS_AT_MOST_ONCE]], + ); + $client->{message_ids}->{$msgid}++; + readingsSingleUpdate($client,"transmission-state","subscribe sent",1) + } + } +}; + +sub client_unsubscribe_topic($$) { + my ($client,$topic) = @_; + $client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}]; + my $expr = topic_to_regexp($topic); + $client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}]; + if ($main::init_done) { + if (my $mqtt = $client->{IODev}) {; + my $msgid = send_unsubscribe($mqtt, + topics => [$topic], + ); + $client->{message_ids}->{$msgid}++; + readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1) + } + } +}; + sub Client_Define($$) { my ( $client, $def ) = @_; $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; $client->{qos} = MQTT_QOS_AT_MOST_ONCE; $client->{subscribe} = []; + $client->{subscribeExpr} = []; if ($main::init_done) { return client_start($client); diff --git a/fhem/FHEM/10_MQTT_BRIDGE.pm b/fhem/FHEM/10_MQTT_BRIDGE.pm index 35bac9068..0de2c9eda 100644 --- a/fhem/FHEM/10_MQTT_BRIDGE.pm +++ b/fhem/FHEM/10_MQTT_BRIDGE.pm @@ -126,30 +126,17 @@ sub Attr($$$$) { ATTRIBUTE_HANDLER: { $attribute =~ /^subscribeSet(_?)(.*)/ and do { if ($command eq "set") { - $hash->{subscribeSets}->{$value} = $2; - push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; - if ($main::init_done) { - if (my $mqtt = $hash->{IODev}) {; - my $msgid = send_subscribe($mqtt, - topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]], - ); - $hash->{message_ids}->{$msgid}++; - readingsSingleUpdate($hash,"transmission-state","subscribe sent",1) + unless (defined $hash->{subscribeSets}->{$value} and $hash->{subscribeSets}->{$value} eq $2) { + unless (defined $hash->{subscribeSets}->{$value}) { + client_subscribe_topic($hash,$value); } + $hash->{subscribeSets}->{$value} = $2; } } else { foreach my $topic (keys %{$hash->{subscribeSets}}) { - if ($hash->{subscribeSets}->{topic} eq $2) { + if ($hash->{subscribeSets}->{$topic} eq $2) { + client_unsubscribe_topic($hash,$topic); delete $hash->{subscribeSets}->{$topic}; - $hash->{subscribe} = [grep { $_ ne $topic } @{$hash->{subscribe}}]; - if ($main::init_done) { - if (my $mqtt = $hash->{IODev}) {; - my $msgid = send_unsubscribe($mqtt, - topics => [$topic], - ); - $hash->{message_ids}->{$msgid}++; - } - } last; } } diff --git a/fhem/FHEM/10_MQTT_DEVICE.pm b/fhem/FHEM/10_MQTT_DEVICE.pm index f55a4b671..5171483ee 100644 --- a/fhem/FHEM/10_MQTT_DEVICE.pm +++ b/fhem/FHEM/10_MQTT_DEVICE.pm @@ -49,6 +49,7 @@ sub MQTT_DEVICE_Initialize($) { "publishSet ". "publishSet_.* ". "subscribeReading_.* ". + "autoSubscribeReadings ". $main::readingFnAttributes; main::LoadModule("MQTT"); @@ -66,6 +67,8 @@ BEGIN { MQTT->import(qw(:all)); GP_Import(qw( + CommandDeleteReading + CommandAttr readingsSingleUpdate Log3 )) @@ -95,36 +98,43 @@ sub Attr($$$$) { ATTRIBUTE_HANDLER: { $attribute =~ /^subscribeReading_(.+)/ and do { if ($command eq "set") { - $hash->{subscribeReadings}->{$value} = $1; - push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; - if ($main::init_done) { - if (my $mqtt = $hash->{IODev}) {; - my $msgid = send_subscribe($mqtt, - topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]], - ); - $hash->{message_ids}->{$msgid}++; - readingsSingleUpdate($hash,"transmission-state","subscribe sent",1) + unless (defined $hash->{subscribeReadings}->{$value} and $hash->{subscribeReadings}->{$value} eq $1) { + unless (defined $hash->{subscribeReadings}->{$value}) { + client_subscribe_topic($hash,$value); } + $hash->{subscribeReadings}->{$value} = $1; } } else { foreach my $topic (keys %{$hash->{subscribeReadings}}) { if ($hash->{subscribeReadings}->{$topic} eq $1) { - $hash->{subscribe} = [grep { $_ ne $topic } @{$hash->{subscribe}}]; + client_unsubscribe_topic($hash,$topic); delete $hash->{subscribeReadings}->{$topic}; - if ($main::init_done) { - if (my $mqtt = $hash->{IODev}) {; - my $msgid = send_unsubscribe($mqtt, - topics => [$topic], - ); - $hash->{message_ids}->{$msgid}++; - } - } + CommandDeleteReading(undef,"$hash->{NAME} $1"); last; } } } last; }; + $attribute eq "autoSubscribeReadings" and do { + if ($command eq "set") { + unless (defined $hash->{'.autoSubscribeTopic'} and $hash->{'.autoSubscribeTopic'} eq $value) { + if (defined $hash->{'.autoSubscribeTopic'}) { + client_unsubscribe_topic($hash,$hash->{'.autoSubscribeTopic'}); + } + $hash->{'.autoSubscribeTopic'} = $value; + $hash->{'.autoSubscribeExpr'} = topic_to_regexp($value); + client_subscribe_topic($hash,$value); + } + } else { + if (defined $hash->{'.autoSubscribeTopic'}) { + client_unsubscribe_topic($hash,$hash->{'.autoSubscribeTopic'}); + delete $hash->{'.autoSubscribeTopic'}; + delete $hash->{'.autoSubscribeExpr'}; + } + } + last; + }; $attribute =~ /^publishSet(_?)(.*)/ and do { if ($command eq "set") { my @values = split ("[ \t]+",$value); @@ -146,6 +156,7 @@ sub Attr($$$$) { delete $sets{$set}; } } else { + CommandDeleteReading(undef,"$hash->{NAME} $2"); delete $sets{$2}; } delete $hash->{publishSets}->{$2}; @@ -161,6 +172,10 @@ sub onmessage($$$) { if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); readingsSingleUpdate($hash,$reading,$message,1); + } elsif ($topic =~ $hash->{'.autoSubscribeExpr'}) { + Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1"); + CommandAttr(undef,"$hash->{NAME} subscribeReading_$1 $topic"); + readingsSingleUpdate($hash,$1,$message,1); } } @@ -210,6 +225,15 @@ sub onmessage($$$) { attr <name> publishSet_<reading> [<values>] <topic>
configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic
+
  • + attr <name> autoSubscribeReadings <topic>
    + specify a mqtt-topic pattern with wildcard (e.c. 'myhouse/kitchen/+') and MQTT_DEVICE automagically creates readings based on the wildcard-match
    + e.g a message received with topic 'myhouse/kitchen/temperature' would create and update a reading 'temperature' +
  • +
  • + attr <name> subscribeReading_<reading> <topic>
    + mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives
    +