diff --git a/fhem/FHEM/00_MQTT.pm b/fhem/FHEM/00_MQTT.pm index d8295ce55..43032c9b8 100644 --- a/fhem/FHEM/00_MQTT.pm +++ b/fhem/FHEM/00_MQTT.pm @@ -163,7 +163,7 @@ sub MQTT_Read { MESSAGE_TYPE: { $message_type == MQTT_CONNACK and do { readingsSingleUpdate($hash,"connection","connected",1); - GP_ForallClients($hash,\&MQTT_DEVICE_Start); + GP_ForallClients($hash,\&MQTT_client_start); last; }; @@ -174,7 +174,11 @@ sub MQTT_Read { main::Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); if (grep { $_ eq $topic } @{$client->{subscribe}}) { readingsSingleUpdate($client,"transmission-state","publish received",1); - MQTT_DEVICE_onmessage($client,$topic,$mqtt->message()); + if ($client->{TYPE} eq "MQTT_DEVICE") { + MQTT_DEVICE_onmessage($client,$topic,$mqtt->message()); + } else { + MQTT_BRIDGE_onmessage($client,$topic,$mqtt->message()); + } }; },undef); last; @@ -300,6 +304,50 @@ sub MQTT_send_message($$$@) { return $msgid; }; +sub MQTT_client_define($$) { + my ( $client, $def ) = @_; + + $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; + $client->{qos} = MQTT_QOS_AT_MOST_ONCE; + $client->{subscribe} = []; + if ($main::init_done) { + return MQTT_client_start($client); + } else { + return undef; + } +} + +sub MQTT_client_undefine($) { + MQTT_client_stop(shift); +} + + +sub MQTT_client_start($) { + my $client = shift; + my $name = $client->{NAME}; + if (! (defined AttrVal($name,"stateFormat",undef))) { + $main::attr{$name}{stateFormat} = "transmission-state"; + } + if (@{$client->{subscribe}}) { + my $msgid = MQTT_send_subscribe($client->{IODev}, + topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}], + ); + $client->{message_ids}->{$msgid}++; + readingsSingleUpdate($client,"transmission-state","subscribe sent",1) + } +} + +sub MQTT_client_stop($) { + my $client = shift; + if (@{$client->{subscribe}}) { + my $msgid = MQTT_send_unsubscribe($client->{IODev}, + topics => [@{$client->{subscribe}}], + ); + $client->{message_ids}->{$msgid}++; + readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1) + } +} + 1; =pod diff --git a/fhem/FHEM/10_MQTT_DEVICE.pm b/fhem/FHEM/10_MQTT_DEVICE.pm index 80c633521..67e8ad787 100644 --- a/fhem/FHEM/10_MQTT_DEVICE.pm +++ b/fhem/FHEM/10_MQTT_DEVICE.pm @@ -35,7 +35,6 @@ my %sets = ( my %gets = ( "version" => "", - "readings" => "" ); my %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE); @@ -45,80 +44,35 @@ sub MQTT_DEVICE_Initialize($) { my $hash = shift @_; # Consumer - $hash->{DefFn} = "MQTT_DEVICE_Define"; - $hash->{UndefFn} = "MQTT_DEVICE_Undef"; - #$hash->{SetFn} = "MQTT_DEVICE_Set"; - $hash->{GetFn} = "MQTT_DEVICE_Get"; - $hash->{NotifyFn} = "MQTT_DEVICE_Notify"; + $hash->{DefFn} = "MQTT_client_define"; + $hash->{UndefFn} = "MQTT_client_undefine"; + $hash->{SetFn} = "MQTT_DEVICE_Set"; $hash->{AttrFn} = "MQTT_DEVICE_Attr"; $hash->{AttrList} = "IODev ". "qos:".join(",",keys %qos)." ". - "publish-topic-base ". - "publishState ". - "publishReading_.* ". - "subscribeSet ". - "subscribeSet_.* ". + "publishSet ". + "publishSet_.* ". + "subscribeReading_.* ". $main::readingFnAttributes; } -sub MQTT_DEVICE_Define($$) { - my ( $hash, $def ) = @_; - - $hash->{NOTIFYDEV} = $hash->{DEF}; - $hash->{qos} = MQTT_QOS_AT_MOST_ONCE; - $hash->{subscribe} = []; - if ($main::init_done) { - return MQTT_DEVICE_Start($hash); +sub MQTT_DEVICE_Set($@) { + my ($hash, @a) = @_; + return "Need at least one parameters" if(@a < 2); + return "Unknown argument $a[1], choose one of " . join(" ", map {$sets{$_} eq "" ? $_ : "$_:$sets{$_}"} sort keys %sets) + if(!defined($sets{$a[1]})); + my $command = $a[1]; + my $value = $a[2]; + if (defined $value) { + MQTT_send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}); + readingsSingleUpdate($hash,$command,$value,1); } else { - return undef; - } -} - -sub MQTT_DEVICE_Undef($) { - MQTT_DEVICE_Stop(shift); -} - -sub MQTT_DEVICE_Get($$@) { - my ($hash, $name, $command) = @_; - return "Need at least one parameters" unless (defined $command); - return "Unknown argument $command, choose one of " . join(" ", sort keys %gets) - unless (defined($gets{$command})); - - COMMAND_HANDLER: { - # populate dynamically from keys %{$defs{$sdev}{READINGS}} - $command eq "readings" and do { - my $base = AttrVal($name,"publish-topic-base","/$hash->{DEF}/"); - foreach my $reading (keys %{$main::defs{$hash->{DEF}}{READINGS}}) { - unless (defined main::AttrVal($name,"publishReading_$reading",undef)) { - main::CommandAttr($hash,"$name publishReading_$reading $base$reading"); - } - }; - last; - }; - }; -} - -sub MQTT_DEVICE_Notify() { - my ($hash,$dev) = @_; - - main::Log3($hash->{NAME},5,"Notify for $dev->{NAME}"); - foreach my $event (@{$dev->{CHANGED}}) { - $event =~ /^([^:]+)(: )?(.*)$/; - main::Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'"); - if (defined $3 and $3 ne "") { - if (defined $hash->{publishReadings}->{$1}) { - MQTT_send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}); - readingsSingleUpdate($hash,"transmission-state","publish sent",1); - } - } else { - if (defined $hash->{publishState}) { - MQTT_send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}); - readingsSingleUpdate($hash,"transmission-state","publish sent",1); - } - } + MQTT_send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}); + readingsSingleUpdate($hash,"state",$command,1); } + return undef; } sub MQTT_DEVICE_Attr($$$$) { @@ -126,9 +80,9 @@ sub MQTT_DEVICE_Attr($$$$) { my $hash = $main::defs{$name}; ATTRIBUTE_HANDLER: { - $attribute =~ /^subscribeSet(_?)(.*)/ and do { + $attribute =~ /^subscribeReading_(.+)/ and do { if ($command eq "set") { - $hash->{subscribeSets}->{$value} = $2; + $hash->{subscribeReadings}->{$value} = $1; push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; if ($main::init_done) { if (my $mqtt = $hash->{IODev}) {; @@ -140,32 +94,48 @@ sub MQTT_DEVICE_Attr($$$$) { } } } else { - if ($main::init_done) { - if (my $mqtt = $hash->{IODev}) {; - my $msgid = MQTT_send_unsubscribe($mqtt, - topics => [$value], - ); - $hash->{message_ids}->{$msgid}++; + foreach my $topic (keys %{$hash->{subscribeReadings}}) { + if ($hash->{subscribeReadings}->{$topic} eq $1) { + $hash->{subscribe} = [grep { $_ != $topic } @{$hash->{subscribe}}]; + delete $hash->{subscribeReadings}->{$topic}; + if ($main::init_done) { + if (my $mqtt = $hash->{IODev}) {; + my $msgid = MQTT_send_unsubscribe($mqtt, + topics => [$topic], + ); + $hash->{message_ids}->{$msgid}++; + } + } + last; } } - delete $hash->{subscribeSets}->{$value}; - $hash->{subscribe} = [grep { $_ != $value } @{$hash->{subscribe}}]; } last; }; - $attribute eq "publishState" and do { + $attribute =~ /^publishSet(_?)(.*)/ and do { if ($command eq "set") { - $hash->{publishState} = $value; + my @values = split ("[ \t]+",$value); + my $topic = pop @values; + $hash->{publishSets}->{$2} = { + 'values' => \@values, + topic => $topic, + }; + if ($2 eq "") { + foreach my $set (@values) { + $sets{$set}=""; + } + } else { + $sets{$2}=join(",",@values); + } } else { - delete $hash->{publishState}; - } - last; - }; - $attribute =~ /^publishReading_(.+)$/ and do { - if ($command eq "set") { - $hash->{publishReadings}->{$1} = $value; - } else { - delete $hash->{publishReadings}->{$1}; + if ($2 eq "") { + foreach my $set (@{$hash->{publishSets}->{$2}->{'values'}}) { + delete $sets{$set}; + } + } else { + delete $sets{$2}; + } + delete $hash->{publishSets}->{$2}; } last; }; @@ -186,45 +156,14 @@ sub MQTT_DEVICE_Attr($$$$) { } } -sub MQTT_DEVICE_Start($) { - my $hash = shift; - my $name = $hash->{NAME}; - if (! (defined AttrVal($name,"stateFormat",undef))) { - $main::attr{$name}{stateFormat} = "transmission-state"; - } - if (@{$hash->{subscribe}}) { - my $msgid = MQTT_send_subscribe($hash->{IODev}, - topics => [map { [$_ => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$hash->{subscribe}}], - ); - $hash->{message_ids}->{$msgid}++; - readingsSingleUpdate($hash,"transmission-state","subscribe sent",1) - } -} - -sub MQTT_DEVICE_Stop($) { - my $hash = shift; - if (@{$hash->{subscribe}}) { - my $msgid = MQTT_send_unsubscribe($hash->{IODev}, - topics => [@{$hash->{subscribe}}], - ); - $hash->{message_ids}->{$msgid}++; - readingsSingleUpdate($hash,"transmission-state","unsubscribe sent",1) - } -} - sub MQTT_DEVICE_onmessage($$$) { my ($hash,$topic,$message) = @_; - if (defined (my $command = $hash->{subscribeSets}->{$topic})) { - my @args = split ("[ \t]+",$message); - if ($command eq "") { - main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : "")); - main::DoSet($hash->{DEF},@args); - } else { - main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : "")); - main::DoSet($hash->{DEF},$command,@args); - } + if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { + main::Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); + main::readingsSingleUpdate($hash,$reading,$message,1); } } + 1; =pod @@ -233,7 +172,7 @@ sub MQTT_DEVICE_onmessage($$$) {

MQTT