From 0732623b54e7f786349447fddd06a09c28c49fd2 Mon Sep 17 00:00:00 2001
From: hexenmeister
Date: Sat, 22 Sep 2018 21:24:52 +0000
Subject: [PATCH] change: global defaults, resend on connect, div. fixes
git-svn-id: https://svn.fhem.de/fhem/trunk@17387 2b470e98-0d58-463d-a4d8-8e2adae1ed80
---
fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm | 328 ++++++++++++++++++++--------
1 file changed, 240 insertions(+), 88 deletions(-)
diff --git a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm
index 199648598..d61022347 100644
--- a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm
+++ b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm
@@ -31,11 +31,38 @@
# CHANGE LOG
#
#
+# 22.09.2018 0.9.8
+# improved : Topics-Ausgabe (devinfo, subscribe) alphabetisch sortiert
+# improved : Trennung der zu ueberwachenden Geraete ist jetzt mit Kommas,
+# Leezeichen, Kommas mit Leerzeichen
+# oder einer Mischung davon moeglich
+# feature : Unterstuetzung (fast) beliebiger variablen
+# in defaults / global defaults (subscribe).
+# Verwendung wie bei $base.
+# z.B. hugo wird zum $hugo in Topics/Expression
+# Variablen in Dev koennen Variablen aus global verwenden
+# feature : Neue Einstellung: resendOnConnect fuer mgttPublish
+# Bewirkt, dass wenn keine mqtt-Connection besteht,
+# die zu sendende Nachrichten in einer Warteschlange gespeichet
+# werden. Wird die Verbindung aufgebaut,
+# werden die Nachrichten verschickt.
+# Moegliche Optionen: none - alle verwerfen,
+# last - immer nur die letzte Nachricht speichern,
+# first - immer nur die erste Nachricht speichern,
+# danach folgende verwerfen, all - alle speichern,
+# allerdings existiert eine Obergrenze von 100, wird es mehr,
+# werden aelteste ueberzaelige Nachrichten verworfen.
+# (Beispiel: *:resendOnConnect=last)
+#
+# 21.09.2019 0.9.7
+# fix : Anlegen / Loeschen von userAttr
+# change : Vorbereitungen fuer resendOnConnect
+#
# 16.09.2018 0.9.7
# fix : defaults in globalPublish nicht verfuegbar
# fix : atopic in devinfo korrekt anzeigen
# improved : Anzeige devinfo
-# change : Umstellung auf delFromDevAttrList (zum ENtfernen von UserArrt)
+# change : Umstellung auf delFromDevAttrList (zum Entfernen von UserArrt)
# change : Methoden-Doku
#
# 15.09.2018 0.9.7
@@ -46,10 +73,11 @@
# muss nicht mehr zwingend topic definiert werden
# (dafuer muss dann ggf. Expression sorgen)
# improved : Formatierung Ausgabe devinfo
-# feature : Unterstuetzung beliegiger (fast) variable
-# in defaults / global defaults.
-# Verwendung wie auch bei $base.
+# feature : Unterstuetzung (fast) beliebiger variablen
+# in defaults / global defaults (publish).
+# Verwendung wie bei $base.
# z.B. hugo wird zum $hugo in Topics/Expression
+# Variablen in Dev koennen Variablen aus global verwenden
# minor fixes
#
# xx.08.2018 0.9.6
@@ -70,10 +98,10 @@
# Ideen:
#
# [i like it]
-# - global base (sollte in $base verwendet werden koennen)
+# done - global base (sollte in $base verwendet werden koennen)
# - resend / interval, Warteschlange
-# - resendOnConnect (no, first, last, all)
-# - resendInterval (no/0, x min)
+# done - resendOnConnect (no, first, last, all)
+# -- autoResend Interval (no/0, x min)
#
# [maybe]
# - QOS for subscribe (fertig?), defaults(qos, fertig?), alias mapping
@@ -81,10 +109,12 @@
# - global excludes
# - Support for MQTT2_SERVER
# - commands per mqtt fuer die Bridge: Liste der Geraete, Infos dazu etc.
+# - mqttOptions (zuschaltbare optionen im Form eines Perl-Routine) (json, ttl)
#
# [I don't like it]
# - templates (e.g. 'define template' in der Bridge, 'mqttUseTemplate' in Devices)
# - autocreate
+# - a reading for last send / receive mesages
#
# [just no]
#
@@ -113,7 +143,7 @@ use warnings;
#my $DEBUG = 1;
my $cvsid = '$Id$';
-my $VERSION = "version 0.9.7 by hexenmeister\n$cvsid";
+my $VERSION = "version 0.9.8 by hexenmeister\n$cvsid";
my %sets = (
);
@@ -199,7 +229,6 @@ BEGIN {
MQTT->import(qw(:all));
GP_Import(qw(
- AttrVal
CommandAttr
readingsSingleUpdate
readingsBeginUpdate
@@ -234,7 +263,7 @@ BEGIN {
};
use constant {
- HELPER => ".helper",
+ HELPER => ".helper",
IO_DEV_TYPE => "IO_DEV_TYPE",
HS_TAB_NAME_DEVICES => "devices",
@@ -252,6 +281,9 @@ use constant {
HS_PROP_NAME_PREFIX => "prefix",
HS_PROP_NAME_DEVSPEC => "devspec",
+ HS_PROP_NAME_PUB_OFFLINE_QUEUE => ".pub_queue",
+ HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC => ".pub_queue_max_cnt_pro_topic",
+
HS_PROP_NAME_GLOBAL_EXCLUDES_TYPE => "globalTypeExcludes",
HS_PROP_NAME_GLOBAL_EXCLUDES_READING => "globalReadingExcludes",
HS_PROP_NAME_GLOBAL_EXCLUDES_DEVICES => "globalDeviceExcludes",
@@ -306,8 +338,15 @@ sub startsWith($$) {
# Device define
sub Define() {
my ($hash, $def) = @_;
- # Definition :=> defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec]
- my($name, $type, $prefix, $devspec) = split("[ \t][ \t]*", $def);
+ # Definition :=> defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]]
+ my($name, $type, $prefix, @devspeca) = split("[ \t][ \t]*", $def);
+ # restlichen Parameter nach Leerzeichen trennen
+ # aus dem Array einen kommagetrennten String erstellen
+ my $devspec = join(",", @devspeca);
+ # Doppelte Kommas entfernen.
+ $devspec =~s/,+/,/g;
+ # damit ist jetzt Trennung der zu ueberwachenden Geraete mit Kommas, Leezeichen, Kommas mit Leerzeichen und Mischung davon moeglich
+ removeOldUserAttr($hash) if defined $hash->{+HS_PROP_NAME_PREFIX};
$prefix="mqtt" unless defined $prefix; # default prefix is 'mqtt'
@@ -315,6 +354,8 @@ sub Define() {
$hash->{+HELPER}->{+HS_PROP_NAME_PREFIX_OLD}=$hash->{+HS_PROP_NAME_PREFIX};
$hash->{+HS_PROP_NAME_PREFIX}=$prefix; # store in device hash
+ # wenn Leer oder nicht definiert => devspec auf '.*' setzen
+ $devspec = undef if $devspec eq '';
$hash->{+HS_PROP_NAME_DEVSPEC} = defined($devspec)?$devspec:".*";
Log3($hash->{NAME},5,"MQTT-GB:DEBUG:> [$hash->{NAME}] Define: params: $name, $type, $hash->{+HS_PROP_NAME_PREFIX}, $hash->{+HS_PROP_NAME_DEVSPEC}");
@@ -333,6 +374,9 @@ sub Define() {
#TODO: aktivieren, wenn gebraucht wird
$hash->{+HELPER}->{+HS_PROP_NAME_INTERVAL} = 60; # Sekunden
+ # Max messages count pro topic for offline queue
+ $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC} = 100;
+
readingsBeginUpdate($hash);
readingsBulkUpdate($hash,"incoming-count",$hash->{+HELPER}->{+HS_PROP_NAME_INCOMING_CNT});
readingsBulkUpdate($hash,"outgoing-count",$hash->{+HELPER}->{+HS_PROP_NAME_OUTGOING_CNT});
@@ -405,6 +449,7 @@ sub initUserAttr($) {
}
my @devices = devspec2array($devspec);
Log3($hash->{NAME},5,"MQTT-GB:DEBUG:> [$hash->{NAME}] initUserAttr: addToDevAttrList: $prefix");
+ #Log3('xxx',1,"MQTT-GB:DEBUG:> initUserAttr> devspec: '$devspec', array: ".Dumper(@devices));
foreach my $dev (@devices) {
addToDevAttrList($dev, $prefix.CTRL_ATTR_NAME_DEFAULTS.":textField-long");
addToDevAttrList($dev, $prefix.CTRL_ATTR_NAME_ALIAS.":textField-long");
@@ -492,7 +537,7 @@ sub updateDevCount($) {
sub removeOldUserAttr($;$$) {
my ($hash, $prefix, $devspec) = @_;
$prefix = $hash->{+HS_PROP_NAME_PREFIX} unless defined $prefix;
- # Pruefen, on ein weiteres Device (MQTT_GENERIC_BRIDGE) mit dem selben Prefic existiert (waere zwar Quatsch, aber dennoch)
+ # Pruefen, on ein weiteres Device (MQTT_GENERIC_BRIDGE) mit dem selben Prefix existiert (waere zwar Quatsch, aber dennoch)
my @bridges = devspec2array("TYPE=MQTT_GENERIC_BRIDGE");
my $name = $hash->{NAME};
foreach my $dev (@bridges) {
@@ -505,13 +550,16 @@ sub removeOldUserAttr($;$$) {
$devspec = 'global' if ($devspec eq '.*');
# kann spaeter auch delFromDevAttrList Methode genutzt werden
my @devices = devspec2array($devspec);
+
+ #Log3('xxx',1,"MQTT-GB:DEBUG:> removeOldUserAttr> devspec: $devspec, array: ".Dumper(@devices));
foreach my $dev (@devices) {
# neue Methode im fhem.pl
- delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_DEFAULTS);
- delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_ALIAS);
- delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_PUBLISH);
- delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_SUBSCRIBE);
- delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_IGNORE);
+ #Log3('xxx',1,"MQTT-GB:DEBUG:> removeOldUserAttr> delete: from $dev ".$prefix.CTRL_ATTR_NAME_DEFAULTS);
+ delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_DEFAULTS.":textField-long");
+ delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_ALIAS.":textField-long");
+ delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_PUBLISH.":textField-long");
+ delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_SUBSCRIBE.":textField-long");
+ delFromDevAttrList($dev,$prefix.CTRL_ATTR_NAME_IGNORE.":both,incoming,outgoing");
# my $ua = $main::attr{$dev}{userattr};
# if (defined $ua) {
# my %h = map { ($_ => 1) } split(" ", "$ua");
@@ -656,7 +704,8 @@ sub CreateSingleDeviceTableAttrPublish($$$$) {
if(($ident eq 'topic') or ($ident eq 'readings-topic') or
($ident eq 'atopic') or ($ident eq 'attr-topic') or
($ident eq 'qos') or ($ident eq 'retain') or
- ($ident eq 'expression')) {
+ ($ident eq 'expression') or
+ ($ident eq 'resendOnConnect')) {
my @nameParts = split(/\|/, $name);
while (@nameParts) {
my $namePart = shift(@nameParts);
@@ -737,6 +786,15 @@ sub getDevicePublishRecIntern($$$$$) {
# wenn kein topic und keine expression definiert sind, kann auch nicht gesendet werden, es muss nichts mehr ausgewertet werden
return unless (defined($topic) or defined($atopic) or defined( $expression));
+ # resendOnConnect Option
+ my $resendOnConnect = undef;
+ $resendOnConnect = $readingMap->{'resendOnConnect'} if defined $readingMap;
+ $resendOnConnect = $defaultReadingMap->{'resendOnConnect'} if (defined($defaultReadingMap) and !defined($resendOnConnect));
+ # global
+ $resendOnConnect = $globalReadingMap->{'resendOnConnect'} if (defined($globalReadingMap) and !defined($resendOnConnect));
+ $resendOnConnect = $globalDefaultReadingMap->{'resendOnConnect'} if (defined($globalDefaultReadingMap) and !defined($resendOnConnect));
+
+ # map name
my $name = undef;
if (defined($devMap) and defined($devMap->{':alias'})) {
$name = $devMap->{':alias'}->{'pub:'.$reading};
@@ -746,8 +804,10 @@ sub getDevicePublishRecIntern($$$$$) {
}
$name = $reading unless defined $name;
+ # get mode
my $mode = $readingMap->{'mode'};
+ # compute defaults
my $combined = computeDefaults($hash, 'pub:', $globalMap, $devMap, {'device'=>$dev,'reading'=>$reading,'name'=>$name,'mode'=>$mode});
# $topic evaluieren (avialable vars: $device (device name), $reading (oringinal name), $name ($reading oder alias, if defined), defaults)
if(defined($topic) and ($topic =~ m/^{.*}$/)) {
@@ -757,7 +817,9 @@ sub getDevicePublishRecIntern($$$$$) {
$atopic = _evalValue2($hash->{NAME},$atopic,{'topic'=>$atopic,'device'=>$dev,'reading'=>$reading,'name'=>$name,%$combined}) if defined $atopic;
}
- return {'topic'=>$topic,'atopic'=>$atopic,'qos'=>$qos,'retain'=>$retain,'expression'=>$expression,'name'=>$name,'mode'=>$mode,'.defaultMap'=>$combined};
+ return {'topic'=>$topic,'atopic'=>$atopic,'qos'=>$qos,'retain'=>$retain,
+ 'expression'=>$expression,'name'=>$name,'mode'=>$mode,
+ 'resendOnConnect'=>$resendOnConnect,'.defaultMap'=>$combined};
}
# sucht Qos, Retain, Expression Werte unter Beruecksichtigung von Defaults und Globals
@@ -888,23 +950,23 @@ sub _evalValue2($$;$$) {
# Alte Methode, verwendet noch fixe Variable (base, dev, reading, name), kein Map
# soll durch _evalValue2 ersetzt werden
-sub _evalValue($$;$$$$) {
- my($mod, $str, $base, $device, $reading, $name) = @_;
- #Log3('xxx',1,"MQTT-GB:DEBUG:> eval: (str, base, dev, reading, name) $str, $base, $device, $reading, $name");
- my$ret = $str;
- #$base="" unless defined $base;
- if($str =~ m/^{.*}$/) {
- no strict "refs";
- local $@;
- #Log3('xxx',1,"MQTT-GB:DEBUG:> eval !!!");
- $ret = eval($str);
- #Log3('xxx',1,"MQTT-GB:DEBUG:> eval done: $ret");
- if ($@) {
- Log3($mod,2,"user value ('".$str."'') eval error: ".$@);
- }
- }
- return $ret;
-}
+# sub _evalValue($$;$$$$) {
+# my($mod, $str, $base, $device, $reading, $name) = @_;
+# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval: (str, base, dev, reading, name) $str, $base, $device, $reading, $name");
+# my$ret = $str;
+# #$base="" unless defined $base;
+# if($str =~ m/^{.*}$/) {
+# no strict "refs";
+# local $@;
+# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval !!!");
+# $ret = eval($str);
+# #Log3('xxx',1,"MQTT-GB:DEBUG:> eval done: $ret");
+# if ($@) {
+# Log3($mod,2,"user value ('".$str."'') eval error: ".$@);
+# }
+# }
+# return $ret;
+# }
# sucht zu dem gegebenen (ankommenden) topic das entsprechende device und reading
# Params: $hash, $topic (empfangene topic)
@@ -1016,25 +1078,40 @@ sub CreateSingleDeviceTableAttrSubscribe($$$$) {
if(($ident eq 'topic') or ($ident eq 'readings-topic') or
($ident eq 'stopic') or ($ident eq 'set-topic') or
($ident eq 'atopic') or ($ident eq 'attr-topic')) { # -> topic
- my $base=undef;
- if (defined($devMap->{':defaults'})) {
- $base = $devMap->{':defaults'}->{'sub:base'};
- }
- if (defined($globalMap) and defined($globalMap->{':defaults'}) and !defined($base)) {
- $base = $globalMap->{':defaults'}->{'sub:base'};
- }
- $base='' unless defined $base;
- $base = _evalValue($hash->{NAME},$base,$base,$dev,'$reading','$name');
-
$rmap->{'mode'} = 'R';
$rmap->{'mode'} = 'S' if (($ident eq 'stopic') or ($ident eq 'set-topic'));
$rmap->{'mode'} = 'A' if (($ident eq 'atopic') or ($ident eq 'attr-topic'));
- # $base verwenden => eval
- my $topic = _evalValue($hash->{NAME},$val,$base,$dev,'$reading','$name');
- $rmap->{'topicOrig'} = $val;
+ # my $base=undef;
+ # if (defined($devMap->{':defaults'})) {
+ # $base = $devMap->{':defaults'}->{'sub:base'};
+ # }
+ # if (defined($globalMap) and defined($globalMap->{':defaults'}) and !defined($base)) {
+ # $base = $globalMap->{':defaults'}->{'sub:base'};
+ # }
+ # $base='' unless defined $base;
+ # $base = _evalValue($hash->{NAME},$base,$base,$dev,'$reading','$name');
+
+ #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: old base: $base");
+
+ # $base verwenden => eval
+ #my $topic = _evalValue($hash->{NAME},$val,$base,$dev,'$reading','$name');
+
+ my $combined = computeDefaults($hash, 'sub:', $globalMap, $devMap, {'device'=>$dev,'reading'=>'#reading','name'=>'#name','mode'=>$rmap->{'mode'}});
+ #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Defaults: ".Dumper($combined));
+ my $topic = _evalValue2($hash->{NAME},$val,{'device'=>$dev,'reading'=>'#reading','name'=>'#name',%$combined}) if defined $val;
+ my $old = '#reading';
+ my $new = '$reading';
+ $topic =~ s/\Q$old\E/$new/g;
+ $old = '#name';
+ $new = '$name';
+ $topic =~ s/\Q$old\E/$new/g;
+ #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Topic old: $topic");
+ #Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> sub: Topic new: $topic");
+
+ $rmap->{'topicOrig'} = $val;
$rmap->{'topicExp'}=createRegexpForTopic($topic);
$topic =~ s/\$reading/+/g;
@@ -1389,42 +1466,25 @@ sub Get($$$@) {
}
}
$res.=" subscribe:\n";
+ my @resa;
foreach my $subRec (@{$hash->{+HS_TAB_NAME_DEVICES}->{$dname}->{':subscribe'}}) {
my $qos = $subRec->{'qos'};
my $mode = $subRec->{'mode'};
my $expression = $subRec->{'expression'};
my $topic = $subRec->{'topic'};
$topic = '---' unless defined $topic;
- $res.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic);
- $res.= " (mode: $mode";
- $res.= "; qos: $qos" if defined ($qos);
- $res.= ")\n";
- $res.= " exp: $expression\n" if defined ($expression);
- # TODO
- # my $qos = $subRec->{'qos'};
- # my $mode = $subRec->{'mode'};
- # my $expression = $subRec->{'expression'};
- # my $topic = "---";
- # if($mode eq 'R') {
- # $topic = $subRec->{'topic'};
- # } elsif($mode eq 'S') {
- # $topic = $subRec->{'stopic'};
- # } elsif($mode eq 'A') {
- # $topic = $subRec->{'atopic'};
- # } else {
- # $topic = '!unexpected mode!';
- # }
- # $topic = '---' unless defined $topic;
- # $res.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic);
- # $res.= " (mode: $mode";
- # $res.= "; qos: $qos" if defined ($qos);
- # $res.= ")\n";
- # $res.= " exp: $expression\n" if defined ($expression);
- # # TODO
+ my $rest.= sprintf(' %-16s <= %s', $subRec->{'reading'}, $topic);
+ $rest.= " (mode: $mode";
+ $rest.= "; qos: $qos" if defined ($qos);
+ $rest.= ")\n";
+ $rest.= " exp: $expression\n" if defined ($expression);
+ push (@resa, $rest);
}
+ $res.=join('', sort @resa);
}
$res.= "\n";
}
+ # TODO : Weitere Dev Infos?
$res = "no devices found" unless ($res ne "");
return $res;
#last;
@@ -1472,7 +1532,7 @@ sub Notify() {
RefreshDeviceTable($hash, $sdev, $attrName, $val);
} else {
#Log3($hash->{NAME},1,"MQTT-GB:DEBUG:> attr created/changed: non observed attr");
- # TODO: check/ publish atopic => val
+ # check/ publish atopic => val
publishDeviceUpdate($hash, $defs{$sdev}, 'A', $attrName, $val);
}
} elsif($s =~ m/^DELETEATTR ([^ ]*) ([^ ]*)$/) {
@@ -1483,7 +1543,7 @@ sub Notify() {
if(IsObservedAttribute($hash,$attrName)) {
RefreshDeviceTable($hash, $sdev, $attrName, undef);
} else {
- # TODO: check/ publish atopic => null
+ # check/ publish atopic => null
publishDeviceUpdate($hash, $defs{$sdev}, 'A', $attrName, undef);
}
}
@@ -1638,8 +1698,54 @@ sub isTypeDevReadingExcluded($$$$) {
# MQTT-Nachricht senden
# Params: Bridge-Hash, Topic, Nachricht, QOS- und Retain-Flags
-sub doPublish($$$$$) {
- my ($hash,$topic,$message,$qos,$retain) = @_;
+sub doPublish($$$$$$) {
+ my ($hash,$topic,$message,$qos,$retain,$resendOnConnect) = @_;
+
+ #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE doPublish: topic: $topic, msg: $message, resend mode: ".(defined($resendOnConnect)?$resendOnConnect:"no"));
+ if(!isConnected($hash)) {
+ # store message?
+ if(defined($resendOnConnect)) {
+ $resendOnConnect = lc($resendOnConnect);
+ Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE offline publish: store: topic: $topic, msg: $message, mode: $resendOnConnect");
+ if($resendOnConnect eq 'first' or $resendOnConnect eq 'last' or $resendOnConnect eq 'all') {
+ # store msg data
+ my $queue = $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE};
+ #my $queue = $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE};
+ $queue = {} unless defined $queue;
+
+ my $entry = {'topic'=>$topic, 'message'=>$message, 'qos'=>$qos, 'retain'=>$retain, 'resendOnConnect'=>$resendOnConnect};
+ my $topicQueue = $queue->{$topic};
+ unless (defined($topicQueue)) {
+ $topicQueue = [$entry];
+ }
+ else {
+ if ($resendOnConnect eq 'first') {
+ if (scalar @$topicQueue == 0) {
+ $topicQueue = [$entry];
+ }
+ } elsif($resendOnConnect eq 'last') {
+ $topicQueue = [$entry];
+ } else { # all
+ push (@$topicQueue, $entry);
+ }
+ }
+ # check max lng
+ my $max = $hash->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE_MAX_CNT_PROTOPIC};
+ $max = 10 unless defined $max;
+ while (scalar @$topicQueue > $max) {
+ shift @$topicQueue;
+ }
+
+ $queue->{$topic} = $topicQueue;
+ #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE offline publish: stored: ".Dumper($queue));
+
+ $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE} = $queue;
+ }
+ }
+ return;
+ }
+
+ Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE publish: $topic => $message (qos: $qos, retain: ".(defined($retain)?$retain:'0').")");
if (isIODevMQTT2($hash)){ #if ($hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER') {
# TODO: publish MQTT2
@@ -1695,6 +1801,7 @@ sub publishDeviceUpdate($$$$$) {
my $retain = $pubRec->{'retain'};
my $expression = $pubRec->{'expression'};
my $base = $pubRec->{'base'};
+ my $resendOnConnect = $pubRec->{'resendOnConnect'};
$base='' unless defined $base;
$value="\0" unless defined $value;
@@ -1732,10 +1839,10 @@ sub publishDeviceUpdate($$$$$) {
if(defined($redefMap)) {
foreach my $key (keys %{$redefMap}) {
my $val = $redefMap->{$key};
- doPublish($hash,$key,$val,$qos,$retain);
+ doPublish($hash,$key,$val,$qos,$retain,$resendOnConnect);
}
} else {
- doPublish($hash,$topic,$message,$qos,$retain) if defined $topic;
+ doPublish($hash,$topic,$message,$qos,$retain,$resendOnConnect) if defined $topic;
}
}
@@ -1846,7 +1953,22 @@ sub ioDevConnect($) {
#Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE DEBUG: ioDevConnect");
- # TODO
+ # resend stored msgs => doPublish (...., undef)
+ my $queue = $hash->{+HELPER}->{+HS_PROP_NAME_PUB_OFFLINE_QUEUE};
+ if (defined($queue)) {
+ foreach my $topic (keys %{$queue}) {
+ my $topicQueue = $queue->{$topic};
+ my $topicRec = undef;
+ while ($topicRec = shift(@$topicQueue)) {
+ my $message = $topicRec->{'message'};
+ my $qos = $topicRec->{'qos'};
+ my $retain = $topicRec->{'retain'};
+ my $resendOnConnect = undef; #$topicRec->{'resendOnConnect'};
+ doPublish($hash,$topic,$message,$qos,$retain,$resendOnConnect);
+ }
+ }
+ }
+
}
# CallBack-Handler fuer IODev beim Disconnect
@@ -1964,7 +2086,7 @@ sub onmessage($$$) {
MQTT_GENERIC_BRIDGE
- This module is an MQTT bridge, which simultaneously collects data from several FHEM devices
+ This module is a MQTT bridge, which simultaneously collects data from several FHEM devices
and passes their readings via MQTT or set readings from the incoming MQTT messages or executes them
as a 'set' command on the configured FHEM device.
An MQTT device is needed as IODev.
@@ -1974,7 +2096,7 @@ sub onmessage($$$) {
Definition:
In the simplest case, two lines are enough:
- defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec]
+ defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]
attr mqttGeneric IODev
All parameters in the define are optional.
The first parameter is a prefix for the control attributes on which the devices to be
@@ -1983,6 +2105,7 @@ sub onmessage($$$) {
The second parameter ('devspec') allows to minimize the number of devices to be monitored
(otherwise all devices will be monitored, which may cost performance).
+ Example for devspec: 'TYPE=dummy' or 'dummy1,dummy2'. With comma separated list no spaces must be used!
see devspec
@@ -2159,13 +2282,27 @@ sub onmessage($$$) {
The return value is used as a new message value, the changed variables have priority.
If the return value is undef, setting / execution is suppressed.
If the return is a hash (topic only), its key values are used as the topic, and the contents of the messages are the values from the hash.
+ Option 'resendOnConnect' allows to save the messages,
+ if the bridge is not connected to the MQTT server.
+ The messages to be sent are stored in a queue.
+ When the connection is established, the messages are sent in the original order.
+
Possible values:
+ - none
discard all
+ - last
save only the last message
+ - first
save only the first message
+ then discard the following
+ - all
save all, but if there is an upper limit of 100, if it is more, the most supernatural messages are discarded.
+
Examples:
attr <dev> mqttPublish temperature:topic={"$base/$name"} temperature:qos=1 temperature:retain=0 *:topic={"$base/$name"} humidity:topic=/TEST/Feuchte
attr <dev> mqttPublish temperature|humidity:topic={"$base/$name"} temperature|humidity:qos=1 temperature|humidity:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} *:qos=2 *:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"message: $value"}
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={$value="message: $value"}
- attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}
+ attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}
+ attr <dev> mqttPublish *:resendOnConnect=last
+
+
@@ -2328,7 +2465,7 @@ sub onmessage($$$) {
Definition:
Im einfachsten Fall reichen schon zwei Zeilen:
- defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec]
+ defmod mqttGeneric MQTT_GENERIC_BRIDGE [prefix] [devspec,[devspec]]
attr mqttGeneric IODev
Alle Parameter im Define sind optional.
Der erste ist ein Prefix fuer die Steuerattribute, worueber die zu ueberwachende Geraete (s.u.)
@@ -2336,6 +2473,7 @@ sub onmessage($$$) {
Wird dieser z.B. als 'hugo' redefiniert, heissen die Steuerungsattribute entsprechend hugoPublish etc.
Der zweite Parameter ('devspec') erlaubt die Menge der zu ueberwachenden Geraeten
zu begrenzen (sonst werden einfach alle ueberwacht, was jedoch Performance kosten kann).
+ Beispiel fuer devspec: 'TYPE=dummy' oder 'dummy1,dummy2'. Bei kommaseparierten Liste duerfen keine Leerzeichen verwendet werden!
s.a. devspec
@@ -2519,13 +2657,27 @@ sub onmessage($$$) {
Ist der Rueckgabewert undef, dann wird das Setzen/Ausfuehren unterbunden.
Ist die Rueckgabe ein Hash (nur 'topic'), werden seine Schluesselwerte als Topic verwendet,
die Inhalte der Nachrichten sind entsprechend die Werte aus dem Hash.
+ Option 'resendOnConnect' erlaubt eine Speicherung der Nachrichten,
+ wenn keine Verbindung zu dem MQTT-Server besteht.
+ Die zu sendende Nachrichten in einer Warteschlange gespeichet.
+ Wird die Verbindung aufgebaut, werden die Nachrichten in der ursprungichen Reihenfolge verschickt.
+
Moegliche Werte:
+ - none
alle verwerfen
+ - last
immer nur die letzte Nachricht speichern
+ - first
immer nur die erste Nachricht speichern danach folgende verwerfen
+ - all
alle speichern, allerdings existiert eine Obergrenze von 100,
+ wird es mehr, werden aelteste ueberzaelige Nachrichten verworfen.
+
+
Beispiele:
attr <dev> mqttPublish temperature:topic={"$base/$name"} temperature:qos=1 temperature:retain=0 *:topic={"$base/$name"} humidity:topic=/TEST/Feuchte
attr <dev> mqttPublish temperature|humidity:topic={"$base/$name"} temperature|humidity:qos=1 temperature|humidity:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} *:qos=2 *:retain=0
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"message: $value"}
attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={$value="message: $value"}
- attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}
+ attr <dev> mqttPublish *:topic={"$base/$name"} reading:expression={"/TEST/Topic1"=>"$message", "/TEST/Topic2"=>"message: $message"}
+ attr <dev> mqttPublish [...] *:resendOnConnect=last
+