mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-01-31 12:49:34 +00:00
806469796f
git-svn-id: https://svn.fhem.de/fhem/trunk@24981 2b470e98-0d58-463d-a4d8-8e2adae1ed80
1051 lines
31 KiB
Perl
1051 lines
31 KiB
Perl
##############################################
|
|
#
|
|
# fhem bridge to mqtt (see http://mqtt.org)
|
|
#
|
|
# Copyright (C) 2018 Alexander Schulz
|
|
# Copyright (C) 2017 Stephan Eisler
|
|
# Copyright (C) 2014 - 2016 Norbert Truchsess
|
|
#
|
|
# This file is part of fhem.
|
|
#
|
|
# Fhem is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Fhem is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with fhem. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
# $Id$
|
|
#
|
|
##############################################
|
|
|
|
package MQTT;
|
|
|
|
use Exporter ('import');
|
|
@EXPORT = ();
|
|
@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp parseParams);
|
|
%EXPORT_TAGS = (all => [@EXPORT_OK]);
|
|
|
|
use strict;
|
|
use warnings;
|
|
|
|
use GPUtils qw(GP_Import GP_ForallClients);
|
|
use Scalar::Util qw(looks_like_number);
|
|
use Carp qw(carp);
|
|
|
|
use Net::MQTT::Constants;
|
|
use Net::MQTT::Message;
|
|
|
|
our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
|
|
|
|
BEGIN {GP_Import(qw(
|
|
gettimeofday
|
|
readingsSingleUpdate
|
|
readingsBeginUpdate
|
|
readingsBulkUpdate
|
|
readingsEndUpdate
|
|
DevIo_OpenDev
|
|
DevIo_SimpleWrite
|
|
DevIo_SimpleRead
|
|
DevIo_CloseDev
|
|
DevIo_IsOpen
|
|
DevIo_getState
|
|
IsDisabled
|
|
RemoveInternalTimer
|
|
InternalTimer
|
|
AttrVal
|
|
ReadingsVal
|
|
Log3
|
|
AssignIoPort
|
|
getKeyValue
|
|
setKeyValue
|
|
CallFn
|
|
defs
|
|
modules
|
|
init_done
|
|
))};
|
|
|
|
sub ::MQTT_Initialize { goto &Initialize };
|
|
|
|
my %sets = (
|
|
"connect" => "",
|
|
"disconnect" => "",
|
|
"publish" => "",
|
|
);
|
|
|
|
my %gets = (
|
|
"version" => ""
|
|
);
|
|
|
|
my @clients = qw(
|
|
MQTT_DEVICE
|
|
MQTT_BRIDGE
|
|
);
|
|
|
|
use DevIo;
|
|
|
|
sub Initialize {
|
|
|
|
my $hash = shift // return;
|
|
|
|
# Provider
|
|
$hash->{Clients} = join q{:}, @clients;
|
|
$hash->{ReadyFn} = \&Ready;
|
|
$hash->{ReadFn} = \&Read;
|
|
|
|
# Consumer
|
|
$hash->{DefFn} = \&Define;
|
|
$hash->{UndefFn} = \&Undef;
|
|
$hash->{DeleteFn} = \&Delete;
|
|
$hash->{RenameFn} = \&Rename;
|
|
$hash->{ShutdownFn} = \&Shutdown;
|
|
$hash->{SetFn} = \&Set;
|
|
$hash->{NotifyFn} = \&Notify;
|
|
$hash->{AttrFn} = \&Attr;
|
|
|
|
$hash->{AttrList} = "keep-alive last-will client-id on-connect on-disconnect on-timeout privacy:1,0 ".$::readingFnAttributes;
|
|
}
|
|
|
|
|
|
|
|
sub Define($$) {
|
|
my ( $hash, $def ) = @_;
|
|
|
|
$hash->{NOTIFYDEV} = "global";
|
|
$hash->{msgid} = 1;
|
|
$hash->{timeout} = 60;
|
|
$hash->{messages} = {};
|
|
|
|
my ($host,$username,$password) = split("[ \t]+", $hash->{DEF});
|
|
$hash->{DeviceName} = $host;
|
|
|
|
my $name = $hash->{NAME};
|
|
setKeyValue($name."_user",$username) if defined $username;
|
|
setKeyValue($name."_pass",$password) if defined $password;
|
|
|
|
$hash->{DEF} = $host;
|
|
|
|
#readingsSingleUpdate($hash,"connection","disconnected",0);
|
|
|
|
return Start($hash) if $init_done;
|
|
return;
|
|
}
|
|
|
|
sub Undef($) {
|
|
my $hash = shift;
|
|
Stop($hash);
|
|
return undef;
|
|
}
|
|
|
|
sub Delete($$) {
|
|
my ($hash, $name) = @_;
|
|
setKeyValue($name."_user",undef);
|
|
setKeyValue($name."_pass",undef);
|
|
return undef;
|
|
}
|
|
|
|
sub Shutdown($) {
|
|
my $hash = shift;
|
|
Stop($hash);
|
|
my $name = $hash->{NAME};
|
|
Log3($name,1,"Shutdown executed");
|
|
return undef;
|
|
}
|
|
|
|
sub onConnect($) {
|
|
my $hash = shift;
|
|
my $name = $hash->{NAME};
|
|
my $cmdstr = AttrVal($name,"on-connect",undef);
|
|
return process_event($hash,$cmdstr);
|
|
}
|
|
|
|
sub onDisconnect($) {
|
|
my $hash = shift;
|
|
my $name = $hash->{NAME};
|
|
my $cmdstr = AttrVal($name,"on-disconnect",undef);
|
|
return process_event($hash,$cmdstr);
|
|
}
|
|
|
|
sub onTimeout($) {
|
|
my $hash = shift;
|
|
my $name = $hash->{NAME};
|
|
my $cmdstr = AttrVal($name,"on-timeout",undef);
|
|
if($cmdstr) {
|
|
return eval($cmdstr);
|
|
}
|
|
}
|
|
|
|
sub isConnected($) {
|
|
my $hash = shift;
|
|
my $cstate=ReadingsVal($hash->{NAME}, "connection", "");
|
|
return 1 if($cstate eq "connected" || $cstate eq "active");
|
|
return undef;
|
|
}
|
|
|
|
sub process_event($$) {
|
|
my $hash = shift;
|
|
my $str = shift;
|
|
my ($qos, $retain,$topic, $message, $cmd) = parsePublishCmdStr($str);
|
|
|
|
my $do=1;
|
|
if($cmd) {
|
|
my $name = $hash->{NAME};
|
|
$do=eval($cmd);
|
|
$do=1 if (!defined($do));
|
|
#no strict "refs";
|
|
#my $ret = &{$hash->{WBCallback}}($hash);
|
|
#use strict "refs";
|
|
}
|
|
|
|
if($do && defined($topic)) {
|
|
$qos = MQTT_QOS_AT_MOST_ONCE unless defined($qos);
|
|
send_publish($hash, topic => $topic, message => $message, qos => $qos, retain => $retain);
|
|
}
|
|
}
|
|
|
|
sub Set($@) {
|
|
my ($hash, @a) = @_;
|
|
return 'Need at least one parameter!' if @a < 2;
|
|
return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
|
|
if(!defined($sets{$a[1]}));
|
|
my $command = $a[1];
|
|
#my $value = $a[2];
|
|
|
|
return Start($hash) if $command eq 'connect';
|
|
return Stop($hash) if $command eq 'disconnect';
|
|
|
|
return "unknown command $command" if $command ne 'publish';
|
|
shift(@a);
|
|
shift(@a);
|
|
my ($qos, $retain,$topic, $value) = parsePublishCmd(@a);
|
|
return 'missing parameters. usage: publish [qos:?] [retain:?] topic value1 [value2]...' if !$topic;
|
|
return "wrong parameter. topic may not be '#' or '+'" if ($topic eq '#' or $topic eq '+');
|
|
$qos = MQTT_QOS_AT_MOST_ONCE if !defined $qos;
|
|
send_publish($hash, topic => $topic, message => $value, qos => $qos, retain => $retain);
|
|
return;
|
|
}
|
|
|
|
sub parseParams($;$$$$) {
|
|
|
|
my ( $cmd, $separator, $joiner, $keyvalueseparator, $acceptedkeys ) = @_;
|
|
$separator = ' ' if ( !$separator );
|
|
$joiner = $separator if ( !$joiner ); # needed if separator is a regexp
|
|
$keyvalueseparator = ':' if(!$keyvalueseparator);
|
|
my ( @a, %h );
|
|
|
|
my @params;
|
|
if ( ref($cmd) eq 'ARRAY' ) {
|
|
@params = @{$cmd};
|
|
}
|
|
else {
|
|
@params = split( $separator, $cmd );
|
|
}
|
|
|
|
while (@params) {
|
|
my $param = shift(@params);
|
|
next if ( $param eq "" );
|
|
my ( $key, $value ) = split( $keyvalueseparator, $param, 2 );
|
|
|
|
if ( !defined($value) ) {
|
|
$value = $key;
|
|
$key = undef;
|
|
|
|
# the key can not start with a { -> it must be a perl expression # vim:}
|
|
}
|
|
elsif ( $key =~ m/^\s*{/ ) { # for vim: }
|
|
$value = $param;
|
|
$key = undef;
|
|
}
|
|
# the key can not start with a ' or "
|
|
elsif ( $key =~ m/^\s*('|")/ ) {
|
|
$value = $param;
|
|
$key = undef;
|
|
}
|
|
# accept known keys only (if defined $acceptedkeys)
|
|
elsif (defined($acceptedkeys) and !defined($acceptedkeys->{$key})) {
|
|
$value = $param;
|
|
$key = undef;
|
|
}
|
|
|
|
#collect all parts until the closing ' or "
|
|
while ( $param && $value =~ m/^('|")/ && $value !~ m/$1$/ ) {
|
|
my $next = shift(@params);
|
|
last if ( !defined($next) );
|
|
$value .= $joiner . $next;
|
|
}
|
|
|
|
#remove matching ' or " from the start and end
|
|
if ( $value =~ m/^('|")/ && $value =~ m/$1$/ ) {
|
|
$value =~ s/^.(.*).$/$1/;
|
|
}
|
|
|
|
#collect all parts until opening { and closing } are matched
|
|
if ( $value =~ m/^\s*{/ ) { # } for match
|
|
my $count = 0;
|
|
for my $i ( 0 .. length($value) - 1 ) {
|
|
my $c = substr( $value, $i, 1 );
|
|
++$count if ( $c eq '{' );
|
|
--$count if ( $c eq '}' );
|
|
}
|
|
|
|
while ( $param && $count != 0 ) {
|
|
my $next = shift(@params);
|
|
last if ( !defined($next) );
|
|
$value .= $joiner . $next;
|
|
|
|
for my $i ( 0 .. length($next) - 1 ) {
|
|
my $c = substr( $next, $i, 1 );
|
|
++$count if ( $c eq '{' );
|
|
--$count if ( $c eq '}' );
|
|
}
|
|
}
|
|
}
|
|
|
|
if ( defined($key) ) {
|
|
$h{$key} = $value;
|
|
}
|
|
else {
|
|
push @a, $value;
|
|
}
|
|
|
|
}
|
|
return ( \@a, \%h );
|
|
}
|
|
|
|
sub parsePublishCmdStr($) {
|
|
my ($str) = @_;
|
|
|
|
return undef unless defined($str);
|
|
|
|
my @lwa = split("[ \t]+",$str);
|
|
return parsePublishCmd(@lwa);
|
|
}
|
|
|
|
sub parsePublishCmd(@) {
|
|
my @a = @_;
|
|
my ( $aa, $bb ) = parseParams(\@a,undef,undef,undef,{qos=>1,retain=>1});
|
|
|
|
my $qos = 0;
|
|
my $retain = 0;
|
|
my $topic = undef;
|
|
my $value = "\0";
|
|
my $expression = undef;
|
|
|
|
if ( exists( $bb->{'qos'} ) ) {
|
|
$qos = $bb->{'qos'};
|
|
}
|
|
|
|
if ( exists $bb->{'retain'} ) {
|
|
$retain = $bb->{'retain'};
|
|
}
|
|
|
|
my @aaa = ();
|
|
my @xaa = @{$aa};
|
|
|
|
while ( scalar(@xaa) > 0 ) {
|
|
my $av = shift @xaa;
|
|
if (!defined($expression) and $av =~ /^\{.*\}$/ and scalar(@xaa)>0) {
|
|
$expression = $av;
|
|
next;
|
|
}
|
|
else {
|
|
push @aaa, $av;
|
|
}
|
|
}
|
|
|
|
$topic = shift(@aaa);
|
|
|
|
if ( scalar(@aaa) > 0 ) {
|
|
$value = join( " ", @aaa );
|
|
}
|
|
return undef unless $topic || $expression;
|
|
return ( $qos, $retain, $topic, $value, $expression );
|
|
|
|
}
|
|
|
|
sub Notify($$) {
|
|
my ($hash,$dev) = @_;
|
|
if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
|
|
Start($hash);
|
|
} elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
|
|
}
|
|
}
|
|
|
|
sub Attr($$$$) {
|
|
my ($command,$name,$attribute,$value) = @_;
|
|
|
|
my $hash = $main::defs{$name};
|
|
ATTRIBUTE_HANDLER: {
|
|
$attribute eq "keep-alive" and do {
|
|
if ($command eq "set") {
|
|
$hash->{timeout} = $value;
|
|
} else {
|
|
$hash->{timeout} = 60;
|
|
}
|
|
if ($init_done) {
|
|
$hash->{ping_received}=1;
|
|
Timer($hash);
|
|
};
|
|
last;
|
|
};
|
|
$attribute eq "last-will" and do {
|
|
if($hash->{STATE} ne "disconnected") {
|
|
Stop($hash);
|
|
InternalTimer(gettimeofday()+1, \&Start, $hash, 0);
|
|
}
|
|
last;
|
|
};
|
|
};
|
|
}
|
|
|
|
#sub Reconnect($){
|
|
# my $hash = shift;
|
|
# Stop($hash);
|
|
# Start($hash);
|
|
#}
|
|
|
|
sub Start($) {
|
|
my $hash = shift;
|
|
my $firsttime = $hash->{".cinitmark"};
|
|
|
|
if(defined($firsttime)) {
|
|
my $cstate=ReadingsVal($hash->{NAME},"connection","");
|
|
if($cstate ne "disconnected" && $cstate ne "timed-out") {
|
|
return undef;
|
|
}
|
|
} else {
|
|
$hash->{".cinitmark"} = 1;
|
|
$hash->{".reconnectmark"} = 1;
|
|
}
|
|
|
|
DevIo_CloseDev($hash);
|
|
return DevIo_OpenDev($hash, 0, \&Init);
|
|
}
|
|
|
|
sub Stop($) {
|
|
my $hash = shift;
|
|
|
|
$hash->{".reconnectmark"} = 0;
|
|
|
|
my $cstate=ReadingsVal($hash->{NAME},"connection","");
|
|
if($cstate eq "disconnected" || $cstate eq "timed-out") {
|
|
return undef;
|
|
}
|
|
|
|
send_disconnect($hash);
|
|
DevIo_CloseDev($hash);
|
|
RemoveInternalTimer($hash);
|
|
readingsBeginUpdate($hash);
|
|
readingsBulkUpdate($hash,'connection','disconnected');
|
|
readingsBulkUpdate($hash,'state','disconnected');
|
|
readingsEndUpdate($hash,1);
|
|
}
|
|
|
|
sub Ready($) {
|
|
my $hash = shift;
|
|
return if DevIo_IsOpen($hash) || IsDisabled($hash->{NAME});
|
|
return DevIo_OpenDev($hash, 1, \&Init, sub(){}) if DevIo_getState($hash) eq 'disconnected';
|
|
return;
|
|
}
|
|
|
|
sub Rename() {
|
|
my ($new,$old) = @_;
|
|
setKeyValue($new."_user",getKeyValue($old."_user"));
|
|
setKeyValue($new."_pass",getKeyValue($old."_pass"));
|
|
|
|
setKeyValue($old."_user",undef);
|
|
setKeyValue($old."_pass",undef);
|
|
return undef;
|
|
}
|
|
|
|
sub Init($) {
|
|
my $hash = shift;
|
|
send_connect($hash);
|
|
readingsSingleUpdate($hash,"connection","connecting",1);
|
|
$hash->{ping_received}=1;
|
|
$hash->{".reconnectmark"} = 1;
|
|
Timer($hash);
|
|
return undef;
|
|
}
|
|
|
|
sub Timer($) {
|
|
my $hash = shift;
|
|
#Log3($hash->{NAME},1,">>> timer ");
|
|
RemoveInternalTimer($hash);
|
|
unless ($hash->{ping_received}) {
|
|
onTimeout($hash);
|
|
readingsSingleUpdate($hash,"connection","timed-out",1) ;#unless $hash->{ping_received};
|
|
GP_ForallClients($hash,\¬ify_client_connection_timeout);
|
|
}
|
|
$hash->{ping_received} = 0;
|
|
|
|
#Log3($hash->{NAME},1,">>> reconnect mark: ".$hash->{".reconnectmark"});
|
|
#Log3($hash->{NAME},1,">>> state: ".ReadingsVal($hash->{NAME}, "state", ""));
|
|
if($hash->{".reconnectmark"} eq 1) {
|
|
if(ReadingsVal($hash->{NAME}, "state", "") eq "disconnected") {
|
|
#Log3($hash->{NAME},1,">>> reconnect ");
|
|
Start($hash);
|
|
}
|
|
}
|
|
InternalTimer(gettimeofday()+$hash->{timeout}, "MQTT::Timer", $hash, 0);
|
|
send_ping($hash);
|
|
}
|
|
|
|
sub Read {
|
|
my ($hash) = @_;
|
|
my $name = $hash->{NAME};
|
|
my $buf = DevIo_SimpleRead($hash);
|
|
return undef unless $buf;
|
|
$hash->{buf} .= $buf;
|
|
while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
|
|
|
|
my $message_type = $mqtt->message_type();
|
|
|
|
Log3($name,5,"MQTT $name message received: ".$mqtt->string());
|
|
|
|
MESSAGE_TYPE: {
|
|
$message_type == MQTT_CONNACK and do {
|
|
readingsSingleUpdate($hash,"connection","connected",1);
|
|
onConnect($hash);
|
|
GP_ForallClients($hash,\&client_start);
|
|
GP_ForallClients($hash,\¬ify_client_connected);
|
|
foreach my $message_id (keys %{$hash->{messages}}) {
|
|
my $msg = $hash->{messages}->{$message_id}->{message};
|
|
if($msg->message_type != MQTT_SUBSCRIBE) {
|
|
# alle subscrube messages werden bei client_start bereits neu gesendet
|
|
$msg->{dup} = $msg->message_type == MQTT_PUBLISH;
|
|
DevIo_SimpleWrite($hash,$msg->bytes,undef);
|
|
} else {
|
|
delete($hash->{messages}->{$message_id});
|
|
}
|
|
}
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PUBLISH and do {
|
|
my $topic = $mqtt->topic();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
|
|
Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
|
|
readingsSingleUpdate($client,"transmission-state","incoming publish received",1);
|
|
my $fn = $modules{$defs{$client->{NAME}}{TYPE}}{OnMessageFn};
|
|
if($fn) {
|
|
CallFn($client->{NAME},"OnMessageFn",($client,$topic,$mqtt->message()))
|
|
} elsif ($client->{TYPE} eq "MQTT_DEVICE") {
|
|
MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
|
|
} elsif ($client->{TYPE} eq "MQTT_BRIDGE") {
|
|
MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
|
|
} else {
|
|
Log3($client->{NAME},1,"unexpected client or no OnMessageFn defined: ".$client->{TYPE});
|
|
}
|
|
};
|
|
},undef);
|
|
if (my $qos = $mqtt->qos() > MQTT_QOS_AT_MOST_ONCE) {
|
|
my $message_id = $mqtt->message_id();
|
|
if ($qos == MQTT_QOS_AT_LEAST_ONCE) {
|
|
send_message($hash, message_type => MQTT_PUBACK, message_id => $message_id);
|
|
} else {
|
|
send_message($hash, message_type => MQTT_PUBREC, message_id => $message_id);
|
|
}
|
|
}
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PUBACK and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","outgoing publish acknowledged",1);
|
|
delete $client->{message_ids}->{$message_id};
|
|
};
|
|
},undef);
|
|
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PUBREC and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","outgoing publish received",1);
|
|
};
|
|
},undef);
|
|
send_message($hash, message_type => MQTT_PUBREL, message_id => $message_id); #QoS Level 2: exactly_once handling
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PUBREL and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","incoming publish released",1);
|
|
delete $client->{message_ids}->{$message_id};
|
|
};
|
|
},undef);
|
|
send_message($hash, message_type => MQTT_PUBCOMP, message_id => $message_id); #QoS Level 2: exactly_once handling
|
|
delete $hash->{messages}->{$message_id};
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PUBCOMP and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","outgoing publish completed",1);
|
|
delete $client->{message_ids}->{$message_id};
|
|
};
|
|
},undef);
|
|
delete $hash->{messages}->{$message_id}; #QoS Level 2: exactly_once handling
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_SUBACK and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","subscription acknowledged",1);
|
|
delete $client->{message_ids}->{$message_id};
|
|
};
|
|
},undef);
|
|
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_UNSUBACK and do {
|
|
my $message_id = $mqtt->message_id();
|
|
GP_ForallClients($hash,sub {
|
|
my $client = shift;
|
|
if ($client->{message_ids}->{$message_id}) {
|
|
readingsSingleUpdate($client,"transmission-state","unsubscription acknowledged",1);
|
|
delete $client->{message_ids}->{$message_id};
|
|
};
|
|
},undef);
|
|
delete $hash->{messages}->{$message_id}; #QoS Level 1: at_least_once handling
|
|
last;
|
|
};
|
|
|
|
$message_type == MQTT_PINGRESP and do {
|
|
$hash->{ping_received} = 1;
|
|
readingsSingleUpdate($hash,"connection","active",1);
|
|
last;
|
|
};
|
|
|
|
Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
|
|
}
|
|
}
|
|
return undef;
|
|
};
|
|
|
|
sub send_connect($) {
|
|
my $hash = shift;
|
|
my $name = $hash->{NAME};
|
|
my $user = getKeyValue($name."_user");
|
|
my $pass = getKeyValue($name."_pass");
|
|
|
|
my $lw = AttrVal($name,"last-will",undef);
|
|
my $clientId = AttrVal($name,'client-id', $hash->{FUUID});
|
|
my ($willqos, $willretain,$willtopic, $willmessage) = parsePublishCmdStr($lw);
|
|
|
|
return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => $hash->{timeout}, user_name => $user, password => $pass, client_id=>$clientId, will_topic => $willtopic, will_message => $willmessage, will_retain => $willretain, will_qos => $willqos);
|
|
};
|
|
|
|
sub send_publish($@) {
|
|
my ($hash,%msg) = @_;
|
|
if ($msg{qos} == MQTT_QOS_AT_MOST_ONCE) {
|
|
send_message(shift, message_type => MQTT_PUBLISH, %msg);
|
|
return undef;
|
|
} else {
|
|
my $msgid = $hash->{msgid}++;
|
|
send_message(shift, message_type => MQTT_PUBLISH, message_id => $msgid, %msg);
|
|
return $msgid;
|
|
}
|
|
};
|
|
|
|
sub send_subscribe($@) {
|
|
my $hash = shift;
|
|
my $msgid = $hash->{msgid}++;
|
|
send_message($hash, message_type => MQTT_SUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
|
|
return $msgid;
|
|
};
|
|
|
|
sub send_unsubscribe($@) {
|
|
my $hash = shift;
|
|
my $msgid = $hash->{msgid}++;
|
|
send_message($hash, message_type => MQTT_UNSUBSCRIBE, message_id => $msgid, qos => MQTT_QOS_AT_LEAST_ONCE, @_);
|
|
return $msgid;
|
|
};
|
|
|
|
sub send_ping($) {
|
|
return send_message(shift, message_type => MQTT_PINGREQ);
|
|
};
|
|
|
|
sub send_disconnect($) {
|
|
my $hash = shift;
|
|
onDisconnect($hash);
|
|
GP_ForallClients($hash,\¬ify_client_disconnected);
|
|
return send_message($hash, message_type => MQTT_DISCONNECT);
|
|
};
|
|
|
|
sub send_message($$$@) {
|
|
my ($hash,%msg) = @_;
|
|
my $name = $hash->{NAME};
|
|
my $message = Net::MQTT::Message->new(%msg);
|
|
Log3($name,5,"MQTT $name message sent: ".$message->string());
|
|
if (defined $msg{message_id}) {
|
|
$hash->{messages}->{$msg{message_id}} = {
|
|
message => $message,
|
|
timeout => gettimeofday()+$hash->{timeout},
|
|
};
|
|
}
|
|
|
|
DevIo_SimpleWrite($hash,$message->bytes,undef);
|
|
};
|
|
|
|
sub topic_to_regexp($) {
|
|
my $t = shift;
|
|
$t =~ s|#$|.\*|;
|
|
$t =~ s|\$|\\\$|g;
|
|
$t =~ s|\/\.\*$|.\*|;
|
|
$t =~ s|\/|\\\/|g;
|
|
$t =~ s|(\+)([^+]*$)|(+)$2|;
|
|
$t =~ s|\+|[^\/]+|g;
|
|
return "^$t\$";
|
|
}
|
|
|
|
sub client_subscribe_topic($$;$$) {
|
|
my ($client,$topic,$qos,$retain) = @_;
|
|
push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
|
|
$client->{subscribeQos}->{$topic}=$qos;
|
|
my $expr = topic_to_regexp($topic);
|
|
push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
|
|
if ($init_done) {
|
|
if (my $mqtt = $client->{IODev}) {;
|
|
$qos = $client->{".qos"}->{"*"} unless defined $qos; # MQTT_QOS_AT_MOST_ONCE
|
|
$retain = 0 unless defined $retain; # not supported yet
|
|
my $msgid = send_subscribe($mqtt,
|
|
topics => [[$topic => $qos || MQTT_QOS_AT_MOST_ONCE]],
|
|
);
|
|
$client->{message_ids}->{$msgid}++;
|
|
readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
|
|
}
|
|
}
|
|
};
|
|
|
|
sub client_unsubscribe_topic($$) {
|
|
my ($client,$topic) = @_;
|
|
$client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}];
|
|
delete $client->{subscribeQos}->{$topic};
|
|
my $expr = topic_to_regexp($topic);
|
|
$client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
|
|
if ($init_done) {
|
|
if (my $mqtt = $client->{IODev}) {;
|
|
my $msgid = send_unsubscribe($mqtt,
|
|
topics => [$topic],
|
|
);
|
|
$client->{message_ids}->{$msgid}++;
|
|
readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
|
|
}
|
|
}
|
|
};
|
|
|
|
sub Client_Define($$) {
|
|
my ( $client, $def ) = @_;
|
|
|
|
$client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
|
|
#$client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
|
|
$client->{".qos"}->{'*'} = 0;
|
|
$client->{".retain"}->{'*'} = "0";
|
|
$client->{subscribe} = [];
|
|
$client->{subscribeQos} = {};
|
|
$client->{subscribeExpr} = [];
|
|
AssignIoPort($client);
|
|
|
|
return client_start($client) if $init_done;
|
|
return;
|
|
}
|
|
|
|
sub Client_Undefine($) {
|
|
client_stop(shift);
|
|
return undef;
|
|
};
|
|
#use Data::Dumper;
|
|
sub client_attr($$$$$) {
|
|
my ($client,$command,$name,$attribute,$value) = @_;
|
|
|
|
ATTRIBUTE_HANDLER: {
|
|
$attribute eq "qos" and do {
|
|
#if ($command eq "set") {
|
|
# $client->{qos} = $MQTT::qos{$value}; ### ALT
|
|
#} else {
|
|
# $client->{qos} = MQTT_QOS_AT_MOST_ONCE; ### ALT
|
|
#}
|
|
|
|
delete($client->{".qos"});
|
|
|
|
if ($command ne "set") {
|
|
delete($client->{".qos"});
|
|
$client->{".qos"}->{"*"} = "0";
|
|
} else {
|
|
|
|
my @values = ();
|
|
if(!defined($value) || $value=~/^[ \t]*$/) {
|
|
return "QOS value may not be empty. Format: [<reading>|*:]0|1|2";
|
|
}
|
|
@values = split("[ \t]+",$value);
|
|
|
|
foreach my $set (@values) {
|
|
my($rname,$rvalue) = split(":",$set);
|
|
if(!defined($rvalue)) {
|
|
$rvalue=$rname;
|
|
$rname="";
|
|
$rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
|
|
}
|
|
#if ($command eq "set") {
|
|
# Map constants
|
|
#$rvalue = MQTT_QOS_AT_MOST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_MOST_ONCE));
|
|
#$rvalue = MQTT_QOS_AT_LEAST_ONCE if($rvalue eq qos_string(MQTT_QOS_AT_LEAST_ONCE));
|
|
#$rvalue = MQTT_QOS_EXACTLY_ONCE if($rvalue eq qos_string(MQTT_QOS_EXACTLY_ONCE));
|
|
$rvalue=$MQTT::qos{$rvalue} if(defined($MQTT::qos{$rvalue}));
|
|
if($rvalue ne "0" && $rvalue ne "1" && $rvalue ne "2") {
|
|
return "unexpected QOS value $rvalue. use 0, 1 or 2. Constants may be also used (".MQTT_QOS_AT_MOST_ONCE."=".qos_string(MQTT_QOS_AT_MOST_ONCE).", ".MQTT_QOS_AT_LEAST_ONCE."=".qos_string(MQTT_QOS_AT_LEAST_ONCE).", ".MQTT_QOS_EXACTLY_ONCE."=".qos_string(MQTT_QOS_EXACTLY_ONCE)."). Format: [<reading>|*:]0|1|2";
|
|
}
|
|
#$rvalue="1" unless ($rvalue eq "0");
|
|
$client->{".qos"}->{$rname} = $rvalue;
|
|
#} else {
|
|
# delete($client->{".qos"}->{$rname});
|
|
# $client->{".qos"}->{"*"} = "0" if($rname eq "*");
|
|
#}
|
|
}
|
|
}
|
|
|
|
my $showqos = "";
|
|
if(defined($client->{".qos"})) {
|
|
foreach my $rname (sort keys %{$client->{".qos"}}) {
|
|
my $rvalue = $client->{".qos"}->{$rname};
|
|
$rname="[state]" if ($rname eq "");
|
|
$showqos.=$rname.':'.$rvalue.' ';
|
|
}
|
|
}
|
|
$client->{"qos"} = $showqos;
|
|
last;
|
|
};
|
|
$attribute eq "retain" and do {
|
|
delete($client->{".retain"});
|
|
|
|
if ($command ne "set") {
|
|
delete($client->{".retain"});
|
|
$client->{".retain"}->{"*"} = "0";
|
|
} else {
|
|
my @values = ();
|
|
|
|
if(!defined($value) || $value=~/^[ \t]*$/) {
|
|
return "retain value may not be empty. Format: [<reading>|*:]0|1";
|
|
}
|
|
@values = split("[ \t]+",$value);
|
|
|
|
foreach my $set (@values) {
|
|
my($rname,$rvalue) = split(":",$set);
|
|
if(!defined($rvalue)) {
|
|
$rvalue=$rname;
|
|
$rname="";
|
|
$rname="*" if (scalar(@values)==1); # backward compatibility: single value without a reading name should be applied to all
|
|
}
|
|
if($rvalue ne "0" && $rvalue ne "1") {
|
|
return "unexpected retain value. use 0 or 1. Format: [<reading>|*:]0|1";
|
|
}
|
|
$client->{".retain"}->{$rname} = $rvalue;
|
|
}
|
|
}
|
|
|
|
my $showretain = "";
|
|
if(defined($client->{".retain"})) {
|
|
foreach my $rname (sort keys %{$client->{".retain"}}) {
|
|
my $rvalue = $client->{".retain"}->{$rname};
|
|
$rname="[state]" if ($rname eq "");
|
|
$showretain.=$rname.':'.$rvalue.' ';
|
|
}
|
|
}
|
|
$client->{"retain"} = $showretain;
|
|
last;
|
|
};
|
|
$attribute eq "IODev" and do {
|
|
if ($init_done) {
|
|
if ($command eq "set") {
|
|
client_stop($client);
|
|
$main::attr{$name}{IODev} = $value;
|
|
return client_start($client);
|
|
} else {
|
|
client_stop($client);
|
|
}
|
|
}
|
|
last;
|
|
};
|
|
}
|
|
};
|
|
|
|
sub notify_client_connected($) {
|
|
my $client = shift;
|
|
CallFn($client->{NAME},"OnClientConnectFn",($client));
|
|
}
|
|
|
|
sub notify_client_disconnected($) {
|
|
my $client = shift;
|
|
CallFn($client->{NAME},"OnClientDisconnectFn",($client));
|
|
}
|
|
|
|
sub notify_client_connection_timeout($) {
|
|
my $client = shift;
|
|
CallFn($client->{NAME},"OnClientConnectionTimeoutFn",($client));
|
|
}
|
|
|
|
sub client_start($) {
|
|
my $client = shift;
|
|
# probably internal failure
|
|
unless (defined $client) {
|
|
Log3("MQTT IODev",1,"no client device hash provided");
|
|
return "no client device hash provided";
|
|
}
|
|
|
|
# client device without IODev. probably internal failure
|
|
unless (defined $client->{IODev}) {
|
|
Log3("MQTT IODev",1,"client device hash no IODev provided");
|
|
return "client device hash no IODev provided";
|
|
}
|
|
|
|
CallFn($client->{NAME},"OnClientStartFn",($client));
|
|
|
|
unless (ref($client->{subscribe}) eq "ARRAY") {
|
|
Log3($client->{NAME},1,"unknown client or client initialization error");
|
|
return "unknown client or client initialization error";
|
|
}
|
|
|
|
if (@{$client->{subscribe}}) {
|
|
my $msgid = send_subscribe($client->{IODev},
|
|
topics => [map { [$_ => $client->{subscribeQos}->{$_} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
|
|
);
|
|
$client->{message_ids}->{$msgid}++;
|
|
readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
|
|
}
|
|
|
|
return undef;
|
|
};
|
|
|
|
sub client_stop($) {
|
|
my $client = shift;
|
|
|
|
if (@{$client->{subscribe}}) {
|
|
my $msgid = send_unsubscribe($client->{IODev},
|
|
topics => [@{$client->{subscribe}}],
|
|
);
|
|
$client->{message_ids}->{$msgid}++;
|
|
readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
|
|
}
|
|
|
|
CallFn($client->{NAME},"OnClientStopFn",($client));
|
|
};
|
|
|
|
1;
|
|
__END__
|
|
|
|
=pod
|
|
=item [device]
|
|
=item summary connects fhem to MQTT
|
|
|
|
=begin html
|
|
|
|
<a id="MQTT"></a>
|
|
<h3>MQTT</h3>
|
|
<ul>
|
|
<p>connects fhem to <a href="http://mqtt.org">mqtt</a>.</p>
|
|
<p>A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a>, <a href="#MQTT_GENERIC_BRIDGE">MQTT_GENERIC_BRIDGE</a> clients and (<b>outdated</b>) <a href="#MQTT_BRIDGE">MQTT_BRIDGE</a> clients.<br/>
|
|
Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.<br/>
|
|
Note: this module is based on <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod">Net::MQTT</a> which needs to be installed from CPAN first.</p>
|
|
<a id="MQTT-define"></a>
|
|
<p><b>Define</b></p>
|
|
<ul>
|
|
<p><code>define <name> MQTT <ip:port> [<username>] [<password>]</code></p>
|
|
<p>Specifies the MQTT device.</p>
|
|
</ul>
|
|
<a id="MQTT-set"></a>
|
|
<p><b>Set</b></p>
|
|
<ul>
|
|
<a id="MQTT-set-connect"></a>
|
|
<li>
|
|
<p><code>set <name> connect</code><br/>
|
|
(re-)connects the MQTT-device to the mqtt-broker</p>
|
|
</li>
|
|
<a id="MQTT-set-disconnect"></a>
|
|
<li>
|
|
<p><code>set <name> disconnect</code><br/>
|
|
disconnects the MQTT-device from the mqtt-broker</p>
|
|
</li>
|
|
<a id="MQTT-set-publish"></a>
|
|
<li>
|
|
<p><code>set <name> publish [qos:?] [retain:?] <topic> <message></code><br/>
|
|
sends message to the specified topic</p>
|
|
</li>
|
|
</ul>
|
|
<a id="MQTT-attr"></a>
|
|
<p><b>Attributes</b></p>
|
|
<ul>
|
|
<a id="MQTT-attr-keep-alive"></a>
|
|
<li>
|
|
<p>keep-alive<br/>
|
|
sets the keep-alive time (in seconds).</p>
|
|
</li>
|
|
<a id="MQTT-attr-keep-last-will"></a>
|
|
<li>
|
|
<p><code>attr <name> last-will [qos:?] [retain:?] <topic> <message></code><br/>
|
|
Support for MQTT feature "last will"
|
|
</p>
|
|
<p>example:<br/>
|
|
<code>attr mqtt last-will /fhem/status crashed</code>
|
|
</p>
|
|
</li>
|
|
<a id="MQTT-attr-keep-client-id"></a>
|
|
<li>
|
|
<p><code>attr <name> client-id client id</code><br/>
|
|
redefines client id
|
|
</p>
|
|
<p>example:<br/>
|
|
<code>attr mqtt client-id fhem1234567</code>
|
|
</p>
|
|
</li>
|
|
<a id="MQTT-attr-keep-last-will data-pattern="on-.*connect""></a>
|
|
<li>
|
|
<p>on-connect, on-disconnect<br/>
|
|
<code>attr <name> on-connect {Perl-expression} <topic> <message></code><br/>
|
|
Publish the specified message to a topic at connect / disconnect (counterpart to lastwill) and / or evaluation of Perl expression<br/>
|
|
If a Perl expression is provided, the message is sent only if expression returns true (for example, 1) or undef.<br/>
|
|
The following variables are passed to the expression at evaluation: $hash, $name, $qos, $retain, $topic, $message.
|
|
</p>
|
|
<p>examples:<br/>
|
|
<code>attr mqtt on-connect /topic/status connected</code><br/>
|
|
<code>attr mqtt on-connect {Log3("abc",1,"on-connect")} /fhem/status connected</code>
|
|
</p>
|
|
</li>
|
|
<a id="MQTT-attr-on-timeout"></a>
|
|
<li>
|
|
<p>on-timeout<br/>
|
|
<code>attr <name> on-timeout {Perl-expression}</code>
|
|
evaluate the given Perl expression on timeout<br/>
|
|
</p>
|
|
</li>
|
|
</ul>
|
|
</ul>
|
|
|
|
=end html
|
|
=cut
|