mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-04-17 05:16:02 +00:00
00_MQTT2_CLIENT.pm: "MQTT_GENERiC_BRIDGE" changes (Forum #93255)
git-svn-id: https://svn.fhem.de/fhem/trunk@17757 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
parent
f12c8dd297
commit
56d6122a6f
@ -9,6 +9,7 @@ use DevIo;
|
||||
sub MQTT2_CLIENT_Read($@);
|
||||
sub MQTT2_CLIENT_Write($$$);
|
||||
sub MQTT2_CLIENT_Undef($@);
|
||||
sub MQTT2_CLIENT_doPublish($@);
|
||||
|
||||
my $keepalive = 30;
|
||||
|
||||
@ -25,14 +26,15 @@ MQTT2_CLIENT_Initialize($)
|
||||
"1:MQTT2_DEVICE" => "^.*",
|
||||
"2:MQTT_GENERIC_BRIDGE" => "^.*"
|
||||
};
|
||||
$hash->{ReadFn} = "MQTT2_CLIENT_Read";
|
||||
$hash->{DefFn} = "MQTT2_CLIENT_Define";
|
||||
$hash->{AttrFn} = "MQTT2_CLIENT_Attr";
|
||||
$hash->{SetFn} = "MQTT2_CLIENT_Set";
|
||||
$hash->{UndefFn} = "MQTT2_CLIENT_Undef";
|
||||
$hash->{DeleteFn}= "MQTT2_CLIENT_Delete";
|
||||
$hash->{WriteFn} = "MQTT2_CLIENT_Write";
|
||||
$hash->{ReadyFn} = "MQTT2_CLIENT_connect";
|
||||
$hash->{ReadFn} = "MQTT2_CLIENT_Read";
|
||||
$hash->{DefFn} = "MQTT2_CLIENT_Define";
|
||||
$hash->{AttrFn} = "MQTT2_CLIENT_Attr";
|
||||
$hash->{SetFn} = "MQTT2_CLIENT_Set";
|
||||
$hash->{UndefFn} = "MQTT2_CLIENT_Undef";
|
||||
$hash->{ShutdownFn} = "MQTT2_CLIENT_Undef";
|
||||
$hash->{DeleteFn} = "MQTT2_CLIENT_Delete";
|
||||
$hash->{WriteFn} = "MQTT2_CLIENT_Write";
|
||||
$hash->{ReadyFn} = "MQTT2_CLIENT_connect";
|
||||
|
||||
no warnings 'qw';
|
||||
my @attrList = qw(
|
||||
@ -42,8 +44,9 @@ MQTT2_CLIENT_Initialize($)
|
||||
disabledForIntervals
|
||||
lwt
|
||||
lwtRetain
|
||||
msgAfterConnect
|
||||
msgBeforeDisconnect
|
||||
mqttVersion:3.1.1,3.1
|
||||
onConnect
|
||||
rawEvents
|
||||
subscriptions
|
||||
SSL
|
||||
@ -124,10 +127,14 @@ MQTT2_CLIENT_doinit($)
|
||||
|
||||
############################## SUBSCRIBE
|
||||
} elsif($hash->{connecting} == 2) {
|
||||
my $s = AttrVal($name, "subscriptions", "#");
|
||||
if($s eq "setByTheProgram") {
|
||||
$s = ($hash->{".subscribe"} ? $hash->{".subscribe"} : "#");
|
||||
}
|
||||
my $msg =
|
||||
pack("n", $hash->{FD}). # packed Identifier
|
||||
join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0
|
||||
split(" ", AttrVal($name, "subscriptions", "#")));
|
||||
split(" ", $s));
|
||||
addToWritebuffer($hash,
|
||||
pack("C",0x80).
|
||||
MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg);
|
||||
@ -144,10 +151,7 @@ MQTT2_CLIENT_keepalive($)
|
||||
my $name = $hash->{NAME};
|
||||
return if(ReadingsVal($name, "state", "") ne "opened");
|
||||
Log3 $name, 5, "$name: keepalive $keepalive";
|
||||
my $msg = join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0
|
||||
split(" ", AttrVal($name, "subscriptions", "#")));
|
||||
addToWritebuffer($hash,
|
||||
pack("C",0xC0).pack("C",0));
|
||||
addToWritebuffer($hash, pack("C",0xC0).pack("C",0)); # PINGREQ
|
||||
InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive", $hash, 0);
|
||||
}
|
||||
|
||||
@ -155,10 +159,27 @@ sub
|
||||
MQTT2_CLIENT_Undef($@)
|
||||
{
|
||||
my ($hash, $arg) = @_;
|
||||
RemoveInternalTimer($hash);
|
||||
my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", "");
|
||||
MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 1) if($ond);
|
||||
DevIo_SimpleWrite($hash, pack("C",0xE0).pack("C",0), 0); # DISCONNECT
|
||||
DevIo_CloseDev($hash);
|
||||
return undef;
|
||||
}
|
||||
|
||||
sub
|
||||
MQTT2_CLIENT_Disco($)
|
||||
{
|
||||
my ($hash) = @_;
|
||||
RemoveInternalTimer($hash);
|
||||
$hash->{connecting} = 1;
|
||||
my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", "");
|
||||
MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 0) if($ond);
|
||||
addToWritebuffer($hash, pack("C",0xE0).pack("C",0)); # DISCONNECT
|
||||
DevIo_Disconnected($hash);
|
||||
}
|
||||
|
||||
|
||||
sub
|
||||
MQTT2_CLIENT_Delete($@)
|
||||
{
|
||||
@ -190,15 +211,6 @@ MQTT2_CLIENT_Attr(@)
|
||||
return undef;
|
||||
}
|
||||
|
||||
sub
|
||||
MQTT2_CLIENT_Disco($)
|
||||
{
|
||||
my ($hash) = @_;
|
||||
RemoveInternalTimer($hash);
|
||||
$hash->{connecting} = 1;
|
||||
DevIo_Disconnected($hash);
|
||||
}
|
||||
|
||||
sub
|
||||
MQTT2_CLIENT_Set($@)
|
||||
{
|
||||
@ -291,7 +303,7 @@ MQTT2_CLIENT_Read($@)
|
||||
if($cpt eq "CONNACK") {
|
||||
my $rc = ord(substr($pl,1,1));
|
||||
if($rc == 0) {
|
||||
my $onc = AttrVal($name, "onConnect", "");
|
||||
my $onc = AttrVal($name, "msgAfterConnect", "");
|
||||
MQTT2_CLIENT_doPublish($hash, split(" ", $onc, 2)) if($onc);
|
||||
MQTT2_CLIENT_doinit($hash);
|
||||
|
||||
@ -340,30 +352,47 @@ MQTT2_CLIENT_Read($@)
|
||||
######################################
|
||||
# send topic to client if its subscription matches the topic
|
||||
sub
|
||||
MQTT2_CLIENT_doPublish($$$$)
|
||||
MQTT2_CLIENT_doPublish($@)
|
||||
{
|
||||
my ($hash, $topic, $val, $retain) = @_;
|
||||
my ($hash, $topic, $val, $retain, $immediate) = @_;
|
||||
my $name = $hash->{NAME};
|
||||
return if(IsDisabled($name));
|
||||
$val = "" if(!defined($val));
|
||||
Log3 $name, 5, "$name: sending PUBLISH $topic $val";
|
||||
addToWritebuffer($hash,
|
||||
pack("C",0x30).
|
||||
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)).
|
||||
pack("n", length($topic)).
|
||||
$topic.$val);
|
||||
|
||||
my $msg = pack("C",0x30).
|
||||
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)).
|
||||
pack("n", length($topic)).
|
||||
$topic.$val;
|
||||
if($immediate) {
|
||||
DevIo_SimpleWrite($hash, $msg, 0);
|
||||
} else {
|
||||
addToWritebuffer($hash, $msg);
|
||||
}
|
||||
}
|
||||
|
||||
sub
|
||||
MQTT2_CLIENT_Write($$$)
|
||||
{
|
||||
my ($hash,$topic,$msg) = @_;
|
||||
my $retain;
|
||||
if($topic =~ m/^(.*):r$/) {
|
||||
$topic = $1;
|
||||
$retain = 1;
|
||||
my ($hash, $function, $topicMsg) = @_;
|
||||
|
||||
if($function eq "publish") {
|
||||
my ($topic, $msg) = split(" ", $topicMsg, 2);
|
||||
my $retain;
|
||||
if($topic =~ m/^(.*):r$/) {
|
||||
$topic = $1;
|
||||
$retain = 1;
|
||||
}
|
||||
MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain);
|
||||
|
||||
} elsif($function eq "subscribe") {
|
||||
$hash->{".subscribtion"} = $topicMsg;
|
||||
MQTT2_CLIENT_Disco($hash);
|
||||
|
||||
} else {
|
||||
my $name = $hash->{NAME};
|
||||
Log3 $name, 1, "$name: ERROR: Ignoring function $function";
|
||||
}
|
||||
MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain);
|
||||
}
|
||||
|
||||
sub
|
||||
@ -491,11 +520,16 @@ MQTT2_CLIENT_getStr($$)
|
||||
set the MQTT protocol version in the CONNECT header, default is 3.1
|
||||
</li></br>
|
||||
|
||||
<a name="onConnect"></a>
|
||||
<li>onConnect topic message<br>
|
||||
<a name="msgAfterConnect"></a>
|
||||
<li>msgAfterConnect topic message<br>
|
||||
publish the topic after each connect or reconnect.
|
||||
</li></br>
|
||||
|
||||
<a name="msgBeforeDisconnect"></a>
|
||||
<li>msgBeforeDisconnect topic message<br>
|
||||
publish the topic bofore each disconnect.
|
||||
</li></br>
|
||||
|
||||
<a name="rawEvents"></a>
|
||||
<li>rawEvents <topic-regexp><br>
|
||||
send all messages as events attributed to this MQTT2_CLIENT instance.
|
||||
@ -504,7 +538,9 @@ MQTT2_CLIENT_getStr($$)
|
||||
|
||||
<a name="subscriptions"></a>
|
||||
<li>subscriptions <subscriptions><br>
|
||||
space separated list of MQTT subscriptions, default is #
|
||||
space separated list of MQTT subscriptions, default is #<br>
|
||||
Note: if the value is the literal setByTheProgram, then the value sent by
|
||||
the client (e.g. MQTT_GENERIC_BRIDGE) is used.
|
||||
</li><br>
|
||||
|
||||
<a name="SSL"></a>
|
||||
|
@ -378,6 +378,7 @@ MQTT2_SERVER_Read($@)
|
||||
####################################
|
||||
} elsif($cpt eq "DISCONNECT") {
|
||||
Log3 $sname, 4, "$cname $hash->{cid} $cpt";
|
||||
delete($hash->{lwt}); # no LWT on disconnect, see doc, chapter 3.14
|
||||
CommandDelete(undef, $cname);
|
||||
|
||||
####################################
|
||||
@ -457,13 +458,21 @@ MQTT2_SERVER_terminate($$)
|
||||
sub
|
||||
MQTT2_SERVER_Write($$$)
|
||||
{
|
||||
my ($hash,$topic,$msg) = @_;
|
||||
my $retain;
|
||||
if($topic =~ m/^(.*):r$/) {
|
||||
$topic = $1;
|
||||
$retain = 1;
|
||||
my ($hash,$function,$topicMsg) = @_;
|
||||
|
||||
if($function eq "publish") {
|
||||
my ($topic, $msg) = split(" ", $topicMsg, 2);
|
||||
my $retain;
|
||||
if($topic =~ m/^(.*):r$/) {
|
||||
$topic = $1;
|
||||
$retain = 1;
|
||||
}
|
||||
MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg, $retain);
|
||||
|
||||
} else {
|
||||
my $name = $hash->{NAME};
|
||||
Log3 $name, 1, "$name: ERROR: Ignoring function $function";
|
||||
}
|
||||
MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg, $retain);
|
||||
}
|
||||
|
||||
sub
|
||||
|
@ -251,7 +251,7 @@ MQTT2_DEVICE_Get($@)
|
||||
|
||||
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
||||
return if(!$cmd);
|
||||
IOWrite($hash, split(" ",$cmd,2));
|
||||
IOWrite($hash, "publish", $cmd);
|
||||
|
||||
return undef;
|
||||
}
|
||||
@ -271,7 +271,7 @@ MQTT2_DEVICE_Set($@)
|
||||
|
||||
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
||||
return if(!$cmd);
|
||||
IOWrite($hash, split(" ",$cmd,2));
|
||||
IOWrite($hash, "publish", $cmd);
|
||||
readingsSingleUpdate($hash, "state", $cmdName, 1);
|
||||
return undef;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user