2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-03-10 09:16:53 +00:00

MQTT: initial checkin of MQTT and MQTT_DEVICE

git-svn-id: https://svn.fhem.de/fhem/trunk@6648 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
ntruchsess 2014-10-01 10:53:51 +00:00
parent bf66e70e90
commit 7962529c09
21 changed files with 2586 additions and 0 deletions

352
fhem/FHEM/00_MQTT.pm Normal file
View File

@ -0,0 +1,352 @@
##############################################
#
# fhem bridge to mqtt (see http://mqtt.org)
#
# Copyright (C) 2014 Norbert Truchsess
#
# This file is part of fhem.
#
# Fhem is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Fhem is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fhem. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
#
##############################################
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
require "10_MQTT_DEVICE.pm";
my %sets = (
"connect" => "",
"disconnect" => "",
);
my %gets = (
"version" => ""
);
my @clients = qw(
MQTT_DEVICE
);
sub MQTT_Initialize($) {
my $hash = shift @_;
require "$main::attr{global}{modpath}/FHEM/DevIo.pm";
# Provider
$hash->{Clients} = join (':',@clients);
$hash->{ReadyFn} = "MQTT_Ready";
$hash->{ReadFn} = "MQTT_Read";
# Consumer
$hash->{DefFn} = "MQTT_Define";
$hash->{UndefFn} = "MQTT_Undef";
$hash->{SetFn} = "MQTT_Set";
$hash->{NotifyFn} = "MQTT_Notify";
$hash->{AttrList} = "keep-alive";
}
sub MQTT_Define($$) {
my ( $hash, $def ) = @_;
$hash->{NOTIFYDEV} = "global";
$hash->{msgid} = 1;
if ($main::init_done) {
return MQTT_Start($hash);
} else {
return undef;
}
}
sub MQTT_Undef($) {
MQTT_Stop(shift);
}
sub MQTT_Set($@) {
my ($hash, @a) = @_;
return "Need at least one parameters" if(@a < 2);
return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
if(!defined($sets{$a[1]}));
my $command = $a[1];
my $value = $a[2];
COMMAND_HANDLER: {
$command eq "connect" and do {
MQTT_Start($hash);
last;
};
$command eq "disconnect" and do {
MQTT_Stop($hash);
last;
};
};
}
sub MQTT_Notify($$) {
my ($hash,$dev) = @_;
if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
MQTT_Start($hash);
} elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
}
}
sub MQTT_Start($) {
my $hash = shift;
my ($dev) = split("[ \t]+", $hash->{DEF});
$hash->{DeviceName} = $dev;
DevIo_CloseDev($hash);
return DevIo_OpenDev($hash, 0, "MQTT_Init");
}
sub MQTT_Stop($) {
my $hash = shift;
MQTT_send_disconnect($hash);
DevIo_CloseDev($hash);
main::RemoveInternalTimer($hash);
main::readingsSingleUpdate($hash,"connection","disconnected",1);
}
sub MQTT_Ready($) {
my $hash = shift;
return DevIo_OpenDev($hash, 1, "MQTT_Init") if($hash->{STATE} eq "disconnected");
}
sub MQTT_Init($) {
my $hash = shift;
MQTT_send_connect($hash);
main::readingsSingleUpdate($hash,"connection","connecting",1);
$hash->{ping_received}=1;
MQTT_Timer($hash);
return undef;
}
sub MQTT_Timer($) {
my $hash = shift;
main::RemoveInternalTimer($hash);
main::readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received};
$hash->{ping_received} = 0;
main::InternalTimer(gettimeofday()+main::AttrVal($hash-> {NAME},"keep-alive",60), "MQTT_Timer", $hash, 0);
MQTT_send_ping($hash);
}
sub MQTT_Read {
my ($hash) = @_;
my $name = $hash->{NAME};
my $buf = DevIo_SimpleRead($hash);
return undef unless $buf;
$hash->{buf} .= $buf;
while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
my $message_type = $mqtt->message_type();
main::Log3($name,5,"MQTT $name message received: ".$mqtt->string());
MESSAGE_TYPE: {
$message_type == MQTT_CONNACK and do {
readingsSingleUpdate($hash,"connection","connected",1);
GP_ForallClients($hash,\&MQTT_DEVICE_Start);
last;
};
$message_type == MQTT_PUBLISH and do {
my $topic = $mqtt->topic();
GP_ForallClients($hash,sub {
my $client = shift;
main::Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
if (grep { $_ eq $topic } @{$client->{subscribe}}) {
readingsSingleUpdate($client,"transmission-state","publish received",1);
MQTT_DEVICE_onmessage($client,$topic,$mqtt->message());
};
},undef);
last;
};
$message_type == MQTT_PUBACK and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","pubacknowledge received",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_PUBREC and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","pubreceive received",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_PUBREL and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","pubrelease received",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_PUBCOMP and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","pubcomplete received",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_SUBACK and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","subscription acknowledged",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_UNSUBACK and do {
my $message_id = $mqtt->message_id();
GP_ForallClients($hash,sub {
my $client = shift;
if ($client->{message_ids}->{$message_id}) {
readingsSingleUpdate($client,"transmission-state","unsubscription acknowledged",1);
delete $client->{message_ids}->{$message_id};
};
},undef);
last;
};
$message_type == MQTT_PINGRESP and do {
$hash->{ping_received} = 1;
main::readingsSingleUpdate($hash,"connection","active",1);
last;
};
main::Log3($hash->{NAME},4,"MQTT_Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
}
}
return undef;
};
sub MQTT_send_connect($) {
my $hash = shift;
return MQTT_send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => main::AttrVal($hash->{NAME},"keep-alive",60));
};
sub MQTT_send_publish($@) {
return MQTT_send_message(shift, message_type => MQTT_PUBLISH, @_);
};
sub MQTT_send_subscribe($@) {
my $hash = shift;
return MQTT_send_message($hash, message_type => MQTT_SUBSCRIBE, @_);
};
sub MQTT_send_unsubscribe($@) {
return MQTT_send_message(shift, message_type => MQTT_UNSUBSCRIBE, @_);
};
sub MQTT_send_ping($) {
return MQTT_send_message(shift, message_type => MQTT_PINGREQ);
};
sub MQTT_send_disconnect($) {
return MQTT_send_message(shift, message_type => MQTT_DISCONNECT);
};
sub MQTT_send_message($$$@) {
my $hash = shift;
my $name = $hash->{NAME};
my $msgid = $hash->{msgid}++;
my $msg = Net::MQTT::Message->new(message_id => $msgid,@_);
main::Log3($name,5,"MQTT $name message sent: ".$msg->string());
DevIo_SimpleWrite($hash,$msg->bytes);
return $msgid;
};
1;
=pod
=begin html
<a name="MQTT"></a>
<h3>MQTT</h3>
<ul>
connects fhem to <a href="http://mqtt.org">mqtt</a>
<br><br>
A single MQTT device can serve multiple <a href="#MQTT_DEVICE">MQTT_DEVICE</a> clients.<br><br>
Each <a href="#MQTT_DEVICE">MQTT_DEVICE</a> acts as a bridge in between an fhem-device and mqtt.
Note: this module is based on module <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod>Net::MQTT</a>.
<br><br>
<a name="MQTTdefine"></a>
<b>Define</b><br>
<ul><br>
<code>define &lt;name&gt; MQTT &lt;ip:port&gt;</code> <br>
Specifies the MQTT device.<br>
<br>
<br>
<a name="MQTTset"></a>
<b>Set</b>
<ul>
<li>
<code>set &lt;name&gt; connect</code><br>
(re-)connects the MQTT-device to the mqtt-broker
</li><br>
<li>
<code>set &lt;name&gt; disconnect</code><br>
disconnects the MQTT-device from the mqtt-broker
</li>
</ul>
<br><br>
<a name="MQTTattr"></a>
<b>Attributes</b><br>
<ul>
<li>keep-alive<br>
sets the keep-alive time (in seconds).
</li>
</ul>
</ul>
</ul>
<br>
=end html
=cut

272
fhem/FHEM/10_MQTT_DEVICE.pm Normal file
View File

@ -0,0 +1,272 @@
##############################################
#
# fhem bridge to mqtt (see http://mqtt.org)
#
# Copyright (C) 2014 Norbert Truchsess
#
# This file is part of fhem.
#
# Fhem is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Fhem is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fhem. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
#
##############################################
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
my %sets = (
);
my %gets = (
"version" => "",
"readings" => ""
);
my %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
sub MQTT_DEVICE_Initialize($) {
my $hash = shift @_;
# Consumer
$hash->{DefFn} = "MQTT_DEVICE_Define";
$hash->{UndefFn} = "MQTT_DEVICE_Undef";
#$hash->{SetFn} = "MQTT_DEVICE_Set";
$hash->{GetFn} = "MQTT_DEVICE_Get";
$hash->{NotifyFn} = "MQTT_DEVICE_Notify";
$hash->{AttrFn} = "MQTT_DEVICE_Attr";
$hash->{AttrList} =
"IODev ".
"qos:".join(",",keys %qos)." ".
"publish-topic-base ".
"publishState ".
"publishReading_.* ".
"subscribeSet ".
"subscribeSet_.* ".
$main::readingFnAttributes;
}
sub MQTT_DEVICE_Define($$) {
my ( $hash, $def ) = @_;
$hash->{NOTIFYDEV} = $hash->{DEF};
$hash->{qos} = MQTT_QOS_AT_MOST_ONCE;
$hash->{subscribe} = [];
if ($main::init_done) {
return MQTT_DEVICE_Start($hash);
} else {
return undef;
}
}
sub MQTT_DEVICE_Undef($) {
MQTT_DEVICE_Stop(shift);
}
sub MQTT_DEVICE_Get($$@) {
my ($hash, $name, $command) = @_;
return "Need at least one parameters" unless (defined $command);
return "Unknown argument $command, choose one of " . join(" ", sort keys %gets)
unless (defined($gets{$command}));
COMMAND_HANDLER: {
# populate dynamically from keys %{$defs{$sdev}{READINGS}}
$command eq "readings" and do {
my $base = AttrVal($name,"publish-topic-base","/$hash->{DEF}/");
foreach my $reading (keys %{$main::defs{$hash->{DEF}}{READINGS}}) {
unless (defined main::AttrVal($name,"publishReading_$reading",undef)) {
main::CommandAttr($hash,"$name publishReading_$reading $base$reading");
}
};
last;
};
};
}
sub MQTT_DEVICE_Notify() {
my ($hash,$dev) = @_;
main::Log3($hash->{NAME},5,"Notify for $dev->{NAME}");
foreach my $event (@{$dev->{CHANGED}}) {
$event =~ /^([^:]+)(: )?(.*)$/;
main::Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'");
if (defined $3 and $3 ne "") {
if (defined $hash->{publishReadings}->{$1}) {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos});
readingsSingleUpdate($hash,"transmission-state","publish sent",1);
}
} else {
if (defined $hash->{publishState}) {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos});
readingsSingleUpdate($hash,"transmission-state","publish sent",1);
}
}
}
}
sub MQTT_DEVICE_Attr($$$$) {
my ($command,$name,$attribute,$value) = @_;
my $hash = $main::defs{$name};
ATTRIBUTE_HANDLER: {
$attribute =~ /^subscribeSet(_?)(.*)/ and do {
if ($command eq "set") {
$hash->{subscribeSets}->{$value} = $2;
push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}};
if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_subscribe($mqtt,
topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]],
);
$hash->{message_ids}->{$msgid}++;
readingsSingleUpdate($hash,"transmission-state","subscribe sent",1)
}
}
} else {
if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_unsubscribe($mqtt,
topics => [$value],
);
$hash->{message_ids}->{$msgid}++;
}
}
delete $hash->{subscribeSets}->{$value};
$hash->{subscribe} = [grep { $_ != $value } @{$hash->{subscribe}}];
}
last;
};
$attribute eq "publishState" and do {
if ($command eq "set") {
$hash->{publishState} = $value;
} else {
delete $hash->{publishState};
}
last;
};
$attribute =~ /^publishReading_(.+)$/ and do {
if ($command eq "set") {
$hash->{publishReadings}->{$1} = $value;
} else {
delete $hash->{publishReadings}->{$1};
}
last;
};
$attribute eq "qos" and do {
if ($command eq "set") {
$hash->{qos} = $qos{$value};
} else {
$hash->{qos} = MQTT_QOS_AT_MOST_ONCE;
}
last;
};
$attribute eq "IODev" and do {
if ($command eq "set") {
} else {
}
last;
};
}
}
sub MQTT_DEVICE_Start($) {
my $hash = shift;
my $name = $hash->{NAME};
if (! (defined AttrVal($name,"stateFormat",undef))) {
$main::attr{$name}{stateFormat} = "transmission-state";
}
if (@{$hash->{subscribe}}) {
my $msgid = MQTT_send_subscribe($hash->{IODev},
topics => [map { [$_ => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$hash->{subscribe}}],
);
$hash->{message_ids}->{$msgid}++;
readingsSingleUpdate($hash,"transmission-state","subscribe sent",1)
}
}
sub MQTT_DEVICE_Stop($) {
my $hash = shift;
if (@{$hash->{subscribe}}) {
my $msgid = MQTT_send_unsubscribe($hash->{IODev},
topics => [@{$hash->{subscribe}}],
);
$hash->{message_ids}->{$msgid}++;
readingsSingleUpdate($hash,"transmission-state","unsubscribe sent",1)
}
}
sub MQTT_DEVICE_onmessage($$$) {
my ($hash,$topic,$message) = @_;
if (defined (my $command = $hash->{subscribeSets}->{$topic})) {
my @args = split ("[ \t]+",$message);
if ($command eq "") {
main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : ""));
main::DoSet($hash->{DEF},@args);
} else {
main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : ""));
main::DoSet($hash->{DEF},$command,@args);
}
}
}
1;
=pod
=begin html
<a name="MQTT_DEVICE"></a>
<h3>MQTT</h3>
<ul>
acts as a bridge in between an fhem-device and <a href="http://mqtt.org">mqtt</a>-topics.
<br><br>
requires a <a href="#MQTT">MQTT</a>-device as IODev<br><br>
Note: this module is based on module <a href="https://metacpan.org/pod/distribution/Net-MQTT/lib/Net/MQTT.pod>Net::MQTT</a>.
<br><br>
<a name="MQTT_DEVICEdefine"></a>
<b>Define</b><br>
<ul><br>
<code>define &lt;name&gt; MQTT_DEVICE &lt;fhem-device-name&gt;</code> <br>
Specifies the MQTT device.<br>
&lt;fhem-device-name&gt; is the fhem-device this MQTT_DEVICE is linked to.<br>
<br>
<a name="MQTT_DEVICEget"></a>
<b>Get</b>
<ul>
<li>
<code>get &lt;name&gt; readings</code><br>
retrieves all existing readings from fhem-device and configures (default-)topics for them.
</li><br>
</ul>
<br><br>
<a name="MQTTattr"></a>
<b>Attributes</b><br>
<ul>
<li>...<br>
...
</li>
</ul>
</ul>
</ul>
<br>
=end html
=cut

View File

@ -0,0 +1,59 @@
use strict;
use warnings;
package Net::MQTT;
$Net::MQTT::VERSION = '1.142010';
# ABSTRACT: Perl modules for MQTT Protocol (http://mqtt.org/)
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT - Perl modules for MQTT Protocol (http://mqtt.org/)
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# net-mqtt-sub /topic
# net-mqtt-pub /topic message
# net-mqtt-trace mqtt.tcp
=head1 DESCRIPTION
Low level API for the MQTT protocol described at L<http://mqtt.org>.
B<IMPORTANT:> This is an early release and the API is still subject to
change.
=head1 DISCLAIMER
This is B<not> official IBM code. I work for IBM but I'm writing this
in my spare time (with permission) for fun.
=head1 SEE ALSO
net-mqtt-sub(1), net-mqtt-pub(1), net-mqtt-trace(1), Net::MQTT::Message(3)
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,270 @@
use strict;
use warnings;
package Net::MQTT::Constants;
$Net::MQTT::Constants::VERSION = '1.142010';
# ABSTRACT: Module to export constants for MQTT protocol
use Carp qw/croak/;
my %constants =
(
MQTT_CONNECT => 0x1,
MQTT_CONNACK => 0x2,
MQTT_PUBLISH => 0x3,
MQTT_PUBACK => 0x4,
MQTT_PUBREC => 0x5,
MQTT_PUBREL => 0x6,
MQTT_PUBCOMP => 0x7,
MQTT_SUBSCRIBE => 0x8,
MQTT_SUBACK => 0x9,
MQTT_UNSUBSCRIBE => 0xa,
MQTT_UNSUBACK => 0xb,
MQTT_PINGREQ => 0xc,
MQTT_PINGRESP => 0xd,
MQTT_DISCONNECT => 0xe,
MQTT_QOS_AT_MOST_ONCE => 0x0,
MQTT_QOS_AT_LEAST_ONCE => 0x1,
MQTT_QOS_EXACTLY_ONCE => 0x2,
MQTT_CONNECT_ACCEPTED => 0,
MQTT_CONNECT_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION => 1,
MQTT_CONNECT_REFUSED_IDENTIFIER_REJECTED => 2,
MQTT_CONNECT_REFUSED_SERVER_UNAVAILABLE => 3,
MQTT_CONNECT_REFUSED_BAD_USER_NAME_OR_PASSWORD => 4,
MQTT_CONNECT_REFUSED_NOT_AUTHORIZED => 5,
);
sub import {
no strict qw/refs/; ## no critic
my $pkg = caller(0);
foreach (keys %constants) {
my $v = $constants{$_};
*{$pkg.'::'.$_} = sub () { $v };
}
foreach (qw/decode_byte encode_byte
decode_short encode_short
decode_string encode_string
decode_remaining_length encode_remaining_length
qos_string
message_type_string
dump_string
connect_return_code_string
/) {
*{$pkg.'::'.$_} = \&{$_};
}
}
sub decode_remaining_length {
my ($data, $offset) = @_;
my $multiplier = 1;
my $v = 0;
my $d;
do {
$d = decode_byte($data, $offset);
$v += ($d&0x7f) * $multiplier;
$multiplier *= 128;
} while ($d&0x80);
$v
}
sub encode_remaining_length {
my $v = shift;
my $o;
my $d;
do {
$d = $v % 128;
$v = int($v/128);
if ($v) {
$d |= 0x80;
}
$o .= encode_byte($d);
} while ($d&0x80);
$o;
}
sub decode_byte {
my ($data, $offset) = @_;
croak 'decode_byte: insufficient data' unless (length $data >= $$offset+1);
my $res = unpack 'C', substr $data, $$offset, 1;
$$offset++;
$res
}
sub encode_byte {
pack 'C', $_[0];
}
sub decode_short {
my ($data, $offset) = @_;
croak 'decode_short: insufficient data' unless (length $data >= $$offset+2);
my $res = unpack 'n', substr $data, $$offset, 2;
$$offset += 2;
$res;
}
sub encode_short {
pack 'n', $_[0];
}
sub decode_string {
my ($data, $offset) = @_;
my $len = decode_short($data, $offset);
croak 'decode_string: insufficient data'
unless (length $data >= $$offset+$len);
my $res = substr $data, $$offset, $len;
$$offset += $len;
$res;
}
sub encode_string {
pack "n/a*", $_[0];
}
sub qos_string {
[qw/at-most-once at-least-once exactly-once reserved/]->[$_[0]]
}
sub message_type_string {
[qw/Reserved0 Connect ConnAck Publish PubAck PubRec PubRel PubComp
Subscribe SubAck Unsubscribe UnsubAck PingReq PingResp Disconnect
Reserved15/]->[$_[0]];
}
sub dump_string {
my $data = shift || '';
my $prefix = shift || '';
$prefix .= ' ';
my @lines;
while (length $data) {
my $d = substr $data, 0, 16, '';
my $line = unpack 'H*', $d;
$line =~ s/([A-F0-9]{2})/$1 /ig;
$d =~ s/[^ -~]/./g;
$line = sprintf "%-48s %s", $line, $d;
push @lines, $line
}
scalar @lines ? "\n".$prefix.(join "\n".$prefix, @lines) : ''
}
sub connect_return_code_string {
[
'Connection Accepted',
'Connection Refused: unacceptable protocol version',
'Connection Refused: identifier rejected',
'Connection Refused: server unavailable',
'Connection Refused: bad user name or password',
'Connection Refused: not authorized',
]->[$_[0]] || 'Reserved'
}
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Constants - Module to export constants for MQTT protocol
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
use Net::MQTT::Constants;
=head1 DESCRIPTION
Module to export constants for MQTT protocol.
=head1 C<FUNCTIONS>
=head2 C<decode_remaining_length( $data, \$offset )>
Calculates the C<remaining length> from the bytes in C<$data> starting
at the offset read from the scalar reference. The offset reference is
subsequently incremented by the number of bytes processed.
=head2 C<encode_remaining_length( $length )>
Calculates the C<remaining length> bytes from the length, C<$length>,
and returns the packed bytes as a string.
=head2 C<decode_byte( $data, \$offset )>
Returns a byte by unpacking it from C<$data> starting at the offset
read from the scalar reference. The offset reference is subsequently
incremented by the number of bytes processed.
=head2 C<encode_byte( $byte )>
Returns a packed byte.
=head2 C<decode_short( $data, \$offset )>
Returns a short (two bytes) by unpacking it from C<$data> starting at
the offset read from the scalar reference. The offset reference is
subsequently incremented by the number of bytes processed.
=head2 C<encode_short( $short )>
Returns a packed short (two bytes).
=head2 C<decode_string( $data, \$offset )>
Returns a string (short length followed by length bytes) by unpacking
it from C<$data> starting at the offset read from the scalar
reference. The offset reference is subsequently incremented by the
number of bytes processed.
=head2 C<encode_string( $string )>
Returns a packed string (length as a short and then the bytes of the
string).
=head2 C<qos_string( $qos_value )>
Returns a string describing the given QoS value.
=head2 C<message_type_string( $message_type_value )>
Returns a string describing the given C<message_type> value.
=head2 C<dump_string( $data )>
Returns a string representation of arbitrary data - as a string if it
contains only printable characters or as a hex dump otherwise.
=head2 C<connect_return_code_string( $return_code_value )>
Returns a string describing the given C<connect_return_code> value.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,246 @@
use strict;
use warnings;
package Net::MQTT::Message;
$Net::MQTT::Message::VERSION = '1.142010';
# ABSTRACT: Perl module to represent MQTT messages
use Net::MQTT::Constants qw/:all/;
use Module::Pluggable search_path => __PACKAGE__, require => 1;
our %types;
foreach (plugins()) {
my $m = $_.'::message_type';
next unless (defined &{$m}); # avoid super classes
my $t = $_->message_type;
if (exists $types{$t}) {
die 'Duplicate message_type number ', $t, ":\n",
' ', $_, " and\n",
' ', $types{$t}, "\n";
}
$types{$t} = $_;
}
sub new {
my ($pkg, %p) = @_;
my $type_pkg =
exists $types{$p{message_type}} ? $types{$p{message_type}} : $pkg;
bless { %p }, $type_pkg;
}
sub new_from_bytes {
my ($pkg, $bytes, $splice) = @_;
my %p;
return if (length $bytes < 2);
my $offset = 0;
my $b = decode_byte($bytes, \$offset);
$p{message_type} = ($b&0xf0) >> 4;
$p{dup} = ($b&0x8)>>3;
$p{qos} = ($b&0x6)>>1;
$p{retain} = ($b&0x1);
my $length;
eval {
$length = decode_remaining_length($bytes, \$offset);
};
return if ($@);
if (length $bytes < $offset+$length) {
return
}
substr $_[1], 0, $offset+$length, '' if ($splice);
$p{remaining} = substr $bytes, $offset, $length;
my $self = $pkg->new(%p);
$self->_parse_remaining();
$self;
}
sub _parse_remaining {
}
sub message_type { shift->{message_type} }
sub dup { shift->{dup} || 0 }
sub qos {
my $self = shift;
defined $self->{qos} ? $self->{qos} : $self->_default_qos
}
sub _default_qos {
MQTT_QOS_AT_MOST_ONCE
}
sub retain { shift->{retain} || 0 }
sub remaining { shift->{remaining} || '' }
sub _remaining_string {
my ($self, $prefix) = @_;
dump_string($self->remaining, $prefix);
}
sub _remaining_bytes { shift->remaining }
sub string {
my ($self, $prefix) = @_;
$prefix = '' unless (defined $prefix);
my @attr;
push @attr, qos_string($self->qos);
foreach (qw/dup retain/) {
my $bool = $self->$_;
push @attr, $_ if ($bool);
}
my $r = $self->_remaining_string($prefix);
$prefix.message_type_string($self->message_type).
'/'.(join ',', @attr).($r ? ' '.$r : '')
}
sub bytes {
my ($self) = shift;
my $o = '';
my $b =
($self->message_type << 4) | ($self->dup << 3) |
($self->qos << 1) | $self->retain;
$o .= encode_byte($b);
my $remaining = $self->_remaining_bytes;
$o .= encode_remaining_length(length $remaining);
$o .= $remaining;
$o;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message - Perl module to represent MQTT messages
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
use Net::MQTT::Constants;
use Net::MQTT::Message;
use IO::Socket::INET;
my $socket = IO::Socket::INET->new(PeerAddr => '127.0.0.1:1883');
my $mqtt = Net::MQTT::Message->new(message_type => MQTT_CONNECT);
print $socket $mqtt->bytes;
my $tcp_payload = pack 'H*', '300d000774657374696e6774657374';
$mqtt = Net::MQTT::Message->new_from_bytes($tcp_payload);
print 'Received: ', $mqtt->string, "\n";
=head1 DESCRIPTION
This module encapsulates a single MQTT message. It uses subclasses to
represent specific message types.
=head1 METHODS
=head2 C<new( %parameters )>
Constructs an L<Net::MQTT::Message> object based on the given
parameters. The common parameter keys are:
=over
=item C<message_type>
The message type field of the MQTT message. This should be an integer
between 0 and 15 inclusive. The module L<Net::MQTT::Constants>
provides constants that can be used for this value. This parameter
is required.
=item C<dup>
The duplicate flag field of the MQTT message. This should be either 1
or 0. The default is 0.
=item C<qos>
The QoS field of the MQTT message. This should be an integer between
0 and 3 inclusive. The default is as specified in the spec or 0 ("at
most once") otherwise. The module L<Net::MQTT::Constants> provides
constants that can be used for this value.
=item C<retain>
The retain flag field of the MQTT message. This should be either 1
or 0. The default is 0.
=back
The remaining keys are dependent on the specific message type. The
documentation for the subclasses for each message type list methods
with the same name as the required keys.
=head2 C<new_from_bytes( $packed_bytes, [ $splice ] )>
Attempts to constructs an L<Net::MQTT::Message> object based on
the given packed byte string. If there are insufficient bytes, then
undef is returned. If the splice parameter is provided and true, then
the processed bytes are removed from the scalar referenced by the
$packed_bytes parameter.
=head2 C<message_type()>
Returns the message type field of the MQTT message. The module
L<Net::MQTT::Constants> provides a function, C<message_type_string>,
that can be used to convert this value to a human readable string.
=head2 C<dup()>
The duplicate flag field of the MQTT message.
=head2 C<qos()>
The QoS field of the MQTT message. The module
L<Net::MQTT::Constants> provides a function, C<qos_string>, that
can be used to convert this value to a human readable string.
=head2 C<retain()>
The retain field of the MQTT message.
=head2 C<remaining()>
This contains a packed string of bytes with any of the payload of the
MQTT message that was not parsed by these modules. This should not
be required for packets that strictly follow the standard.
=head2 C<string([ $prefix ])>
Returns a summary of the message as a string suitable for logging.
If provided, each line will be prefixed by the optional prefix.
=head2 C<bytes()>
Returns the bytes of the message suitable for writing to a socket.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,92 @@
use strict;
use warnings;
package Net::MQTT::Message::ConnAck;
$Net::MQTT::Message::ConnAck::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT ConnAck message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
2
}
sub connack_reserved { shift->{connack_reserved} || 0 }
sub return_code { shift->{return_code} || MQTT_CONNECT_ACCEPTED }
sub _remaining_string {
my ($self, $prefix) = @_;
connect_return_code_string($self->return_code).
' '.$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{connack_reserved} = decode_byte($self->{remaining}, \$offset);
$self->{return_code} = decode_byte($self->{remaining}, \$offset);
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_byte($self->connack_reserved);
$o .= encode_byte($self->return_code);
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::ConnAck - Perl module to represent an MQTT ConnAck message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Connection Acknowledgement
message. It is a specific subclass used by L<Net::MQTT::Message>
and should not need to be instantiated directly.
=head1 METHODS
=head2 C<connack_reserved()>
Returns the reserved field of the MQTT Connection Acknowledgement
message.
=head2 C<return_code()>
Returns the return code field of the MQTT Connection Acknowledgement
message. The module L<Net::MQTT::Constants> provides a function,
C<connect_return_code_string>, that can be used to convert this value
to a human readable string.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,246 @@
use strict;
use warnings;
package Net::MQTT::Message::Connect;
$Net::MQTT::Message::Connect::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT Connect message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
1
}
sub protocol_name { shift->{protocol_name} || 'MQIsdp' }
sub protocol_version { shift->{protocol_version} || 3 }
sub user_name_flag {
my $self = shift;
$self->{user_name_flag} || defined $self->{user_name};
}
sub password_flag {
my $self = shift;
$self->{password_flag} || defined $self->{password};
}
sub will_retain { shift->{will_retain} || 0 }
sub will_qos { shift->{will_qos} || 0 }
sub will_flag {
my $self = shift;
$self->{will_flag} || defined $self->{will_topic}
}
sub clean_session {
my $self = shift;
defined $self->{clean_session} ? $self->{clean_session} : 1
}
sub connect_reserved_flag { shift->{connect_reserved_flag} || 0 }
sub keep_alive_timer { shift->{keep_alive_timer} || 60 }
sub client_id { shift->{client_id} || 'Net::MQTT::Message['.$$.']' }
sub will_topic { shift->{will_topic} }
sub will_message { shift->{will_message} }
sub user_name { shift->{user_name} }
sub password { shift->{password} }
sub _remaining_string {
my ($self, $prefix) = @_;
$self->protocol_name.'/'.$self->protocol_version.'/'.$self->client_id.
($self->user_name_flag ? ' user='.$self->user_name : '').
($self->password_flag ? ' pass='.$self->password : '').
($self->will_flag
? ' will='.$self->will_topic.',"'.$self->will_message.'",'.
$self->will_retain.','.qos_string($self->will_qos) : '').
' '.$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{protocol_name} = decode_string($self->{remaining}, \$offset);
$self->{protocol_version} = decode_byte($self->{remaining}, \$offset);
my $b = decode_byte($self->{remaining}, \$offset);
$self->{user_name_flag} = ($b&0x80) >> 7;
$self->{password_flag} = ($b&0x40) >> 6;
$self->{will_retain} = ($b&0x20) >> 5;
$self->{will_qos} = ($b&0x18) >> 3;
$self->{will_flag} = ($b&0x4) >> 2;
$self->{clean_session} = ($b&0x2) >> 1;
$self->{connect_reserved_flag} = $b&0x1;
$self->{keep_alive_timer} = decode_short($self->{remaining}, \$offset);
$self->{client_id} = decode_string($self->{remaining}, \$offset);
if ($self->will_flag) {
$self->{will_topic} = decode_string($self->{remaining}, \$offset);
$self->{will_message} = decode_string($self->{remaining}, \$offset);
}
if ($self->user_name_flag) {
$self->{user_name} = decode_string($self->{remaining}, \$offset);
}
if ($self->password_flag) {
$self->{password} = decode_string($self->{remaining}, \$offset);
}
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_string($self->protocol_name);
$o .= encode_byte($self->protocol_version);
$o .= encode_byte(
($self->user_name_flag << 7) |
($self->password_flag << 6) |
($self->will_retain << 5) | ($self->will_qos << 3) |
($self->will_flag << 2) |
($self->clean_session << 1) |
$self->connect_reserved_flag);
$o .= encode_short($self->keep_alive_timer);
$o .= encode_string($self->client_id);
$o .= encode_string($self->will_topic) if ($self->will_flag);
$o .= encode_string($self->will_message) if ($self->will_flag);
$o .= encode_string($self->user_name) if ($self->user_name_flag);
$o .= encode_string($self->password) if ($self->password_flag);
$o;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::Connect - Perl module to represent an MQTT Connect message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Connection Request message. It
is a specific subclass used by L<Net::MQTT::Message> and should
not need to be instantiated directly.
=head1 METHODS
=head2 C<protocol_name()>
Returns the protocol name field of the MQTT Connect message. The
default is 'C<MQIsdp>'.
=head2 C<protocol_version()>
Returns the protocol version field of the MQTT Connect message. The
default is 3.
=head2 C<user_name_flag()>
Returns the user name flag field of the MQTT Connect message. The
default is true if and only if a user name is defined.
=head2 C<password_flag()>
Returns the password flag field of the MQTT Connect message. The
default is true if and only if a password is defined.
=head2 C<will_retain()>
Returns the will retain field of the MQTT Connect message. The
default is 0.
=head2 C<will_qos()>
Returns the will QoS field of the MQTT Connect message. The default
is 0.
=head2 C<will_flag()>
Returns the will flag field of the MQTT Connect message. The
default is true if and only if a will topic is defined.
=head2 C<clean_session()>
Returns the clean session flag field of the MQTT Connect message. The
default is 1.
=head2 C<connect_reserved_flag()>
Returns the reserved flag field of the MQTT Connect message. The
default is 0.
=head2 C<keep_alive_timer()>
Returns the keep alive timer field of the MQTT Connect message. The
units are seconds. The default is 60.
=head2 C<client_id()>
Returns the client identifier field of the MQTT Connect message. The
default is 'C<Net::MQTT::Message[$$]>' where 'C<$$>' is the
current process id.
=head2 C<will_topic()>
Returns the will topic field of the MQTT Connect message. The default
is undefined.
=head2 C<will_message()>
Returns the will message field of the MQTT Connect message. The
default is undefined.
=head2 C<user_name()>
Returns the user name field of the MQTT Connect message. The default
is undefined.
=head2 C<password()>
Returns the password field of the MQTT Connect message. The default
is undefined.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,51 @@
use strict;
use warnings;
package Net::MQTT::Message::Disconnect;
$Net::MQTT::Message::Disconnect::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT Disconnect message
use base 'Net::MQTT::Message';
sub message_type {
14
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::Disconnect - Perl module to represent an MQTT Disconnect message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Disconnection Notification
message. It is a specific subclass used by L<Net::MQTT::Message>
and should not need to be instantiated directly.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,74 @@
use strict;
use warnings;
package Net::MQTT::Message::JustMessageId;
$Net::MQTT::Message::JustMessageId::VERSION = '1.142010';
# ABSTRACT: Perl module for an MQTT message w/message id only payload
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_id { shift->{message_id} }
sub _remaining_string {
my ($self, $prefix) = @_;
$self->message_id.' '.$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{message_id} = decode_short($self->{remaining}, \$offset);
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
encode_short($self->message_id)
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::JustMessageId - Perl module for an MQTT message w/message id only payload
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# abstract class not instantiated directly
=head1 DESCRIPTION
This module encapsulates a single MQTT message that has only a message id
in its payload. This is an abstract class used to implement a number
of other MQTT messages such as PubAck, PubComp, etc.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,51 @@
use strict;
use warnings;
package Net::MQTT::Message::PingReq;
$Net::MQTT::Message::PingReq::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PingReq message
use base 'Net::MQTT::Message';
sub message_type {
12
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PingReq - Perl module to represent an MQTT PingReq message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Ping Request message. It is a
specific subclass used by L<Net::MQTT::Message> and should not
need to be instantiated directly.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,51 @@
use strict;
use warnings;
package Net::MQTT::Message::PingResp;
$Net::MQTT::Message::PingResp::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PingResp message
use base 'Net::MQTT::Message';
sub message_type {
13
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PingResp - Perl module to represent an MQTT PingResp message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Ping Response message. It is a
specific subclass used by L<Net::MQTT::Message> and should not
need to be instantiated directly.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,59 @@
use strict;
use warnings;
package Net::MQTT::Message::PubAck;
$Net::MQTT::Message::PubAck::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PubAck message
use base 'Net::MQTT::Message::JustMessageId';
use Net::MQTT::Constants qw/:all/;
sub message_type {
4
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PubAck - Perl module to represent an MQTT PubAck message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Publish Acknowledgement
message. It is a specific subclass used by L<Net::MQTT::Message>
and should not need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Publish Acknowledgement message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,59 @@
use strict;
use warnings;
package Net::MQTT::Message::PubComp;
$Net::MQTT::Message::PubComp::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PubComp message
use base 'Net::MQTT::Message::JustMessageId';
use Net::MQTT::Constants qw/:all/;
sub message_type {
7
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PubComp - Perl module to represent an MQTT PubComp message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Publish Complete message. It
is a specific subclass used by L<Net::MQTT::Message> and should
not need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Publish Complete message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,59 @@
use strict;
use warnings;
package Net::MQTT::Message::PubRec;
$Net::MQTT::Message::PubRec::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PubRec message
use base 'Net::MQTT::Message::JustMessageId';
use Net::MQTT::Constants qw/:all/;
sub message_type {
5
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PubRec - Perl module to represent an MQTT PubRec message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Publish Received message. It
is a specific subclass used by L<Net::MQTT::Message> and should
not need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Publish Received message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,63 @@
use strict;
use warnings;
package Net::MQTT::Message::PubRel;
$Net::MQTT::Message::PubRel::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT PubRel message
use base 'Net::MQTT::Message::JustMessageId';
use Net::MQTT::Constants qw/:all/;
sub message_type {
6
}
sub _default_qos {
MQTT_QOS_AT_LEAST_ONCE
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::PubRel - Perl module to represent an MQTT PubRel message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Publish Release message. It is
a specific subclass used by L<Net::MQTT::Message> and should not
need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Publish Release message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,104 @@
use strict;
use warnings;
package Net::MQTT::Message::Publish;
$Net::MQTT::Message::Publish::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT Publish message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
3
}
sub topic { shift->{topic} }
sub message_id { shift->{message_id} }
sub message { shift->{message} }
sub _message_string { shift->{message} }
sub _remaining_string {
my $self = shift;
$self->topic.
($self->qos ? '/'.$self->message_id : '').
' '.dump_string($self->_message_string)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{topic} = decode_string($self->{remaining}, \$offset);
$self->{message_id} = decode_short($self->{remaining}, \$offset)
if ($self->qos);
$self->{message} = substr $self->{remaining}, $offset;
$self->{remaining} = '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_string($self->topic);
if ($self->qos) {
$o .= encode_short($self->message_id);
}
$o .= $self->message;
$o;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::Publish - Perl module to represent an MQTT Publish message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Publish message. It
is a specific subclass used by L<Net::MQTT::Message> and should
not need to be instantiated directly.
=head1 METHODS
=head2 C<topic()>
Returns the topic field of the MQTT Publish message.
=head2 C<message_id()>
Returns the message id field of the MQTT Publish message.
=head2 C<message()>
Returns the message field of the MQTT Publish message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,97 @@
use strict;
use warnings;
package Net::MQTT::Message::SubAck;
$Net::MQTT::Message::SubAck::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT SubAck message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
9
}
sub message_id { shift->{message_id} }
sub qos_levels { shift->{qos_levels} }
sub _remaining_string {
my ($self, $prefix) = @_;
$self->message_id.'/'.
(join ',', map { qos_string($_) } @{$self->qos_levels}).
' '.$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{message_id} = decode_short($self->{remaining}, \$offset);
while ($offset < length $self->{remaining}) {
push @{$self->{qos_levels}}, decode_byte($self->{remaining}, \$offset)&0x3;
}
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_short($self->message_id);
foreach my $qos (@{$self->qos_levels}) {
$o .= encode_byte($qos);
}
$o
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::SubAck - Perl module to represent an MQTT SubAck message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Subscription Acknowledgement
message. It is a specific subclass used by L<Net::MQTT::Message>
and should not need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Subscription Acknowledgement
message.
=head2 C<qos_levels()>
Returns the list of granted QoS fields of the MQTT Subscription
Acknowledgement message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,106 @@
use strict;
use warnings;
package Net::MQTT::Message::Subscribe;
$Net::MQTT::Message::Subscribe::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT Subscribe message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
8
}
sub _default_qos {
MQTT_QOS_AT_LEAST_ONCE
}
sub message_id { shift->{message_id} }
sub topics { shift->{topics} }
sub _topics_string {
join ',', map { $_->[0].'/'.qos_string($_->[1]) } @{shift->{topics}}
}
sub _remaining_string {
my ($self, $prefix) = @_;
$self->message_id.' '.$self->_topics_string.' '.
$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{message_id} = decode_short($self->{remaining}, \$offset);
while ($offset < length $self->{remaining}) {
push @{$self->{topics}}, [ decode_string($self->{remaining}, \$offset),
decode_byte($self->{remaining}, \$offset) ];
}
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_short($self->message_id);
foreach my $r (@{$self->topics}) {
my ($name, $qos) = @$r;
$o .= encode_string($name);
$o .= encode_byte($qos);
}
$o
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::Subscribe - Perl module to represent an MQTT Subscribe message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Subscribe message. It is a
specific subclass used by L<Net::MQTT::Message> and should not
need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Subscribe message.
=head2 C<topics()>
Returns the list of topics of the MQTT Subscribe message. Each
element of the list is a 2-ple containing the topic and its associated
requested QoS level.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,60 @@
use strict;
use warnings;
package Net::MQTT::Message::UnsubAck;
$Net::MQTT::Message::UnsubAck::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT UnsubAck message
use base 'Net::MQTT::Message::JustMessageId';
use Net::MQTT::Constants qw/:all/;
sub message_type {
11
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::UnsubAck - Perl module to represent an MQTT UnsubAck message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Unsubscribe Acknowledgement
message. It is a specific subclass used by L<Net::MQTT::Message>
and should not need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Unsubscribe Acknowledgement
message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,99 @@
use strict;
use warnings;
package Net::MQTT::Message::Unsubscribe;
$Net::MQTT::Message::Unsubscribe::VERSION = '1.142010';
# ABSTRACT: Perl module to represent an MQTT Unsubscribe message
use base 'Net::MQTT::Message';
use Net::MQTT::Constants qw/:all/;
sub message_type {
10
}
sub _default_qos {
MQTT_QOS_AT_LEAST_ONCE
}
sub message_id { shift->{message_id} }
sub topics { shift->{topics} }
sub _topics_string { join ',', @{shift->{topics}} }
sub _remaining_string {
my ($self, $prefix) = @_;
$self->message_id.' '.$self->_topics_string.' '.
$self->SUPER::_remaining_string($prefix)
}
sub _parse_remaining {
my $self = shift;
my $offset = 0;
$self->{message_id} = decode_short($self->{remaining}, \$offset);
while ($offset < length $self->{remaining}) {
push @{$self->{topics}}, decode_string($self->{remaining}, \$offset);
}
substr $self->{remaining}, 0, $offset, '';
}
sub _remaining_bytes {
my $self = shift;
my $o = encode_short($self->message_id);
foreach my $name (@{$self->topics}) {
$o .= encode_string($name);
}
$o
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::Message::Unsubscribe - Perl module to represent an MQTT Unsubscribe message
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
# instantiated by Net::MQTT::Message
=head1 DESCRIPTION
This module encapsulates a single MQTT Unsubscribe message. It is a
specific subclass used by L<Net::MQTT::Message> and should not
need to be instantiated directly.
=head1 METHODS
=head2 C<message_id()>
Returns the message id field of the MQTT Unsubscribe message.
=head2 C<topics()>
Returns the list of topics of the MQTT Subscribe message.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut

View File

@ -0,0 +1,116 @@
use strict;
use warnings;
package Net::MQTT::TopicStore;
$Net::MQTT::TopicStore::VERSION = '1.142010';
# ABSTRACT: Perl module to represent MQTT topic store
sub new {
my $pkg = shift;
my $self = bless { topics => { } }, $pkg;
$self->add($_) foreach (@_);
$self
}
sub add {
my ($self, $topic_pattern) = @_;
unless (exists $self->{topics}->{$topic_pattern}) {
$self->{topics}->{$topic_pattern} = _topic_to_regexp($topic_pattern);
}
$topic_pattern
}
sub delete {
my ($self, $topic) = @_;
delete $self->{topics}->{$topic};
}
sub values {
my ($self, $topic) = @_;
my @res = ();
foreach my $t (keys %{$self->{topics}}) {
my $re = $self->{topics}->{$t};
next unless (defined $re ? $topic =~ $re : $topic eq $t);
push @res, $t;
}
return \@res;
}
sub _topic_to_regexp {
my $topic = shift;
my $c;
$topic = quotemeta $topic;
$c += ($topic =~ s!\\/\\\+!\\/[^/]*!g);
$c += ($topic =~ s!\\/\\#$!(?:\$|/.*)!);
$c += ($topic =~ s!^\\\+\\/![^/]*\\/!g);
$c += ($topic =~ s!^\\\+$![^/]*!g);
$c += ($topic =~ s!^\\#$!.*!);
$topic .= '$' unless ($topic =~ m!\$$!);
unless ($c) {
return;
}
qr/^$topic/
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::MQTT::TopicStore - Perl module to represent MQTT topic store
=head1 VERSION
version 1.142010
=head1 SYNOPSIS
use Net::MQTT::TopicStore;
my $topic_store = Net::MQTT::TopicStore->new();
$topic_store->add($topic_pattern1);
$topic_store->add($topic_pattern2);
my @topics = @{ $topic->get($topic) };
$topic_store->remove($topic_pattern2);
=head1 DESCRIPTION
This module encapsulates a single MQTT topic store.
=head1 METHODS
=head2 C<new( )>
Constructs a L<Net::MQTT::TopicStore> object.
=head2 C<add( $topic_pattern )>
Adds the topic pattern to the store.
=head2 C<delete( $topic_pattern )>
Remove the topic pattern from the store.
=head2 C<values( $topic )>
Returns all the topic patterns in the store that apply to the given topic.
=head1 AUTHOR
Mark Hindess <soft-cpan@temporalanomaly.com>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Mark Hindess.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut