2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-04-22 14:16:42 +00:00

added: subscribe support for mqtt2 IODev

feature: check devspec (from DEF) while wildcard publishing

git-svn-id: https://svn.fhem.de/fhem/trunk@17754 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
hexenmeister 2018-11-15 21:39:09 +00:00
parent a54fe91ebf
commit 8a4c98881a

View File

@ -30,6 +30,19 @@
# #
# CHANGE LOG # CHANGE LOG
# #
# 15.11.2018 0.9.9
# fix : Pruefung im Parse auf das richtige IODev gefixt (mqtt2).
# fix : Trigger-Event bei Aenderung der Attribute (mqtt2).
# feature : Beim publish (global publish related) pruefen, ob das Geraet
# dem devspec im DEF entspricht (falls vorhanden)
#
# 14.11.2018 0.9.9
# feature : Unterstuetzung fuer MQTT2 -> subscribe (Parse)
# ! Erfordert Aenderung in MQTT2_CLIENT und MQTT2_SERVER !
# ! in $hash->{Clients} = ":MQTT2_DEVICE:MQTT_GENERIC_BRIDGE:";
# ! und $hash->{MatchList}= { "1:MQTT2_DEVICE" => "^.*",
# "2:MQTT_GENERIC_BRIDGE"=>"^.*" },
#
# 11.11.2018 0.9.9 # 11.11.2018 0.9.9
# change : import fuer json2nameValue aus main. # change : import fuer json2nameValue aus main.
# Damit geht JSON-Unterstuetzung ohne Prefix 'main::' # Damit geht JSON-Unterstuetzung ohne Prefix 'main::'
@ -256,6 +269,10 @@ sub MQTT_GENERIC_BRIDGE_Initialize($) {
$hash->{NotifyFn} = "MQTT::GENERIC_BRIDGE::Notify"; $hash->{NotifyFn} = "MQTT::GENERIC_BRIDGE::Notify";
$hash->{AttrFn} = "MQTT::GENERIC_BRIDGE::Attr"; $hash->{AttrFn} = "MQTT::GENERIC_BRIDGE::Attr";
$hash->{OnMessageFn} = "MQTT::GENERIC_BRIDGE::onmessage"; $hash->{OnMessageFn} = "MQTT::GENERIC_BRIDGE::onmessage";
#$hash->{RenameFn} = "MQTT::GENERIC_BRIDGE::Rename";
$hash->{Match} = ".*";
$hash->{ParseFn} = "MQTT::GENERIC_BRIDGE::Parse";
$hash->{OnClientConnectFn} = "MQTT::GENERIC_BRIDGE::ioDevConnect"; $hash->{OnClientConnectFn} = "MQTT::GENERIC_BRIDGE::ioDevConnect";
$hash->{OnClientDisconnectFn} = "MQTT::GENERIC_BRIDGE::ioDevDisconnect"; $hash->{OnClientDisconnectFn} = "MQTT::GENERIC_BRIDGE::ioDevDisconnect";
@ -331,6 +348,7 @@ BEGIN {
InternalTimer InternalTimer
RemoveInternalTimer RemoveInternalTimer
json2nameValue json2nameValue
IOWrite
CTRL_ATTR_NAME_DEFAULTS CTRL_ATTR_NAME_DEFAULTS
CTRL_ATTR_NAME_ALIAS CTRL_ATTR_NAME_ALIAS
CTRL_ATTR_NAME_PUBLISH CTRL_ATTR_NAME_PUBLISH
@ -501,25 +519,35 @@ sub retrieveIODev($) {
$iodt = $defs{$iodn}{TYPE}; $iodt = $defs{$iodn}{TYPE};
} }
$hash->{+HELPER}->{+IO_DEV_TYPE} = $iodt; $hash->{+HELPER}->{+IO_DEV_TYPE} = $iodt;
return $hash->{+HELPER}->{+IO_DEV_TYPE}; return ($iodt, $iodn);
} }
# prueft, ob IODev MQTT-Instanz ist # prueft, ob IODev MQTT-Instanz ist
sub isIODevMQTT($) { sub isIODevMQTT($) {
my ($hash) = @_; my ($hash) = @_;
my $iodt = retrieveIODev($hash); my ($iodt, $iodn) = retrieveIODev($hash);
return 0 unless defined $iodt; return 0 unless defined $iodt;
return 0 unless $iodt eq 'MQTT'; return 0 unless $iodt eq 'MQTT';
return 1; return 1;
} }
sub checkIODevMQTT2($) {
my ($iodt) = @_;
return 0 unless defined $iodt;
return 1 if $iodt eq 'MQTT2_SERVER';
return 1 if $iodt eq 'MQTT2_CLIENT';
return 0;
}
# prueft, ob IODev MQTT2-Instanz ist # prueft, ob IODev MQTT2-Instanz ist
sub isIODevMQTT2($) { sub isIODevMQTT2($) {
my ($hash) = @_; my ($hash) = @_;
my $iodt = retrieveIODev($hash); my ($iodt, $iodn) = retrieveIODev($hash);
return 0 unless defined $iodt; # return 0 unless defined $iodt;
return 0 unless $iodt eq 'MQTT2_SERVER'; # return 1 if $iodt eq 'MQTT2_SERVER';
return 1; # return 1 if $iodt eq 'MQTT2_CLIENT';
# return 0;
return checkIODevMQTT2($iodt);
} }
# Fuegt notwendige UserAttr hinzu # Fuegt notwendige UserAttr hinzu
@ -581,9 +609,9 @@ sub firstInit($) {
if (isIODevMQTT($hash)) { if (isIODevMQTT($hash)) {
MQTT::client_start($hash); #if defined $hash->{+HELPER}->{+IO_DEV_TYPE} and $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT'; MQTT::client_start($hash); #if defined $hash->{+HELPER}->{+IO_DEV_TYPE} and $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT';
readingsSingleUpdate($hash,"transmission-state","IO device initialized",1); readingsSingleUpdate($hash,"transmission-state","IO device initialized (mqtt)",1);
} elsif (isIODevMQTT2($hash)) { } elsif (isIODevMQTT2($hash)) {
readingsSingleUpdate($hash,"transmission-state","unsupported IO device",1); readingsSingleUpdate($hash,"transmission-state","IO device initialized (mqtt2)",1);
} else { } else {
readingsSingleUpdate($hash,"transmission-state","unknown IO device",1); readingsSingleUpdate($hash,"transmission-state","unknown IO device",1);
} }
@ -617,7 +645,20 @@ sub timerProc($) {
sub isConnected($) { sub isConnected($) {
my $hash = shift; my $hash = shift;
return MQTT::isConnected($hash->{IODev}) if isIODevMQTT($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT'; return MQTT::isConnected($hash->{IODev}) if isIODevMQTT($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT';
return 1 if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER';
return 1 if isIODevMQTT2($hash);
# ich weiß nicht, ob das eine gute Idee ist, zu prüfen, evtl. wird FHEM-Standard-writeBuffef für das Senden nach dem Connect selbst sorgen
# in diesem Fall koenne wir annehmen, dass immer connected ist und keine eigene Warteschlangen verwenden
# my ($iodt, $iodn) = retrieveIODev($hash);
# return 0 unless defined $iodt;
# return 1 if $iodt eq 'MQTT2_SERVER'; # immer 'verbunden'
# if($iodt eq 'MQTT2_CLIENT') { # Status pruefen
# my $iodn = AttrVal($hash->{NAME}, "IODev", undef);
# return 1 if (ReadingsVal($iodn, "state", "") eq "opened");
# return 0;
# }
return 0;
} }
# Berechnet Anzahl der ueberwachten Geraete neu # Berechnet Anzahl der ueberwachten Geraete neu
@ -2046,7 +2087,7 @@ sub doPublish($$$$$$$$) {
$hash->{message_ids}->{$msgid}++ if defined $msgid; $hash->{message_ids}->{$msgid}++ if defined $msgid;
return 'empty topic or message'; return 'empty topic or message';
} else { } else {
my $iodt = retrieveIODev($hash); my ($iodt, $iodn) = retrieveIODev($hash);
$iodt = 'undef' unless defined $iodt; $iodt = 'undef' unless defined $iodt;
Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE: unknown IODev: ".$iodt); Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE: unknown IODev: ".$iodt);
return 'unknown IODev'; return 'unknown IODev';
@ -2067,6 +2108,15 @@ sub publishDeviceUpdate($$$$$) {
return if($type eq "MQTT_GENERIC_BRIDGE"); return if($type eq "MQTT_GENERIC_BRIDGE");
return if($type eq "MQTT"); return if($type eq "MQTT");
return if($reading eq "transmission-state"); return if($reading eq "transmission-state");
# nicht durch devspec abgedeckte Geraete verwerfen
my $devspec = $hash->{+HS_PROP_NAME_DEVSPEC};
if (defined($devspec) and ($devspec ne '') and ($devspec ne '.*')) {
my @devices = devspec2array($devspec);
# check device exists in the list
return unless grep {$_ eq $devn} @devices;
}
# extra definierte (ansonsten gilt eine Defaultliste) Types/Readings auschliessen. # extra definierte (ansonsten gilt eine Defaultliste) Types/Readings auschliessen.
return if(isTypeDevReadingExcluded($hash, 'pub', $type, $devn, $reading)); return if(isTypeDevReadingExcluded($hash, 'pub', $type, $devn, $reading));
@ -2244,7 +2294,7 @@ sub Attr($$$$) {
# CallBack-Handler fuer IODev beim Connect # CallBack-Handler fuer IODev beim Connect
sub ioDevConnect($) { sub ioDevConnect($) {
my $hash = shift; my $hash = shift;
return if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER'; return if isIODevMQTT2($hash); #if $hash->{+HELPER}->{+IO_DEV_TYPE} eq 'MQTT2_SERVER'; # TODO
# ueberraschenderweise notwendig fuer eine subscribe-Initialisierung. # ueberraschenderweise notwendig fuer eine subscribe-Initialisierung.
MQTT::client_start($hash) if isIODevMQTT($hash); MQTT::client_start($hash) if isIODevMQTT($hash);
@ -2279,11 +2329,11 @@ sub ioDevDisconnect($) {
#Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE:DEBUG:> ioDevDisconnect"); #Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE:DEBUG:> ioDevDisconnect");
# TODO # TODO ?
} }
# Per MQTT-Empfangenen Aktualisierungen an die entsprechende Geraete anwenden # Per MQTT-Empfangenen Aktualisierungen an die entsprechende Geraete anwenden
# Params: Bridge-Hash, Modus (R=Readings, A=Attribute), Reading/Attribute-Name, Nachricht # Params: Bridge-Hash, Modus (R=Readings, A=Attribute), Device, Reading/Attribute-Name, Nachricht
sub doSetUpdate($$$$$) { sub doSetUpdate($$$$$) {
my ($hash,$mode,$device,$reading,$message) = @_; my ($hash,$mode,$device,$reading,$message) = @_;
@ -2336,6 +2386,32 @@ sub doSetUpdate($$$$$) {
return "internal error"; return "internal error";
} }
# Call von IODev-Dispatch (e.g.MQTT2)
sub Parse($$) {
my ($iodev, $msg) = @_;
my $ioname = $iodev->{NAME};
#my $iotype = $iodev->{TYPE};
#Log3($iodev->{NAME},1,"MQTT_GENERIC_BRIDGE: Parse: IODev: $ioname");
#Log3("XXX",1,"MQTT_GENERIC_BRIDGE: Parse: $msg");
my ($cid, $topic, $value) = split(":", $msg, 3);
my @instances = devspec2array("TYPE=MQTT_GENERIC_BRIDGE");
foreach my $dev (@instances) {
my $hash = $defs{$dev};
# Name mit IODev vegleichen
my ($iiodt, $iiodn) = retrieveIODev($hash);
#Log3($hash->{NAME},1,"MQTT_GENERIC_BRIDGE: Parse: test IODev: $iiodn vs. $ioname");
next unless $ioname eq $iiodn;
next unless checkIODevMQTT2($iiodt);
#next unless isIODevMQTT2($hash);
Log3($hash->{NAME},5,"MQTT_GENERIC_BRIDGE: Parse ($iiodt : '$ioname'): Msg: $topic => $value");
return onmessage($hash, $topic, $value);
}
}
# Routine MQTT-Message Callback # Routine MQTT-Message Callback
sub onmessage($$$) { sub onmessage($$$) {
my ($hash,$topic,$message) = @_; my ($hash,$topic,$message) = @_;
@ -2397,21 +2473,31 @@ sub onmessage($$$) {
next unless defined $message; next unless defined $message;
my $updated = 0; my $updated = 0;
my @updatedList;
if(defined($redefMap)) { if(defined($redefMap)) {
foreach my $key (keys %{$redefMap}) { foreach my $key (keys %{$redefMap}) {
my $val = $redefMap->{$key}; my $val = $redefMap->{$key};
my $r = doSetUpdate($hash,$mode,$device,$key,$val); my $r = doSetUpdate($hash,$mode,$device,$key,$val);
$updated = 1 unless defined $r; unless (defined($r)) {
$updated = 1;
push(@updatedList, $device);
}
} }
} else { } else {
my $r = doSetUpdate($hash,$mode,$device,$reading,$message); my $r = doSetUpdate($hash,$mode,$device,$reading,$message);
$updated = 1 unless defined $r; unless (defined($r)) {
$updated = 1;
push(@updatedList, $device);
}
} }
# TODO: ggf. Update Last Received implementieren (nicht ganz einfach). # TODO: ggf. Update Last Received implementieren (nicht ganz einfach).
#if($updated) { #if($updated) {
#updateSubTime($device,$reading); #updateSubTime($device,$reading);
#} #}
return @updatedList if($updated);
return undef;
} }
} }