From de665f19952593eebff49433a27791087fc2a646 Mon Sep 17 00:00:00 2001 From: ntruchsess <> Date: Wed, 8 Oct 2014 15:31:39 +0000 Subject: [PATCH] MQTT: implement qos git-svn-id: https://svn.fhem.de/fhem/trunk@6709 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/FHEM/00_MQTT.pm | 93 ++++++++++++++++++++++++++++++------- fhem/FHEM/10_MQTT_BRIDGE.pm | 10 ++-- fhem/FHEM/10_MQTT_DEVICE.pm | 7 ++- 3 files changed, 87 insertions(+), 23 deletions(-) diff --git a/fhem/FHEM/00_MQTT.pm b/fhem/FHEM/00_MQTT.pm index 86789cd84..cb7ef7904 100644 --- a/fhem/FHEM/00_MQTT.pm +++ b/fhem/FHEM/00_MQTT.pm @@ -93,6 +93,8 @@ sub Define($$) { $hash->{NOTIFYDEV} = "global"; $hash->{msgid} = 1; + $hash->{timeout} = 60; + $hash->{messages} = {}; if ($main::init_done) { return Start($hash); @@ -133,6 +135,26 @@ sub Notify($$) { } } +sub Attr($$$$) { + my ($command,$name,$attribute,$value) = @_; + + my $hash = $main::defs{$name}; + ATTRIBUTE_HANDLER: { + $attribute eq "keep-alive" and do { + if ($command eq "set") { + $hash->{timeout} = $value; + } else { + $hash->{timeout} = 60; + } + if ($main::init_done) { + $hash->{ping_received}=1; + Timer($hash); + }; + last; + }; + }; +} + sub Start($) { my $hash = shift; my ($dev) = split("[ \t]+", $hash->{DEF}); @@ -168,7 +190,7 @@ sub Timer($) { RemoveInternalTimer($hash); readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received}; $hash->{ping_received} = 0; - InternalTimer(gettimeofday()+AttrVal($hash-> {NAME},"keep-alive",60), "MQTT::Timer", $hash, 0); + InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0); send_ping($hash); } @@ -179,6 +201,7 @@ sub Read { return undef unless $buf; $hash->{buf} .= $buf; while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) { + my $message_type = $mqtt->message_type(); Log3($name,5,"MQTT $name message received: ".$mqtt->string()); @@ -187,6 +210,11 @@ sub Read { $message_type == MQTT_CONNACK and do { readingsSingleUpdate($hash,"connection","connected",1); GP_ForallClients($hash,\&client_start); + foreach my $message_id (keys %{$hash->{messages}}) { + my $msg = $hash->{messages}->{$message_id}->{message}; + $msg->{dup} = 1; + DevIo_SimpleWrite($hash,$msg->bytes,undef); + } last; }; @@ -196,7 +224,7 @@ sub Read { my $client = shift; Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) { - readingsSingleUpdate($client,"transmission-state","publish received",1); + readingsSingleUpdate($client,"transmission-state","incoming publish received",1); if ($client->{TYPE} eq "MQTT_DEVICE") { MQTT::DEVICE::onmessage($client,$topic,$mqtt->message()); } else { @@ -204,6 +232,14 @@ sub Read { } }; },undef); + if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) { + my $message_id = $mqtt->message_id(); + if ($qos == MQTT_QOS_AT_LEAST_ONCE) { + send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id); + } else { + send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id); + } + } last; }; @@ -212,10 +248,11 @@ sub Read { GP_ForallClients($hash,sub { my $client = shift; if ($client->{message_ids}->{$message_id}) { - readingsSingleUpdate($client,"transmission-state","pubacknowledge received",1); + readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1); delete $client->{message_ids}->{$message_id}; }; },undef); + delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling last; }; @@ -224,10 +261,10 @@ sub Read { GP_ForallClients($hash,sub { my $client = shift; if ($client->{message_ids}->{$message_id}) { - readingsSingleUpdate($client,"transmission-state","pubreceive received",1); - delete $client->{message_ids}->{$message_id}; + readingsSingleUpdate($client,"transmission-state","outgoing publish received",1); }; },undef); + send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling last; }; @@ -236,10 +273,12 @@ sub Read { GP_ForallClients($hash,sub { my $client = shift; if ($client->{message_ids}->{$message_id}) { - readingsSingleUpdate($client,"transmission-state","pubrelease received",1); + readingsSingleUpdate($client,"transmission-state","incoming publish released",1); delete $client->{message_ids}->{$message_id}; }; },undef); + send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling + delete $hash->{messages}->{$message_id}; last; }; @@ -248,10 +287,11 @@ sub Read { GP_ForallClients($hash,sub { my $client = shift; if ($client->{message_ids}->{$message_id}) { - readingsSingleUpdate($client,"transmission-state","pubcomplete received",1); + readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1); delete $client->{message_ids}->{$message_id}; }; },undef); + delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling last; }; @@ -264,6 +304,7 @@ sub Read { delete $client->{message_ids}->{$message_id}; }; },undef); + delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling last; }; @@ -276,6 +317,7 @@ sub Read { delete $client->{message_ids}->{$message_id}; }; },undef); + delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling last; }; @@ -293,20 +335,33 @@ sub Read { sub send_connect($) { my $hash = shift; - return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => AttrVal($hash->{NAME},"keep-alive",60)); + return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}); }; sub send_publish($@) { - return send_message(shift, message_type => MQTT_PUBLISH, @_); + my ($hash,%msg) = @_; + if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) { + send_message(shift, message_type => MQTT_PUBLISH, %msg); + return undef; + } else { + my $msgid = $hash->{msgid}++; + send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg); + return $msgid; + } }; sub send_subscribe($@) { my $hash = shift; - return send_message($hash, message_type => MQTT_SUBSCRIBE, @_); + my $msgid = $hash->{msgid}++; + send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_); + return $msgid; }; sub send_unsubscribe($@) { - return send_message(shift, message_type => MQTT_UNSUBSCRIBE, @_); + my $hash = shift; + my $msgid = $hash->{msgid}++; + send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_); + return $msgid; }; sub send_ping($) { @@ -318,13 +373,17 @@ sub send_disconnect($) { }; sub send_message($$$@) { - my $hash = shift; + my ($hash,%msg) = @_; my $name = $hash->{NAME}; - my $msgid = $hash->{msgid}++; - my $msg = Net::MQTT::Message->new(message_id => $msgid,@_); - Log3($name,5,"MQTT $name message sent: ".$msg->string()); - DevIo_SimpleWrite($hash,$msg->bytes,undef); - return $msgid; + my $message = Net::MQTT::Message->new(%msg); + Log3($name,5,"MQTT $name message sent: ".$message->string()); + if (defined $msg{message_id}) { + $hash->{messages}->{$msg{message_id}} = { + message => $message, + timeout => gettimeofday()+$hash->{timeout}, + }; + } + DevIo_SimpleWrite($hash,$message->bytes,undef); }; sub topic_to_regexp($) { diff --git a/fhem/FHEM/10_MQTT_BRIDGE.pm b/fhem/FHEM/10_MQTT_BRIDGE.pm index 0de2c9eda..8a89e2ea9 100644 --- a/fhem/FHEM/10_MQTT_BRIDGE.pm +++ b/fhem/FHEM/10_MQTT_BRIDGE.pm @@ -105,17 +105,19 @@ sub Notify() { foreach my $event (@{$dev->{CHANGED}}) { $event =~ /^([^:]+)(: )?(.*)$/; Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'"); + my $msgid; if (defined $3 and $3 ne "") { if (defined $hash->{publishReadings}->{$1}) { - send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}); - readingsSingleUpdate($hash,"transmission-state","publish sent",1); + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}); + readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); } } else { if (defined $hash->{publishState}) { - send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}); - readingsSingleUpdate($hash,"transmission-state","publish sent",1); + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}); + readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); } } + $hash->{message_ids}->{$msgid}++ if defined $msgid; } } diff --git a/fhem/FHEM/10_MQTT_DEVICE.pm b/fhem/FHEM/10_MQTT_DEVICE.pm index 5171483ee..b379c3c16 100644 --- a/fhem/FHEM/10_MQTT_DEVICE.pm +++ b/fhem/FHEM/10_MQTT_DEVICE.pm @@ -81,13 +81,16 @@ sub Set($@) { if(!defined($sets{$a[1]})); my $command = $a[1]; my $value = $a[2]; + my $msgid; if (defined $value) { - send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}); + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}); readingsSingleUpdate($hash,$command,$value,1); } else { - send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}); + $msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}); readingsSingleUpdate($hash,"state",$command,1); } + $hash->{message_ids}->{$msgid}++ if defined $msgid; + readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1); return undef; }