diff --git a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm index 199648598..d61022347 100644 --- a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm +++ b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm @@ -31,11 +31,38 @@ # CHANGE LOG # # +# 22.09.2018 0.9.8 +# improved : Topics-Ausgabe (devinfo, subscribe) alphabetisch sortiert +# improved : Trennung der zu ueberwachenden Geraete ist jetzt mit Kommas, +# Leezeichen, Kommas mit Leerzeichen +# oder einer Mischung davon moeglich +# feature : Unterstuetzung (fast) beliebiger variablen +# in defaults / global defaults (subscribe). +# Verwendung wie bei $base. +# z.B. hugo wird zum $hugo in Topics/Expression +# Variablen in Dev koennen Variablen aus global verwenden +# feature : Neue Einstellung: resendOnConnect fuer mgttPublish +# Bewirkt, dass wenn keine mqtt-Connection besteht, +# die zu sendende Nachrichten in einer Warteschlange gespeichet +# werden. Wird die Verbindung aufgebaut, +# werden die Nachrichten verschickt. +# Moegliche Optionen: none - alle verwerfen, +# last - immer nur die letzte Nachricht speichern, +# first - immer nur die erste Nachricht speichern, +# danach folgende verwerfen, all - alle speichern, +# allerdings existiert eine Obergrenze von 100, wird es mehr, +# werden aelteste ueberzaelige Nachrichten verworfen. +# (Beispiel: *:resendOnConnect=last) +# +# 21.09.2019 0.9.7 +# fix : Anlegen / Loeschen von userAttr +# change : Vorbereitungen fuer resendOnConnect +# # 16.09.2018 0.9.7 # fix : defaults in globalPublish nicht verfuegbar # fix : atopic in devinfo korrekt anzeigen # improved : Anzeige devinfo -# change : Umstellung auf delFromDevAttrList (zum ENtfernen von UserArrt) +# change : Umstellung auf delFromDevAttrList (zum Entfernen von UserArrt) # change : Methoden-Doku # # 15.09.2018 0.9.7 @@ -46,10 +73,11 @@ # muss nicht mehr zwingend topic definiert werden # (dafuer muss dann ggf. Expression sorgen) # improved : Formatierung Ausgabe devinfo -# feature : Unterstuetzung beliegiger (fast) variable -# in defaults / global defaults. -# Verwendung wie auch bei $base. +# feature : Unterstuetzung (fast) beliebiger variablen +# in defaults / global defaults (publish). +# Verwendung wie bei $base. # z.B. hugo wird zum $hugo in Topics/Expression +# Variablen in Dev koennen Variablen aus global verwenden # minor fixes # # xx.08.2018 0.9.6 @@ -70,10 +98,10 @@ # Ideen: # # [i like it] -# - global base (sollte in $base verwendet werden koennen) +# done - global base (sollte in $base verwendet werden koennen) # - resend / interval, Warteschlange -# - resendOnConnect (no, first, last, all) -# - resendInterval (no/0, x min) +# done - resendOnConnect (no, first, last, all) +# -- autoResend Interval (no/0, x min) # # [maybe] # - QOS for subscribe (fertig?), defaults(qos, fertig?), alias mapping @@ -81,10 +109,12 @@ # - global excludes # - Support for MQTT2_SERVER # - commands per mqtt fuer die Bridge: Liste der Geraete, Infos dazu etc. +# - mqttOptions (zuschaltbare optionen im Form eines Perl-Routine) (json, ttl) # # [I don't like it] # - templates (e.g. 'define template' in der Bridge, 'mqttUseTemplate' in Devices) # - autocreate +# - a reading for last send / receive mesages # # [just no] # @@ -113,7 +143,7 @@ use warnings; #my $DEBUG = 1; my $cvsid = '$Id$'; -my $VERSION = "version 0.9.7 by hexenmeister\n$cvsid"; +my $VERSION = "version 0.9.8 by hexenmeister\n$cvsid"; my %sets = ( ); @@ -199,7 +229,6 @@ BEGIN { MQTT->import(qw(:all)); GP_Import(qw( - AttrVal CommandAttr readingsSingleUpdate readingsBeginUpdate @@ -234,7 +263,7 @@ BEGIN { }; use constant { - HELPER => ".helper", + HELPER => ".helper", IO_DEV_TYPE => "IO_DEV_TYPE", HS_TAB_NAME_DEVICES => "devices", @@ -252,6 +281,9 @@ use constant { HS_PROP_NAME_PREFIX => "prefix", HS_PROP_NAME_DEVSPEC => "devspec", + HS_PROP_NAME_PUB_OFFLINE_QUEUE => ".pub_queue", + HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC => ".pub_queue_max_cnt_pro_topic", + HS_PROP_NAME_GLOBAL_EXCLUDES_TYPE => "globalTypeExcludes", HS_PROP_NAME_GLOBAL_EXCLUDES_READING => "globalReadingExcludes", HS_PROP_NAME_GLOBAL_EXCLUDES_DEVICES => "globalDeviceExcludes", @@ -306,8 +338,15 @@ sub startsWith($$) { # Device define sub Define() { my ($hash, $def) = @_; - # Definition :=> defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec] - my($name, $type, $prefix, $devspec) = split("[ \t][ \t]*", $def); + # Definition :=> defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]] + my($name, $type, $prefix, @devspeca) = split("[ \t][ \t]*", $def); + # restlichen Parameter nach Leerzeichen trennen + # aus dem Array einen kommagetrennten String erstellen + my $devspec = join(",", @devspeca); + # Doppelte Kommas entfernen. + $devspec =~s/,+/,/g; + # damit ist jetzt Trennung der zu ueberwachenden Geraete mit Kommas, Leezeichen, Kommas mit Leerzeichen und Mischung davon moeglich + removeOldUserAttr($hash) if defined $hash->{+HS_PROP_NAME_PREFIX}; $prefix="mqtt" unless defined $prefix; # default prefix is 'mqtt' @@ -315,6 +354,8 @@ sub Define() { $hash->{+HELPER}->{+HS_PROP_NAME_PREFIX_OLD}=$hash->{+HS_PROP_NAME_PREFIX}; $hash->{+HS_PROP_NAME_PREFIX}=$prefix; # store in device hash + # wenn Leer oder nicht definiert => devspec auf '.*' setzen + $devspec = undef if $devspec eq ''; $hash->{+HS_PROP_NAME_DEVSPEC} = defined($devspec)?$devspec:".*"; Log3($hash->{NAME},5,"MQTT-GB:DEBUG:> [$hash->{NAME}] Define: params: $name, $type, $hash->{+HS_PROP_NAME_PREFIX}, $hash->{+HS_PROP_NAME_DEVSPEC}"); @@ -333,6 +374,9 @@ sub Define() { #TODO: aktivieren, wenn gebraucht wird $hash->{+HELPER}->{+HS_PROP_NAME_INTERVAL} = 60; # Sekunden + # Max messages count pro topic for offline queue + $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC} = 100; + readingsBeginUpdate($hash); readingsBulkUpdate($hash,"incoming-count",$hash->{+HELPER}->{+HS_PROP_NAME_INCOMING_CNT}); readingsBulkUpdate($hash,"outgoing-count",$hash->{+HELPER}->{+HS_PROP_NAME_OUTGOING_CNT}); @@ -405,6 +449,7 @@ sub initUserAttr($) { } my @devices = devspec2array($devspec); Log3($hash->{NAME},5,"MQTT-GB:DEBUG:> [$hash->{NAME}] initUserAttr: addToDevAttrList: $prefix"); + #Log3('xxx',1,"MQTT-GB:DEBUG:> initUserAttr> devspec: '$devspec', array: ".Dumper(@devices)); foreach my $dev (@devices) { addToDevAttrList($dev, $prefix.CTRL_ATTR_NAME_DEFAULTS.":textField-long"); addToDevAttrList($dev, $prefix.CTRL_ATTR_NAME_ALIAS.":textField-long"); @@ -492,7 +537,7 @@ sub updateDevCount($) { sub removeOldUserAttr($;$$) { my ($hash, $prefix, $devspec) = @_; $prefix = $hash->{+HS_PROP_NAME_PREFIX} unless defined $prefix; - # Pruefen, on ein weiteres Device (MQTT_GENERIC_BRIDGE) mit dem selben Prefic existiert (waere zwar Quatsch, aber dennoch) + # Pruefen, on ein weiteres Device (MQTT_GENERIC_BRIDGE) mit dem selben Prefix existiert (waere zwar Quatsch, aber dennoch) my @bridges = devspec2array("TYPE=MQTT_GENERIC_BRIDGE"); my $name = $hash->{NAME}; foreach my $dev (@bridges) { @@ -505,13 +550,16 @@ sub removeOldUserAttr($;$$) { $devspec = 'global' if ($devspec eq '.*'); # kann spaeter auch delFromDevAttrList Methode genutzt werden my @devices = devspec2array($devspec); + + #Log3('xxx',1,"MQTT-GB:DEBUG:> removeOldUserAttr> devspec: $devspec, array: ".Dumper(@devices)); foreach my $dev (@devices) { # neue Methode im fhem.pl - delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_DEFAULTS); - delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_ALIAS); - delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_PUBLISH); - delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_SUBSCRIBE); - delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_IGNORE); + #Log3('xxx',1,"MQTT-GB:DEBUG:> removeOldUserAttr> delete: from $dev ".$prefix.CTRL_ATTR_NAME_DEFAULTS); + delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_DEFAULTS.":textField-long"); + delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_ALIAS.":textField-long"); + delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_PUBLISH.":textField-long"); + delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_SUBSCRIBE.":textField-long"); + delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_IGNORE.":both,incoming,outgoing"); # my $ua = $main::attr{$dev}{userattr}; # if (defined $ua) { # my %h = map { ($_ => 1) } split(" ", "$ua"); @@ -656,7 +704,8 @@ sub CreateSingleDeviceTableAttrPublish($$$$) { if(($ident eq 'topic') or ($ident eq 'readings-topic') or ($ident eq 'atopic') or ($ident eq 'attr-topic') or ($ident eq 'qos') or ($ident eq 'retain') or - ($ident eq 'expression')) { + ($ident eq 'expression') or + ($ident eq 'resendOnConnect')) { my @nameParts = split(/\|/, $name); while (@nameParts) { my $namePart = shift(@nameParts); @@ -737,6 +786,15 @@ sub getDevicePublishRecIntern($$$$$) { # wenn kein topic und keine expression definiert sind, kann auch nicht gesendet werden, es muss nichts mehr ausgewertet werden return unless (defined($topic) or defined($atopic) or defined( $expression)); + # resendOnConnect Option + my $resendOnConnect = undef; + $resendOnConnect = $readingMap->{'resendOnConnect'} if defined $readingMap; + $resendOnConnect = $defaultReadingMap->{'resendOnConnect'} if (defined($defaultReadingMap) and !defined($resendOnConnect)); + # global + $resendOnConnect = $globalReadingMap->{'resendOnConnect'} if (defined($globalReadingMap) and !defined($resendOnConnect)); + $resendOnConnect = $globalDefaultReadingMap->{'resendOnConnect'} if (defined($globalDefaultReadingMap) and !defined($resendOnConnect)); + + # map name my $name = undef; if (defined($devMap) and defined($devMap->{':alias'})) { $name = $devMap->{':alias'}->{'pub:'.$reading}; @@ -746,8 +804,10 @@ sub getDevicePublishRecIntern($$$$$) { } $name = $reading unless defined $name; + # get mode my $mode = $readingMap->{'mode'}; + # compute defaults my $combined = computeDefaults($hash, 'pub:', $globalMap, $devMap, {'device'=>$dev,'reading'=>$reading,'name'=>$name,'mode'=>$mode}); # $topic evaluieren (avialable vars: $device (device name), $reading (oringinal name), $name ($reading oder alias, if defined), defaults) if(defined($topic) and ($topic =~ m/^{.*}$/)) { @@ -757,7 +817,9 @@ sub getDevicePublishRecIntern($$$$$) { $atopic = _evalValue2($hash->{NAME},$atopic,{'topic'=>$atopic,'device'=>$dev,'reading'=>$reading,'name'=>$name,%$combined}) if defined $atopic; } - return {'topic'=>$topic,'atopic'=>$atopic,'qos'=>$qos,'retain'=>$retain,'expression'=>$expression,'name'=>$name,'mode'=>$mode,'.defaultMap'=>$combined}; + return {'topic'=>$topic,'atopic'=>$atopic,'qos'=>$qos,'retain'=>$retain, + 'expression'=>$expression,'name'=>$name,'mode'=>$mode, + 'resendOnConnect'=>$resendOnConnect,'.defaultMap'=>$combined}; } # sucht Qos, Retain, Expression Werte unter Beruecksichtigung von Defaults und Globals @@ -888,23 +950,23 @@ sub _evalValue2($$;$$) { # Alte Methode, verwendet noch fixe Variable (base, dev, reading, name), kein Map # soll durch _evalValue2 ersetzt werden -sub _evalValue($$;$$$$) { - my($mod, $str, $base, $device, $reading, $name) = @_; - #Log3('xxx',1,"MQTT-GB:DEBUG:> eval: (str, base, dev, reading, name) $str, $base, $device, $reading, $name"); - my$ret = $str; - #$base="" unless defined $base; - if($str =~ m/^{.*}$/) { - no strict "refs"; - local $@; - #Log3('xxx',1,"MQTT-GB:DEBUG:> eval !!!"); - $ret = eval($str); - #Log3('xxx',1,"MQTT-GB:DEBUG:> eval done: $ret"); - if ($@) { - Log3($mod,2,"user value ('".$str."'') eval error: ".$@); - } - } - return $ret; -} +# sub _evalValue($$;$$$$) { +# my($mod, $str, $base, $device, $reading, $name) = @_; +# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval: (str, base, dev, reading, name) $str, $base, $device, $reading, $name"); +# my$ret = $str; +# #$base="" unless defined $base; +# if($str =~ m/^{.*}$/) { +# no strict "refs"; +# local $@; +# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval !!!"); +# $ret = eval($str); +# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval done: $ret"); +# if ($@) { +# Log3($mod,2,"user value ('".$str."'') eval error: ".$@); +# } +# } +# return $ret; +# } # sucht zu dem gegebenen (ankommenden) topic das entsprechende device und reading # Params: $hash, $topic (empfangene topic) @@ -1016,25 +1078,40 @@ sub CreateSingleDeviceTableAttrSubscribe($$$$) { if(($ident eq 'topic') or ($ident eq 'readings-topic') or ($ident eq 'stopic') or ($ident eq 'set-topic') or ($ident eq 'atopic') or ($ident eq 'attr-topic')) { # -> topic - my $base=undef; - if (defined($devMap->{':defaults'})) { - $base = $devMap->{':defaults'}->{'sub:base'}; - } - if (defined($globalMap) and defined($globalMap->{':defaults'}) and !defined($base)) { - $base = $globalMap->{':defaults'}->{'sub:base'}; - } - $base='' unless defined $base; - $base = _evalValue($hash->{NAME},$base,$base,$dev,'$reading','$name'); - $rmap->{'mode'} = 'R'; $rmap->{'mode'} = 'S' if (($ident eq 'stopic') or ($ident eq 'set-topic')); $rmap->{'mode'} = 'A' if (($ident eq 'atopic') or ($ident eq 'attr-topic')); - # $base verwenden => eval - my $topic = _evalValue($hash->{NAME},$val,$base,$dev,'$reading','$name'); - $rmap->{'topicOrig'} = $val; + # my $base=undef; + # if (defined($devMap->{':defaults'})) { + # $base = $devMap->{':defaults'}->{'sub:base'}; + # } + # if (defined($globalMap) and defined($globalMap->{':defaults'}) and !defined($base)) { + # $base = $globalMap->{':defaults'}->{'sub:base'}; + # } + # $base='' unless defined $base; + # $base = _evalValue($hash->{NAME},$base,$base,$dev,'$reading','$name'); + + #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: old base: $base"); + + # $base verwenden => eval + #my $topic = _evalValue($hash->{NAME},$val,$base,$dev,'$reading','$name'); + + my $combined = computeDefaults($hash, 'sub:', $globalMap, $devMap, {'device'=>$dev,'reading'=>'#reading','name'=>'#name','mode'=>$rmap->{'mode'}}); + #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Defaults: ".Dumper($combined)); + my $topic = _evalValue2($hash->{NAME},$val,{'device'=>$dev,'reading'=>'#reading','name'=>'#name',%$combined}) if defined $val; + my $old = '#reading'; + my $new = '$reading'; + $topic =~ s/\Q$old\E/$new/g; + $old = '#name'; + $new = '$name'; + $topic =~ s/\Q$old\E/$new/g; + #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Topic old: $topic"); + #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Topic new: $topic"); + + $rmap->{'topicOrig'} = $val; $rmap->{'topicExp'}=createRegexpForTopic($topic); $topic =~ s/\$reading/+/g; @@ -1389,42 +1466,25 @@ sub Get($$$@) { } } $res.=" subscribe:\n"; + my @resa; foreach my $subRec (@{$hash->{+HS_TAB_NAME_DEVICES}->{$dname}->{':subscribe'}}) { my $qos = $subRec->{'qos'}; my $mode = $subRec->{'mode'}; my $expression = $subRec->{'expression'}; my $topic = $subRec->{'topic'}; $topic = '---' unless defined $topic; - $res.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic); - $res.= " (mode: $mode"; - $res.= "; qos: $qos" if defined ($qos); - $res.= ")\n"; - $res.= " exp: $expression\n" if defined ($expression); - # TODO - # my $qos = $subRec->{'qos'}; - # my $mode = $subRec->{'mode'}; - # my $expression = $subRec->{'expression'}; - # my $topic = "---"; - # if($mode eq 'R') { - # $topic = $subRec->{'topic'}; - # } elsif($mode eq 'S') { - # $topic = $subRec->{'stopic'}; - # } elsif($mode eq 'A') { - # $topic = $subRec->{'atopic'}; - # } else { - # $topic = '!unexpected mode!'; - # } - # $topic = '---' unless defined $topic; - # $res.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic); - # $res.= " (mode: $mode"; - # $res.= "; qos: $qos" if defined ($qos); - # $res.= ")\n"; - # $res.= " exp: $expression\n" if defined ($expression); - # # TODO + my $rest.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic); + $rest.= " (mode: $mode"; + $rest.= "; qos: $qos" if defined ($qos); + $rest.= ")\n"; + $rest.= " exp: $expression\n" if defined ($expression); + push (@resa, $rest); } + $res.=join('', sort @resa); } $res.= "\n"; } + # TODO : Weitere Dev Infos? $res = "no devices found" unless ($res ne ""); return $res; #last; @@ -1472,7 +1532,7 @@ sub Notify() { RefreshDeviceTable($hash, $sdev, $attrName, $val); } else { #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> attr created/changed: non observed attr"); - # TODO: check/ publish atopic => val + # check/ publish atopic => val publishDeviceUpdate($hash, $defs{$sdev}, 'A', $attrName, $val); } } elsif($s =~ m/^DELETEATTR ([^ ]*) ([^ ]*)$/) { @@ -1483,7 +1543,7 @@ sub Notify() { if(IsObservedAttribute($hash,$attrName)) { RefreshDeviceTable($hash, $sdev, $attrName, undef); } else { - # TODO: check/ publish atopic => null + # check/ publish atopic => null publishDeviceUpdate($hash, $defs{$sdev}, 'A', $attrName, undef); } } @@ -1638,8 +1698,54 @@ sub isTypeDevReadingExcluded($$$$) { # MQTT-Nachricht senden # Params: Bridge-Hash, Topic, Nachricht, QOS- und Retain-Flags -sub doPublish($$$$$) { - my ($hash,$topic,$message,$qos,$retain) = @_; +sub doPublish($$$$$$) { + my ($hash,$topic,$message,$qos,$retain,$resendOnConnect) = @_; + + #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE doPublish: topic: $topic, msg: $message, resend mode: ".(defined($resendOnConnect)?$resendOnConnect:"no")); + if(!isConnected($hash)) { + # store message? + if(defined($resendOnConnect)) { + $resendOnConnect = lc($resendOnConnect); + Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE offline publish: store: topic: $topic, msg: $message, mode: $resendOnConnect"); + if($resendOnConnect eq 'first' or $resendOnConnect eq 'last' or $resendOnConnect eq 'all') { + # store msg data + my $queue = $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE}; + #my $queue = $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE}; + $queue = {} unless defined $queue; + + my $entry = {'topic'=>$topic, 'message'=>$message, 'qos'=>$qos, 'retain'=>$retain, 'resendOnConnect'=>$resendOnConnect}; + my $topicQueue = $queue->{$topic}; + unless (defined($topicQueue)) { + $topicQueue = [$entry]; + } + else { + if ($resendOnConnect eq 'first') { + if (scalar @$topicQueue == 0) { + $topicQueue = [$entry]; + } + } elsif($resendOnConnect eq 'last') { + $topicQueue = [$entry]; + } else { # all + push (@$topicQueue, $entry); + } + } + # check max lng + my $max = $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC}; + $max = 10 unless defined $max; + while (scalar @$topicQueue > $max) { + shift @$topicQueue; + } + + $queue->{$topic} = $topicQueue; + #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE offline publish: stored: ".Dumper($queue)); + + $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE} = $queue; + } + } + return; + } + + Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE publish: $topic => $message (qos: $qos, retain: ".(defined($retain)?$retain:'0').")"); if (isIODevMQTT2($hash)){ #if ($hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER') { # TODO: publish MQTT2 @@ -1695,6 +1801,7 @@ sub publishDeviceUpdate($$$$$) { my $retain = $pubRec->{'retain'}; my $expression = $pubRec->{'expression'}; my $base = $pubRec->{'base'}; + my $resendOnConnect = $pubRec->{'resendOnConnect'}; $base='' unless defined $base; $value="\0" unless defined $value; @@ -1732,10 +1839,10 @@ sub publishDeviceUpdate($$$$$) { if(defined($redefMap)) { foreach my $key (keys %{$redefMap}) { my $val = $redefMap->{$key}; - doPublish($hash,$key,$val,$qos,$retain); + doPublish($hash,$key,$val,$qos,$retain,$resendOnConnect); } } else { - doPublish($hash,$topic,$message,$qos,$retain) if defined $topic; + doPublish($hash,$topic,$message,$qos,$retain,$resendOnConnect) if defined $topic; } } @@ -1846,7 +1953,22 @@ sub ioDevConnect($) { #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE DEBUG: ioDevConnect"); - # TODO + # resend stored msgs => doPublish (...., undef) + my $queue = $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE}; + if (defined($queue)) { + foreach my $topic (keys %{$queue}) { + my $topicQueue = $queue->{$topic}; + my $topicRec = undef; + while ($topicRec = shift(@$topicQueue)) { + my $message = $topicRec->{'message'}; + my $qos = $topicRec->{'qos'}; + my $retain = $topicRec->{'retain'}; + my $resendOnConnect = undef; #$topicRec->{'resendOnConnect'}; + doPublish($hash,$topic,$message,$qos,$retain,$resendOnConnect); + } + } + } + } # CallBack-Handler fuer IODev beim Disconnect @@ -1964,7 +2086,7 @@ sub onmessage($$$) {
- This module is an MQTT bridge, which simultaneously collects data from several FHEM devices
+ This module is a MQTT bridge, which simultaneously collects data from several FHEM devices
and passes their readings via MQTT or set readings from the incoming MQTT messages or executes them
as a 'set' command on the configured FHEM device.
An MQTT device is needed as IODev.
@@ -1974,7 +2096,7 @@ sub onmessage($$$) {
Definition:
In the simplest case, two lines are enough:
-All parameters in the define are optional. The first parameter is a prefix for the control attributes on which the devices to be
@@ -1983,6 +2105,7 @@ sub onmessage($$$) {
The second parameter ('devspec') allows to minimize the number of devices to be monitored
(otherwise all devices will be monitored, which may cost performance).
+ Example for devspec: 'TYPE=dummy' or 'dummy1,dummy2'. With comma separated list no spaces must be used!
see devspecdefmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec]
+
defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]
attr mqttGeneric IODev
Option 'resendOnConnect' allows to save the messages, + if the bridge is not connected to the MQTT server. + The messages to be sent are stored in a queue. + When the connection is established, the messages are sent in the original order. +
Examples:
attr <dev> mqttPublish temperature:topic={"$base/$name"} temperature:qos=1 temperature:retain=0 *:topic={"$base/$name"} humidity:topic=/TEST/Feuchte
attr <dev> mqttPublish temperature|humidity:topic={"$base/$name"} temperature|humidity:qos=1 temperature|humidity:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} *:qos=2 *:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"message: $value"}
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={$value="message: $value"}
- attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}
Definition:
Im einfachsten Fall reichen schon zwei Zeilen:
-Alle Parameter im Define sind optional. Der erste ist ein Prefix fuer die Steuerattribute, worueber die zu ueberwachende Geraete (s.u.)
@@ -2336,6 +2473,7 @@ sub onmessage($$$) {
Wird dieser z.B. als 'hugo' redefiniert, heissen die Steuerungsattribute entsprechend hugoPublish etc. Der zweite Parameter ('devspec') erlaubt die Menge der zu ueberwachenden Geraeten
zu begrenzen (sonst werden einfach alle ueberwacht, was jedoch Performance kosten kann).
+ Beispiel fuer devspec: 'TYPE=dummy' oder 'dummy1,dummy2'. Bei kommaseparierten Liste duerfen keine Leerzeichen verwendet werden!
s.a. devspecdefmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec]
+
defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]]
attr mqttGeneric IODev
Option 'resendOnConnect' erlaubt eine Speicherung der Nachrichten, + wenn keine Verbindung zu dem MQTT-Server besteht. + Die zu sendende Nachrichten in einer Warteschlange gespeichet. + Wird die Verbindung aufgebaut, werden die Nachrichten in der ursprungichen Reihenfolge verschickt. +
Beispiele:
attr <dev> mqttPublish temperature:topic={"$base/$name"} temperature:qos=1 temperature:retain=0 *:topic={"$base/$name"} humidity:topic=/TEST/Feuchte
attr <dev> mqttPublish temperature|humidity:topic={"$base/$name"} temperature|humidity:qos=1 temperature|humidity:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} *:qos=2 *:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"message: $value"}
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={$value="message: $value"}
- attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}