mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-03-10 09:16:53 +00:00
MQTT: implement qos
git-svn-id: https://svn.fhem.de/fhem/trunk@6709 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
parent
4ccf88c21e
commit
de665f1995
@ -93,6 +93,8 @@ sub Define($$) {
|
||||
|
||||
$hash->{NOTIFYDEV} = "global";
|
||||
$hash->{msgid} = 1;
|
||||
$hash->{timeout} = 60;
|
||||
$hash->{messages} = {};
|
||||
|
||||
if ($main::init_done) {
|
||||
return Start($hash);
|
||||
@ -133,6 +135,26 @@ sub Notify($$) {
|
||||
}
|
||||
}
|
||||
|
||||
sub Attr($$$$) {
|
||||
my ($command,$name,$attribute,$value) = @_;
|
||||
|
||||
my $hash = $main::defs{$name};
|
||||
ATTRIBUTE_HANDLER: {
|
||||
$attribute eq "keep-alive" and do {
|
||||
if ($command eq "set") {
|
||||
$hash->{timeout} = $value;
|
||||
} else {
|
||||
$hash->{timeout} = 60;
|
||||
}
|
||||
if ($main::init_done) {
|
||||
$hash->{ping_received}=1;
|
||||
Timer($hash);
|
||||
};
|
||||
last;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
sub Start($) {
|
||||
my $hash = shift;
|
||||
my ($dev) = split("[ \t]+", $hash->{DEF});
|
||||
@ -168,7 +190,7 @@ sub Timer($) {
|
||||
RemoveInternalTimer($hash);
|
||||
readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received};
|
||||
$hash->{ping_received} = 0;
|
||||
InternalTimer(gettimeofday()+AttrVal($hash-> {NAME},"keep-alive",60), "MQTT::Timer", $hash, 0);
|
||||
InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
|
||||
send_ping($hash);
|
||||
}
|
||||
|
||||
@ -179,6 +201,7 @@ sub Read {
|
||||
return undef unless $buf;
|
||||
$hash->{buf} .= $buf;
|
||||
while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
|
||||
|
||||
my $message_type = $mqtt->message_type();
|
||||
|
||||
Log3($name,5,"MQTT $name message received: ".$mqtt->string());
|
||||
@ -187,6 +210,11 @@ sub Read {
|
||||
$message_type == MQTT_CONNACK and do {
|
||||
readingsSingleUpdate($hash,"connection","connected",1);
|
||||
GP_ForallClients($hash,\&client_start);
|
||||
foreach my $message_id (keys %{$hash->{messages}}) {
|
||||
my $msg = $hash->{messages}->{$message_id}->{message};
|
||||
$msg->{dup} = 1;
|
||||
DevIo_SimpleWrite($hash,$msg->bytes,undef);
|
||||
}
|
||||
last;
|
||||
};
|
||||
|
||||
@ -196,7 +224,7 @@ sub Read {
|
||||
my $client = shift;
|
||||
Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
|
||||
if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
|
||||
readingsSingleUpdate($client,"transmission-state","publish received",1);
|
||||
readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
|
||||
if ($client->{TYPE} eq "MQTT_DEVICE") {
|
||||
MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
|
||||
} else {
|
||||
@ -204,6 +232,14 @@ sub Read {
|
||||
}
|
||||
};
|
||||
},undef);
|
||||
if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) {
|
||||
my $message_id = $mqtt->message_id();
|
||||
if ($qos == MQTT_QOS_AT_LEAST_ONCE) {
|
||||
send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id);
|
||||
} else {
|
||||
send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id);
|
||||
}
|
||||
}
|
||||
last;
|
||||
};
|
||||
|
||||
@ -212,10 +248,11 @@ sub Read {
|
||||
GP_ForallClients($hash,sub {
|
||||
my $client = shift;
|
||||
if ($client->{message_ids}->{$message_id}) {
|
||||
readingsSingleUpdate($client,"transmission-state","pubacknowledge received",1);
|
||||
readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1);
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
};
|
||||
},undef);
|
||||
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
||||
last;
|
||||
};
|
||||
|
||||
@ -224,10 +261,10 @@ sub Read {
|
||||
GP_ForallClients($hash,sub {
|
||||
my $client = shift;
|
||||
if ($client->{message_ids}->{$message_id}) {
|
||||
readingsSingleUpdate($client,"transmission-state","pubreceive received",1);
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
readingsSingleUpdate($client,"transmission-state","outgoing publish received",1);
|
||||
};
|
||||
},undef);
|
||||
send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
|
||||
last;
|
||||
};
|
||||
|
||||
@ -236,10 +273,12 @@ sub Read {
|
||||
GP_ForallClients($hash,sub {
|
||||
my $client = shift;
|
||||
if ($client->{message_ids}->{$message_id}) {
|
||||
readingsSingleUpdate($client,"transmission-state","pubrelease received",1);
|
||||
readingsSingleUpdate($client,"transmission-state","incoming publish released",1);
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
};
|
||||
},undef);
|
||||
send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
|
||||
delete $hash->{messages}->{$message_id};
|
||||
last;
|
||||
};
|
||||
|
||||
@ -248,10 +287,11 @@ sub Read {
|
||||
GP_ForallClients($hash,sub {
|
||||
my $client = shift;
|
||||
if ($client->{message_ids}->{$message_id}) {
|
||||
readingsSingleUpdate($client,"transmission-state","pubcomplete received",1);
|
||||
readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1);
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
};
|
||||
},undef);
|
||||
delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
|
||||
last;
|
||||
};
|
||||
|
||||
@ -264,6 +304,7 @@ sub Read {
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
};
|
||||
},undef);
|
||||
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
||||
last;
|
||||
};
|
||||
|
||||
@ -276,6 +317,7 @@ sub Read {
|
||||
delete $client->{message_ids}->{$message_id};
|
||||
};
|
||||
},undef);
|
||||
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
||||
last;
|
||||
};
|
||||
|
||||
@ -293,20 +335,33 @@ sub Read {
|
||||
|
||||
sub send_connect($) {
|
||||
my $hash = shift;
|
||||
return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => AttrVal($hash->{NAME},"keep-alive",60));
|
||||
return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout});
|
||||
};
|
||||
|
||||
sub send_publish($@) {
|
||||
return send_message(shift, message_type => MQTT_PUBLISH, @_);
|
||||
my ($hash,%msg) = @_;
|
||||
if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) {
|
||||
send_message(shift, message_type => MQTT_PUBLISH, %msg);
|
||||
return undef;
|
||||
} else {
|
||||
my $msgid = $hash->{msgid}++;
|
||||
send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
|
||||
return $msgid;
|
||||
}
|
||||
};
|
||||
|
||||
sub send_subscribe($@) {
|
||||
my $hash = shift;
|
||||
return send_message($hash, message_type => MQTT_SUBSCRIBE, @_);
|
||||
my $msgid = $hash->{msgid}++;
|
||||
send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
|
||||
return $msgid;
|
||||
};
|
||||
|
||||
sub send_unsubscribe($@) {
|
||||
return send_message(shift, message_type => MQTT_UNSUBSCRIBE, @_);
|
||||
my $hash = shift;
|
||||
my $msgid = $hash->{msgid}++;
|
||||
send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
|
||||
return $msgid;
|
||||
};
|
||||
|
||||
sub send_ping($) {
|
||||
@ -318,13 +373,17 @@ sub send_disconnect($) {
|
||||
};
|
||||
|
||||
sub send_message($$$@) {
|
||||
my $hash = shift;
|
||||
my ($hash,%msg) = @_;
|
||||
my $name = $hash->{NAME};
|
||||
my $msgid = $hash->{msgid}++;
|
||||
my $msg = Net::MQTT::Message->new(message_id => $msgid,@_);
|
||||
Log3($name,5,"MQTT $name message sent: ".$msg->string());
|
||||
DevIo_SimpleWrite($hash,$msg->bytes,undef);
|
||||
return $msgid;
|
||||
my $message = Net::MQTT::Message->new(%msg);
|
||||
Log3($name,5,"MQTT $name message sent: ".$message->string());
|
||||
if (defined $msg{message_id}) {
|
||||
$hash->{messages}->{$msg{message_id}} = {
|
||||
message => $message,
|
||||
timeout => gettimeofday()+$hash->{timeout},
|
||||
};
|
||||
}
|
||||
DevIo_SimpleWrite($hash,$message->bytes,undef);
|
||||
};
|
||||
|
||||
sub topic_to_regexp($) {
|
||||
|
@ -105,17 +105,19 @@ sub Notify() {
|
||||
foreach my $event (@{$dev->{CHANGED}}) {
|
||||
$event =~ /^([^:]+)(: )?(.*)$/;
|
||||
Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'");
|
||||
my $msgid;
|
||||
if (defined $3 and $3 ne "") {
|
||||
if (defined $hash->{publishReadings}->{$1}) {
|
||||
send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,"transmission-state","publish sent",1);
|
||||
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1);
|
||||
}
|
||||
} else {
|
||||
if (defined $hash->{publishState}) {
|
||||
send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,"transmission-state","publish sent",1);
|
||||
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1);
|
||||
}
|
||||
}
|
||||
$hash->{message_ids}->{$msgid}++ if defined $msgid;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,13 +81,16 @@ sub Set($@) {
|
||||
if(!defined($sets{$a[1]}));
|
||||
my $command = $a[1];
|
||||
my $value = $a[2];
|
||||
my $msgid;
|
||||
if (defined $value) {
|
||||
send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos});
|
||||
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,$command,$value,1);
|
||||
} else {
|
||||
send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos});
|
||||
$msgid = send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos});
|
||||
readingsSingleUpdate($hash,"state",$command,1);
|
||||
}
|
||||
$hash->{message_ids}->{$msgid}++ if defined $msgid;
|
||||
readingsSingleUpdate($hash,"transmission-state","outgoing publish sent",1);
|
||||
return undef;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user