mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-03-09 20:57:11 +00:00
bugfix: disconnects with new mosquitto version (wrong use of DUP flag)
fix commandref git-svn-id: https://svn.fhem.de/fhem/trunk@24953 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
parent
c147fd8b6f
commit
f4a82eea92
@ -25,6 +25,54 @@
|
||||
#
|
||||
##############################################
|
||||
|
||||
package MQTT;
|
||||
|
||||
use Exporter ('import');
|
||||
@EXPORT = ();
|
||||
@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
|
||||
%EXPORT_TAGS = (all => [@EXPORT_OK]);
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use GPUtils qw(GP_Import GP_ForallClients);
|
||||
use Scalar::Util qw(looks_like_number);
|
||||
use Carp qw(carp);
|
||||
|
||||
use Net::MQTT::Constants;
|
||||
use Net::MQTT::Message;
|
||||
|
||||
our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
|
||||
|
||||
BEGIN {GP_Import(qw(
|
||||
gettimeofday
|
||||
readingsSingleUpdate
|
||||
readingsBeginUpdate
|
||||
readingsBulkUpdate
|
||||
readingsEndUpdate
|
||||
DevIo_OpenDev
|
||||
DevIo_SimpleWrite
|
||||
DevIo_SimpleRead
|
||||
DevIo_CloseDev
|
||||
DevIo_IsOpen
|
||||
DevIo_getState
|
||||
IsDisabled
|
||||
RemoveInternalTimer
|
||||
InternalTimer
|
||||
AttrVal
|
||||
ReadingsVal
|
||||
Log3
|
||||
AssignIoPort
|
||||
getKeyValue
|
||||
setKeyValue
|
||||
CallFn
|
||||
defs
|
||||
modules
|
||||
init_done
|
||||
))};
|
||||
|
||||
sub ::MQTT_Initialize { goto &Initialize };
|
||||
|
||||
my %sets = (
|
||||
"connect" => "",
|
||||
"disconnect" => "",
|
||||
@ -42,66 +90,29 @@ my @clients = qw(
|
||||
|
||||
use DevIo;
|
||||
|
||||
sub MQTT_Initialize($) {
|
||||
sub Initialize {
|
||||
|
||||
my $hash = shift @_;
|
||||
my $hash = shift // return;
|
||||
|
||||
# Provider
|
||||
$hash->{Clients} = join (':',@clients);
|
||||
$hash->{ReadyFn} = "MQTT::Ready";
|
||||
$hash->{ReadFn} = "MQTT::Read";
|
||||
$hash->{Clients} = join q{:}, @clients;
|
||||
$hash->{ReadyFn} = \&Ready;
|
||||
$hash->{ReadFn} = \&Read;
|
||||
|
||||
# Consumer
|
||||
$hash->{DefFn} = "MQTT::Define";
|
||||
$hash->{UndefFn} = "MQTT::Undef";
|
||||
$hash->{DeleteFn} = "MQTT::Delete";
|
||||
$hash->{RenameFn} = "MQTT::Rename";
|
||||
$hash->{ShutdownFn} = "MQTT::Shutdown";
|
||||
$hash->{SetFn} = "MQTT::Set";
|
||||
$hash->{NotifyFn} = "MQTT::Notify";
|
||||
$hash->{AttrFn} = "MQTT::Attr";
|
||||
$hash->{DefFn} = \&Define;
|
||||
$hash->{UndefFn} = \&Undef;
|
||||
$hash->{DeleteFn} = \&Delete;
|
||||
$hash->{RenameFn} = \&Rename;
|
||||
$hash->{ShutdownFn} = \&Shutdown;
|
||||
$hash->{SetFn} = \&Set;
|
||||
$hash->{NotifyFn} = \&Notify;
|
||||
$hash->{AttrFn} = \&Attr;
|
||||
|
||||
$hash->{AttrList} = "keep-alive "."last-will client-id "."on-connect on-disconnect on-timeout privacy:1,0 ".$main::readingFnAttributes;
|
||||
$hash->{AttrList} = "keep-alive last-will client-id on-connect on-disconnect on-timeout privacy:1,0 ".$::readingFnAttributes;
|
||||
}
|
||||
|
||||
package MQTT;
|
||||
|
||||
use Exporter ('import');
|
||||
@EXPORT = ();
|
||||
@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
|
||||
%EXPORT_TAGS = (all => [@EXPORT_OK]);
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use GPUtils qw(:all);
|
||||
|
||||
use Net::MQTT::Constants;
|
||||
use Net::MQTT::Message;
|
||||
|
||||
our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
|
||||
|
||||
BEGIN {GP_Import(qw(
|
||||
gettimeofday
|
||||
readingsSingleUpdate
|
||||
DevIo_OpenDev
|
||||
DevIo_SimpleWrite
|
||||
DevIo_SimpleRead
|
||||
DevIo_CloseDev
|
||||
RemoveInternalTimer
|
||||
InternalTimer
|
||||
AttrVal
|
||||
ReadingsVal
|
||||
Log3
|
||||
AssignIoPort
|
||||
getKeyValue
|
||||
setKeyValue
|
||||
CallFn
|
||||
defs
|
||||
modules
|
||||
looks_like_number
|
||||
fhem
|
||||
))};
|
||||
|
||||
sub Define($$) {
|
||||
my ( $hash, $def ) = @_;
|
||||
@ -115,21 +126,15 @@ sub Define($$) {
|
||||
$hash->{DeviceName} = $host;
|
||||
|
||||
my $name = $hash->{NAME};
|
||||
my $user = getKeyValue($name."_user");
|
||||
my $pass = getKeyValue($name."_pass");
|
||||
|
||||
setKeyValue($name."_user",$username) unless(defined($user));
|
||||
setKeyValue($name."_pass",$password) unless(defined($pass));
|
||||
setKeyValue($name."_user",$username) if defined $username;
|
||||
setKeyValue($name."_pass",$password) if defined $password;
|
||||
|
||||
$hash->{DEF} = $host;
|
||||
|
||||
#readingsSingleUpdate($hash,"connection","disconnected",0);
|
||||
|
||||
if ($main::init_done) {
|
||||
return Start($hash);
|
||||
} else {
|
||||
return undef;
|
||||
}
|
||||
return Start($hash) if $init_done;
|
||||
return;
|
||||
}
|
||||
|
||||
sub Undef($) {
|
||||
@ -206,49 +211,22 @@ sub process_event($$) {
|
||||
|
||||
sub Set($@) {
|
||||
my ($hash, @a) = @_;
|
||||
return "Need at least one parameters" if(@a < 2);
|
||||
return 'Need at least one parameter!' if @a < 2;
|
||||
return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
|
||||
if(!defined($sets{$a[1]}));
|
||||
my $command = $a[1];
|
||||
my $value = $a[2];
|
||||
#my $value = $a[2];
|
||||
|
||||
COMMAND_HANDLER: {
|
||||
$command eq "connect" and do {
|
||||
Start($hash);
|
||||
last;
|
||||
};
|
||||
$command eq "disconnect" and do {
|
||||
Stop($hash);
|
||||
last;
|
||||
};
|
||||
$command eq "publish" and do {
|
||||
shift(@a);
|
||||
shift(@a);
|
||||
#if(scalar(@a)<2) {return "not enough parameters. usage: publish [qos [retain]] topic value";}
|
||||
#my $qos=0;
|
||||
#my $retain=0;
|
||||
#if(looks_like_number ($a[0])) {
|
||||
# $qos = int($a[0]);
|
||||
# $qos = 0 if $qos>1;
|
||||
# shift(@a);
|
||||
# if(looks_like_number ($a[0])) {
|
||||
# $retain = int($a[0]);
|
||||
# $retain = 0 if $retain>2;
|
||||
# shift(@a);
|
||||
# }
|
||||
#}
|
||||
#if(scalar(@a)<2) {return "missing parameters. usage: publish [qos [retain]] topic value";}
|
||||
#my $topic = shift(@a);
|
||||
#my $value = join (" ", @a);
|
||||
|
||||
my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
|
||||
return "missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]..." if(!$topic);
|
||||
return "wrong parameter. topic may nob be '#' or '+'" if ($topic eq '#' or $topic eq '+');
|
||||
$qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
|
||||
my $msgid = send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
|
||||
last;
|
||||
}
|
||||
};
|
||||
return Start($hash) if $command eq 'connect';
|
||||
return Stop($hash) if $command eq 'disconnect';
|
||||
|
||||
return "unknown command $command" if $command ne 'publish';
|
||||
my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
|
||||
return 'missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]...' if !$topic;
|
||||
return "wrong parameter. topic may not be '#' or '+'" if ($topic eq '#' or $topic eq '+');
|
||||
$qos = MQTT_QOS_AT_MOST_ONCE if !defined $qos;
|
||||
send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
|
||||
return;
|
||||
}
|
||||
|
||||
sub parseParams($;$$$$) {
|
||||
@ -408,7 +386,7 @@ sub Attr($$$$) {
|
||||
} else {
|
||||
$hash->{timeout} = 60;
|
||||
}
|
||||
if ($main::init_done) {
|
||||
if ($init_done) {
|
||||
$hash->{ping_received}=1;
|
||||
Timer($hash);
|
||||
};
|
||||
@ -417,7 +395,7 @@ sub Attr($$$$) {
|
||||
$attribute eq "last-will" and do {
|
||||
if($hash->{STATE} ne "disconnected") {
|
||||
Stop($hash);
|
||||
InternalTimer(gettimeofday()+1, "MQTT::Start", $hash, 0);
|
||||
InternalTimer(gettimeofday()+1, \&Start, $hash, 0);
|
||||
}
|
||||
last;
|
||||
};
|
||||
@ -445,7 +423,7 @@ sub Start($) {
|
||||
}
|
||||
|
||||
DevIo_CloseDev($hash);
|
||||
return DevIo_OpenDev($hash, 0, "MQTT::Init");
|
||||
return DevIo_OpenDev($hash, 0, \&Init);
|
||||
}
|
||||
|
||||
sub Stop($) {
|
||||
@ -461,20 +439,24 @@ sub Stop($) {
|
||||
send_disconnect($hash);
|
||||
DevIo_CloseDev($hash);
|
||||
RemoveInternalTimer($hash);
|
||||
readingsSingleUpdate($hash,"connection","disconnected",1);
|
||||
readingsSingleUpdate($hash,"state","disconnected",1);
|
||||
readingsBeginUpdate($hash);
|
||||
readingsBulkUpdate($hash,'connection','disconnected');
|
||||
readingsBulkUpdate($hash,'state','disconnected');
|
||||
readingsEndUpdate($hash,1);
|
||||
}
|
||||
|
||||
sub Ready($) {
|
||||
my $hash = shift;
|
||||
return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected");
|
||||
return if DevIo_IsOpen($hash) || IsDisabled($hash->{NAME});
|
||||
return DevIo_OpenDev($hash, 1, \&Init, sub(){}) if DevIo_getState($hash) eq 'disconnected';
|
||||
return;
|
||||
}
|
||||
|
||||
sub Rename() {
|
||||
my ($new,$old) = @_;
|
||||
setKeyValue($new."_user",getKeyValue($old."_user"));
|
||||
setKeyValue($new."_pass",getKeyValue($old."_pass"));
|
||||
|
||||
|
||||
setKeyValue($old."_user",undef);
|
||||
setKeyValue($old."_pass",undef);
|
||||
return undef;
|
||||
@ -741,7 +723,7 @@ sub client_subscribe_topic($$;$$) {
|
||||
$client->{subscribeQos}->{$topic}=$qos;
|
||||
my $expr = topic_to_regexp($topic);
|
||||
push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
|
||||
if ($main::init_done) {
|
||||
if ($init_done) {
|
||||
if (my $mqtt = $client->{IODev}) {;
|
||||
$qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
|
||||
$retain = 0 unless defined $retain; # not supported yet
|
||||
@ -760,7 +742,7 @@ sub client_unsubscribe_topic($$) {
|
||||
delete $client->{subscribeQos}->{$topic};
|
||||
my $expr = topic_to_regexp($topic);
|
||||
$client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
|
||||
if ($main::init_done) {
|
||||
if ($init_done) {
|
||||
if (my $mqtt = $client->{IODev}) {;
|
||||
my $msgid = send_unsubscribe($mqtt,
|
||||
topics => [$topic],
|
||||
@ -783,12 +765,9 @@ sub Client_Define($$) {
|
||||
$client->{subscribeExpr} = [];
|
||||
AssignIoPort($client);
|
||||
|
||||
if ($main::init_done) {
|
||||
return client_start($client);
|
||||
} else {
|
||||
return undef;
|
||||
}
|
||||
};
|
||||
return client_start($client) if $init_done;
|
||||
return;
|
||||
}
|
||||
|
||||
sub Client_Undefine($) {
|
||||
client_stop(shift);
|
||||
@ -895,7 +874,7 @@ sub client_attr($$$$$) {
|
||||
last;
|
||||
};
|
||||
$attribute eq "IODev" and do {
|
||||
if ($main::init_done) {
|
||||
if ($init_done) {
|
||||
if ($command eq "set") {
|
||||
client_stop($client);
|
||||
$main::attr{$name}{IODev} = $value;
|
||||
@ -971,48 +950,55 @@ sub client_stop($) {
|
||||
};
|
||||
|
||||
1;
|
||||
__END__
|
||||
|
||||
=pod
|
||||
=item [device]
|
||||
=item summary connects fhem to MQTT
|
||||
|
||||
=begin html
|
||||
|
||||
<a name="MQTT"></a>
|
||||
<a id="MQTT"></a>
|
||||
<h3>MQTT</h3>
|
||||
<ul>
|
||||
<p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
|
||||
<p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> and <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
|
||||
<p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a>, <a href="#MQTT_GENERIC_BRIDGE">MQTT_GENERIC_BRIDGE</a> clients and (<b>outdated</b>) <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
|
||||
Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
|
||||
Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
|
||||
<a name="MQTTdefine"></a>
|
||||
<a id="MQTT-define"></a>
|
||||
<p><b>Define</b></p>
|
||||
<ul>
|
||||
<p><code>define <name> MQTT <ip:port> [<username>] [<password>]</code></p>
|
||||
<p>Specifies the MQTT device.</p>
|
||||
</ul>
|
||||
<a name="MQTTset"></a>
|
||||
<a id="MQTT-set"></a>
|
||||
<p><b>Set</b></p>
|
||||
<ul>
|
||||
<a id="MQTT-set-connect"></a>
|
||||
<li>
|
||||
<p><code>set <name> connect</code><br/>
|
||||
(re-)connects the MQTT-device to the mqtt-broker</p>
|
||||
</li>
|
||||
<a id="MQTT-set-disconnect"></a>
|
||||
<li>
|
||||
<p><code>set <name> disconnect</code><br/>
|
||||
disconnects the MQTT-device from the mqtt-broker</p>
|
||||
</li>
|
||||
<a id="MQTT-set-publish"></a>
|
||||
<li>
|
||||
<p><code>set <name> publish [qos:?] [retain:?] <topic> <message></code><br/>
|
||||
sends message to the specified topic</p>
|
||||
</li>
|
||||
</ul>
|
||||
<a name="MQTTattr"></a>
|
||||
<a id="MQTT-attr"></a>
|
||||
<p><b>Attributes</b></p>
|
||||
<ul>
|
||||
<a id="MQTT-attr-keep-alive"></a>
|
||||
<li>
|
||||
<p>keep-alive<br/>
|
||||
sets the keep-alive time (in seconds).</p>
|
||||
</li>
|
||||
<a id="MQTT-attr-keep-last-will"></a>
|
||||
<li>
|
||||
<p><code>attr <name> last-will [qos:?] [retain:?] <topic> <message></code><br/>
|
||||
Support for MQTT feature "last will"
|
||||
@ -1021,6 +1007,7 @@ sub client_stop($) {
|
||||
<code>attr mqtt last-will /fhem/status crashed</code>
|
||||
</p>
|
||||
</li>
|
||||
<a id="MQTT-attr-keep-client-id"></a>
|
||||
<li>
|
||||
<p><code>attr <name> client-id client id</code><br/>
|
||||
redefines client id
|
||||
@ -1029,6 +1016,7 @@ sub client_stop($) {
|
||||
<code>attr mqtt client-id fhem1234567</code>
|
||||
</p>
|
||||
</li>
|
||||
<a id="MQTT-attr-keep-last-will data-pattern="on-.*connect""></a>
|
||||
<li>
|
||||
<p>on-connect, on-disconnect<br/>
|
||||
<code>attr <name> on-connect {Perl-expression} <topic> <message></code><br/>
|
||||
@ -1041,6 +1029,7 @@ sub client_stop($) {
|
||||
<code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
|
||||
</p>
|
||||
</li>
|
||||
<a id="MQTT-attr-on-timeout"></a>
|
||||
<li>
|
||||
<p>on-timeout<br/>
|
||||
<code>attr <name> on-timeout {Perl-expression}</code>
|
||||
|
Loading…
x
Reference in New Issue
Block a user