diff --git a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm index 473e1419b..b29004f47 100644 --- a/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm +++ b/fhem/FHEM/10_MQTT_GENERIC_BRIDGE.pm @@ -30,6 +30,19 @@ # # CHANGE LOG # +# 15.11.2018 0.9.9 +# fix : Pruefung im Parse auf das richtige IODev gefixt (mqtt2). +# fix : Trigger-Event bei Aenderung der Attribute (mqtt2). +# feature : Beim publish (global publish related) pruefen, ob das Geraet +# dem devspec im DEF entspricht (falls vorhanden) +# +# 14.11.2018 0.9.9 +# feature : Unterstuetzung fuer MQTT2 -> subscribe (Parse) +# ! Erfordert Aenderung in MQTT2_CLIENT und MQTT2_SERVER ! +# ! in $hash->{Clients} = ":MQTT2_DEVICE:MQTT_GENERIC_BRIDGE:"; +# ! und $hash->{MatchList}= { "1:MQTT2_DEVICE" => "^.*", +# "2:MQTT_GENERIC_BRIDGE"=>"^.*" }, +# # 11.11.2018 0.9.9 # change : import fuer json2nameValue aus main. # Damit geht JSON-Unterstuetzung ohne Prefix 'main::' @@ -256,6 +269,10 @@ sub MQTT_GENERIC_BRIDGE_Initialize($) { $hash->{NotifyFn} = "MQTT::GENERIC_BRIDGE::Notify"; $hash->{AttrFn} = "MQTT::GENERIC_BRIDGE::Attr"; $hash->{OnMessageFn} = "MQTT::GENERIC_BRIDGE::onmessage"; + #$hash->{RenameFn} = "MQTT::GENERIC_BRIDGE::Rename"; + + $hash->{Match} = ".*"; + $hash->{ParseFn} = "MQTT::GENERIC_BRIDGE::Parse"; $hash->{OnClientConnectFn} = "MQTT::GENERIC_BRIDGE::ioDevConnect"; $hash->{OnClientDisconnectFn} = "MQTT::GENERIC_BRIDGE::ioDevDisconnect"; @@ -331,6 +348,7 @@ BEGIN { InternalTimer RemoveInternalTimer json2nameValue + IOWrite CTRL_ATTR_NAME_DEFAULTS CTRL_ATTR_NAME_ALIAS CTRL_ATTR_NAME_PUBLISH @@ -501,25 +519,35 @@ sub retrieveIODev($) { $iodt = $defs{$iodn}{TYPE}; } $hash->{+HELPER}->{+IO_DEV_TYPE} = $iodt; - return $hash->{+HELPER}->{+IO_DEV_TYPE}; + return ($iodt, $iodn); } # prueft, ob IODev MQTT-Instanz ist sub isIODevMQTT($) { my ($hash) = @_; - my $iodt = retrieveIODev($hash); + my ($iodt, $iodn) = retrieveIODev($hash); return 0 unless defined $iodt; return 0 unless $iodt eq 'MQTT'; return 1; } +sub checkIODevMQTT2($) { + my ($iodt) = @_; + return 0 unless defined $iodt; + return 1 if $iodt eq 'MQTT2_SERVER'; + return 1 if $iodt eq 'MQTT2_CLIENT'; + return 0; +} + # prueft, ob IODev MQTT2-Instanz ist sub isIODevMQTT2($) { my ($hash) = @_; - my $iodt = retrieveIODev($hash); - return 0 unless defined $iodt; - return 0 unless $iodt eq 'MQTT2_SERVER'; - return 1; + my ($iodt, $iodn) = retrieveIODev($hash); + # return 0 unless defined $iodt; + # return 1 if $iodt eq 'MQTT2_SERVER'; + # return 1 if $iodt eq 'MQTT2_CLIENT'; + # return 0; + return checkIODevMQTT2($iodt); } # Fuegt notwendige UserAttr hinzu @@ -581,9 +609,9 @@ sub firstInit($) { if (isIODevMQTT($hash)) { MQTT::client_start($hash); #if defined $hash->{+HELPER}->{+IO_DEV_TYPE} and $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT'; - readingsSingleUpdate($hash,"transmission-state","IO device initialized",1); + readingsSingleUpdate($hash,"transmission-state","IO device initialized (mqtt)",1); } elsif (isIODevMQTT2($hash)) { - readingsSingleUpdate($hash,"transmission-state","unsupported IO device",1); + readingsSingleUpdate($hash,"transmission-state","IO device initialized (mqtt2)",1); } else { readingsSingleUpdate($hash,"transmission-state","unknown IO device",1); } @@ -617,7 +645,20 @@ sub timerProc($) { sub isConnected($) { my $hash = shift; return MQTT::isConnected($hash->{IODev}) if isIODevMQTT($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT'; - return 1 if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER'; + + return 1 if isIODevMQTT2($hash); + # ich weiß nicht, ob das eine gute Idee ist, zu prüfen, evtl. wird FHEM-Standard-writeBuffef für das Senden nach dem Connect selbst sorgen + # in diesem Fall koenne wir annehmen, dass immer connected ist und keine eigene Warteschlangen verwenden + # my ($iodt, $iodn) = retrieveIODev($hash); + # return 0 unless defined $iodt; + # return 1 if $iodt eq 'MQTT2_SERVER'; # immer 'verbunden' + # if($iodt eq 'MQTT2_CLIENT') { # Status pruefen + # my $iodn = AttrVal($hash->{NAME}, "IODev", undef); + # return 1 if (ReadingsVal($iodn, "state", "") eq "opened"); + # return 0; + # } + + return 0; } # Berechnet Anzahl der ueberwachten Geraete neu @@ -2046,7 +2087,7 @@ sub doPublish($$$$$$$$) { $hash->{message_ids}->{$msgid}++ if defined $msgid; return 'empty topic or message'; } else { - my $iodt = retrieveIODev($hash); + my ($iodt, $iodn) = retrieveIODev($hash); $iodt = 'undef' unless defined $iodt; Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE: unknown IODev: ".$iodt); return 'unknown IODev'; @@ -2067,6 +2108,15 @@ sub publishDeviceUpdate($$$$$) { return if($type eq "MQTT_GENERIC_BRIDGE"); return if($type eq "MQTT"); return if($reading eq "transmission-state"); + + # nicht durch devspec abgedeckte Geraete verwerfen + my $devspec = $hash->{+HS_PROP_NAME_DEVSPEC}; + if (defined($devspec) and ($devspec ne '') and ($devspec ne '.*')) { + my @devices = devspec2array($devspec); + # check device exists in the list + return unless grep {$_ eq $devn} @devices; + } + # extra definierte (ansonsten gilt eine Defaultliste) Types/Readings auschliessen. return if(isTypeDevReadingExcluded($hash, 'pub', $type, $devn, $reading)); @@ -2244,7 +2294,7 @@ sub Attr($$$$) { # CallBack-Handler fuer IODev beim Connect sub ioDevConnect($) { my $hash = shift; - return if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER'; + return if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER'; # TODO # ueberraschenderweise notwendig fuer eine subscribe-Initialisierung. MQTT::client_start($hash) if isIODevMQTT($hash); @@ -2279,11 +2329,11 @@ sub ioDevDisconnect($) { #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE:DEBUG:> ioDevDisconnect"); - # TODO + # TODO ? } # Per MQTT-Empfangenen Aktualisierungen an die entsprechende Geraete anwenden -# Params: Bridge-Hash, Modus (R=Readings, A=Attribute), Reading/Attribute-Name, Nachricht +# Params: Bridge-Hash, Modus (R=Readings, A=Attribute), Device, Reading/Attribute-Name, Nachricht sub doSetUpdate($$$$$) { my ($hash,$mode,$device,$reading,$message) = @_; @@ -2336,6 +2386,32 @@ sub doSetUpdate($$$$$) { return "internal error"; } +# Call von IODev-Dispatch (e.g.MQTT2) +sub Parse($$) { + my ($iodev, $msg) = @_; + my $ioname = $iodev->{NAME}; + #my $iotype = $iodev->{TYPE}; + #Log3($iodev->{NAME},1,"MQTT_GENERIC_BRIDGE: Parse: IODev: $ioname"); + #Log3("XXX",1,"MQTT_GENERIC_BRIDGE: Parse: $msg"); + + my ($cid, $topic, $value) = split(":", $msg, 3); + + my @instances = devspec2array("TYPE=MQTT_GENERIC_BRIDGE"); + foreach my $dev (@instances) { + my $hash = $defs{$dev}; + # Name mit IODev vegleichen + my ($iiodt, $iiodn) = retrieveIODev($hash); + #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE: Parse: test IODev: $iiodn vs. $ioname"); + next unless $ioname eq $iiodn; + next unless checkIODevMQTT2($iiodt); + #next unless isIODevMQTT2($hash); + + Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE: Parse ($iiodt : '$ioname'): Msg: $topic => $value"); + + return onmessage($hash, $topic, $value); + } +} + # Routine MQTT-Message Callback sub onmessage($$$) { my ($hash,$topic,$message) = @_; @@ -2397,21 +2473,31 @@ sub onmessage($$$) { next unless defined $message; my $updated = 0; + my @updatedList; if(defined($redefMap)) { foreach my $key (keys %{$redefMap}) { my $val = $redefMap->{$key}; my $r = doSetUpdate($hash,$mode,$device,$key,$val); - $updated = 1 unless defined $r; + unless (defined($r)) { + $updated = 1; + push(@updatedList, $device); + } } } else { my $r = doSetUpdate($hash,$mode,$device,$reading,$message); - $updated = 1 unless defined $r; + unless (defined($r)) { + $updated = 1; + push(@updatedList, $device); + } } # TODO: ggf. Update Last Received implementieren (nicht ganz einfach). #if($updated) { #updateSubTime($device,$reading); #} + + return @updatedList if($updated); + return undef; } }