From e306e9d5d67f221cfb1630ead9e4ca3fc65103c5 Mon Sep 17 00:00:00 2001
From: herrmannj <>
Date: Thu, 10 Feb 2022 22:06:52 +0000
Subject: [PATCH] MQTT2_Dispatcher.pm: initial check in (Forum #126046)
git-svn-id: https://svn.fhem.de/fhem/trunk@25668 2b470e98-0d58-463d-a4d8-8e2adae1ed80
---
fhem/lib/FHEM/Utility/MQTT2_Dispatcher.pm | 307 ++++++++++++++++++++++
1 file changed, 307 insertions(+)
create mode 100755 fhem/lib/FHEM/Utility/MQTT2_Dispatcher.pm
diff --git a/fhem/lib/FHEM/Utility/MQTT2_Dispatcher.pm b/fhem/lib/FHEM/Utility/MQTT2_Dispatcher.pm
new file mode 100755
index 000000000..7e44f04fe
--- /dev/null
+++ b/fhem/lib/FHEM/Utility/MQTT2_Dispatcher.pm
@@ -0,0 +1,307 @@
+# $Id$
+###############################################################################
+#
+# This file is part of fhem.
+#
+# Fhem is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# Fhem is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with fhem. If not, see .
+#
+#
+###############################################################################
+
+###############################################################################
+#
+# MQTT2_Dispatcher permits any fhem module to act as a MQTT2 Device
+#
+#
+###############################################################################
+
+package FHEM::Utility::MQTT2_Dispatcher;
+
+use 5.018;
+use feature qw( lexical_subs );
+
+use strict;
+use warnings;
+use utf8;
+use Carp;
+use Encode qw(encode decode find_encoding);
+
+use constant {
+ L_SUBSCRIPTION => 0,
+ L_TEMPLATE => 1,
+ L_FUNCTION => 2,
+};
+
+sub import {
+ my $import_options = join(' ', @_[1..$#_]);
+ my ($package, $filename, $line) = caller;
+
+ if (not exists($main::modules{'MQTT2_Dispatcher'})) {
+ $main::modules{'MQTT2_Dispatcher'} = __PACKAGE__->new();
+ };
+
+ no strict "refs";
+
+ if ($import_options =~ qr/\bDEFAULT\b/) {
+ # on_mqtt
+ *{$package.'::on_mqtt'} = sub {
+ my ($topic, $fn) = @_;
+ if (exists($main::modules{'MQTT2_Dispatcher'})) {
+ return $main::modules{'MQTT2_Dispatcher'}->add_listener($topic, $fn);
+ };
+ return;
+ };
+
+ # del_mqtt
+ *{$package.'::del_mqtt'} = sub {
+ my ($fn) = @_;
+ if (exists($main::modules{'MQTT2_Dispatcher'})) {
+ return $main::modules{'MQTT2_Dispatcher'}->del_listener($fn);
+ };
+ return;
+ };
+ };
+
+ _init();
+};
+
+sub _init {
+ if (exists($main::modules{'MQTT2_Dispatcher'}) and (not $main::modules{'MQTT2_Dispatcher'}->{Pendig})) {
+ $main::modules{'MQTT2_Dispatcher'}->{Pendig} = 1;
+ main::InternalTimer(0, \&_find_MQTT2, undef);
+ };
+};
+
+sub _find_MQTT2 {
+ foreach my $device (keys %main::defs) {
+ if ($main::defs{$device}->{TYPE} =~ qr/^MQTT2_SERVER$|^MQTT2_CLIENT$/) {
+ _patch_MQTT2($device);
+ };
+ };
+ $main::modules{'MQTT2_Dispatcher'}->{Pendig} = 0;
+};
+
+sub _patch_MQTT2 {
+ my ($io_name) = @_;
+ my $io_dev = $main::defs{$io_name};
+
+ return if ($io_dev->{Clients} =~ qr/:MQTT2_Dispatcher:/);
+
+ # modify io
+ $io_dev->{Clients} = ':MQTT2_Dispatcher'.$io_dev->{Clients};
+ $io_dev->{MatchList} = []; # force rebuild
+
+ # also anchor permanently at 'attr clientOrder delete'.
+ no warnings qw(redefine prototype);
+
+ if (($io_dev->{TYPE}) eq 'MQTT2_CLIENT' and not $io_dev->{'._io_patched'}) {
+ my $fn = \&main::MQTT2_CLIENT_resetClients;
+ *main::MQTT2_CLIENT_resetClients = sub {
+ $fn->(@_);
+ $_[0]->{Clients} = ':MQTT2_Dispatcher'.$_[0]->{Clients};
+ };
+ $io_dev->{'._io_patched'} = 1;
+ };
+
+ if (($io_dev->{TYPE}) eq 'MQTT2_SERVER' and not $io_dev->{'._io_patched'}) {
+ my $fn = \&main::MQTT2_SERVER_resetClients;
+ *main::MQTT2_SERVER_resetClients = sub {
+ $fn->(@_);
+ $_[0]->{Clients} = ':MQTT2_Dispatcher'.$_[0]->{Clients};
+ };
+ $io_dev->{'._io_patched'} = 1;
+ };
+
+ return;
+};
+
+sub _parse {
+ my ($io_dev, $dmsg) = @_;
+ my @msg = split(/\0/, $dmsg);
+ # print "parse $msg[2] $msg[3]\n";
+ # UTF-8 to UNICODE converation
+ eval {$msg[2] = decode(find_encoding('UTF-8'), $msg[2], Encode::FB_CROAK)};
+ if (exists($main::modules{'MQTT2_Dispatcher'})) {
+ my $dispatcher = $main::modules{'MQTT2_Dispatcher'};
+ # query listener
+ foreach my $listener (@{$dispatcher->{Listener}}) {
+ if ($msg[2] =~ $listener->[L_TEMPLATE]) {
+ # need to call it async, otherwise trigger wont work
+ main::InternalTimer(0, sub {
+ eval {$listener->[L_FUNCTION]->($msg[2], $msg[3], $io_dev->{NAME})};
+ warn $@ if $@;
+ }, undef);
+ };
+ };
+ };
+ return;
+};
+
+sub new {
+ my ($class) = @_;
+
+ my $self = {
+ Match => qr/^.*/,
+ ParseFn => \&_parse,
+ Listener => [],
+ LOADED => 1,
+ };
+
+ bless $self, $class;
+ return $self;
+};
+
+sub add_listener {
+ my ($self, $subscription, $fn) = @_;
+
+ # rereadcfg could have wiped the io patch
+ if (not $self->{Pendig} and not $main::init_done) {
+ $main::modules{'MQTT2_Dispatcher'}->{Pendig} = 1;
+ main::InternalTimer(0, \&_find_MQTT2, undef);
+ };
+
+ my @parts = split('/', $subscription);
+ my $template = '^';
+ my $seperator = '';
+
+ eval {
+ while (defined(my $p = shift @parts)) {
+ if ($p eq '#') {
+ $template .= '(?:$|'.$seperator.'.+)';
+ croak(q('#' must be the last character)) if (scalar @parts);
+ } elsif ($p eq '+') {
+ $template .= $seperator.'(?:[^\/]*)';
+ $seperator = '\/';
+ } elsif ($p eq '') {
+ $template .= $seperator;
+ $seperator = '\/';
+ } else {
+ croak(q('#' and '+' cannot be part of a topic name)) if ($p =~ m/[#+]/);
+ # escape Special Regex Characters
+ $p =~ s/([\.\+\*\?\^\$\(\)\[\]\{\}\|\\])/\\$1/g;
+ $template .= $seperator.$p;
+ $seperator = '\/';
+ };
+ };
+ $template .= '$';
+ } or do {
+ return;
+ };
+
+ my $r = qr/$template/;
+
+ push @{$self->{Listener}}, [$subscription, $r, $fn];
+ return $fn;
+};
+
+sub del_listener {
+ my ($self, $fn) = @_;
+ if ($fn and exists($main::modules{'MQTT2_Dispatcher'})) {
+ my $dispatcher = $main::modules{'MQTT2_Dispatcher'};
+ @{$dispatcher->{Listener}} = grep {$_->[L_FUNCTION] ne $fn} @{$dispatcher->{Listener}};
+ };
+ return;
+};
+
+sub DESTROY {
+ my $self = shift;
+ $self->SUPER::DESTROY if $self->can("SUPER::DESTROY");
+ return;
+};
+
+1;
+__END__
+
+=head1 NAME
+
+FHEM::Utility::MQTT2_Dispatcher - Subscribe to MQTT messages in fhem modules
+
+=head1 SYNOPSIS
+
+ package main;
+
+ use FHEM::Utility::MQTT2_Dispatcher qw( :DEFAULT );
+
+ sub X_Define {
+ ...
+
+ $hash->{Listener} = on_mqtt('home/#', sub {
+ X_MQTT($hash, @_);
+ }) or do {
+ Log3($hash, 2, $@);
+ };
+ };
+
+ sub X_MQTT {
+ my ($hash, $topic, $value, $io_name) = @_;
+ };
+
+ # required for rereadcfg etc
+ sub X_Undef {
+ ...
+ del_mqtt($hash->{Listener});
+ return;
+ };
+
+=head1 DESCRIPTION
+
+This module provides a couple of helper functions to let a fhem module
+subscripe to MQTT topics using a defined MQTT2_SERVER or MQTT2_CLIENT.
+
+=head1 FUNCTIONS
+
+=over 4
+
+=item C
+
+This function subscribes to a given C<$topic>. MQTT wildcards are allowed. The
+MQTT2 IO device, of course, must receive that topic. The coderef can be
+an anonyious sub or a variable holding one. This function receive 3 params:
+the C<$topic> (unicode), the C<$value> (raw) and the C<$name> of the receiving
+IO device.
+
+Returns an identifier that is used to delete that handler at any time. Undef
+in case of an error.
+
+Multiple handlers can be installed at the same time.
+
+=item C
+
+This function removes (deletes) the handler specified with C<$id>. It must be
+called in C function of the embedding fhem module.
+
+=back
+
+=head1 AUTHOR
+
+Joerg Herrmann
+
+=head1 COPYRIGHT AND LICENSE
+
+This file is part of fhem.
+
+Fhem is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 2 of the License, or
+(at your option) any later version.
+
+Fhem is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with fhem. If not, see .
+
+=cut