From 0fba7fe4accb49ea682d7bff4b1a1d98ac8a9358 Mon Sep 17 00:00:00 2001 From: fhemzap <> Date: Mon, 30 Nov 2015 16:32:26 +0000 Subject: [PATCH] HMCCU: RPCQueue.pm substitutes File::Queue because of a bug git-svn-id: https://svn.fhem.de/fhem/trunk@10058 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/contrib/HMCCU/RPCQueue.pm | 186 +++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 fhem/contrib/HMCCU/RPCQueue.pm diff --git a/fhem/contrib/HMCCU/RPCQueue.pm b/fhem/contrib/HMCCU/RPCQueue.pm new file mode 100644 index 000000000..93acbdf8f --- /dev/null +++ b/fhem/contrib/HMCCU/RPCQueue.pm @@ -0,0 +1,186 @@ +package RPCQueue; + +use strict; +use IO::File; +use Fcntl 'SEEK_END', 'SEEK_SET', 'O_CREAT', 'O_RDWR'; +use Carp qw(carp croak); + +our $VERSION = '1.01'; + +sub new +{ + my $class = shift; + my $mi = $class . '->new()'; + + croak "$mi requires an even number of parameters" if (@_ & 1); + my %params = @_; + + # convert to lower case + my @keylist = keys %params; + foreach my $key (@keylist) { + my $val = $params{$key}; + delete $params{$key}; + $params{ lc($key) } = $val; + } + + croak "$mi needs an File parameter" unless exists $params{file}; + my $queue_file = delete $params{file}; + my $idx_file = $queue_file . '.idx'; + $queue_file .= '.dat'; + + my $self; + my $mode = delete $params{mode} || '0600'; + $self->{block_size} = delete $params{blocksize} || 64; + $self->{seperator} = delete $params{seperator} || "\n"; + $self->{sep_length} = length $self->{seperator}; + + croak "Seperator length cannot be greater than BlockSize" if ($self->{sep_length} > $self->{block_size}); + + $self->{queue_file} = $queue_file; + $self->{idx_file} = $idx_file; + + $self->{queue} = new IO::File $queue_file, O_CREAT | O_RDWR, $mode or croak $!; + $self->{idx} = new IO::File $idx_file, O_CREAT | O_RDWR, $mode or croak $!; + + ### Default ptr to 0, replace it with value in idx file if one exists + $self->{idx}->sysseek(0, SEEK_SET); + $self->{idx}->sysread($self->{ptr}, 1024); + $self->{ptr} = '0' unless $self->{ptr}; + + if($self->{ptr} > -s $queue_file) + { + carp "Ptr is greater than queue file size, resetting ptr to '0'"; + + $self->{idx}->truncate(0) or croak "Could not truncate idx: $!"; + $self->{idx}->sysseek(0, SEEK_SET); + $self->{idx}->syswrite('0') or croak "Could not syswrite to idx: $!"; + } + + bless $self, $class; + return $self; +} + +sub enq +{ + my ($self, $element) = @_; + + $self->{queue}->sysseek(0, SEEK_END); + + if(ref $element) + { + croak 'Cannot handle references'; + } + + if($element =~ s/$self->{seperator}//g) + { + carp "Removed illegal seperator(s) from $element"; + } + + $self->{queue}->syswrite("$element$self->{seperator}") or croak "Could not syswrite to queue: $!"; +} + +sub deq +{ + my $self = shift; + my $element; + + $self->{queue}->sysseek($self->{ptr}, SEEK_SET); + + my $i; + while($self->{queue}->sysread($_, $self->{block_size})) + { + + $i = index($_, $self->{seperator}); + if($i != -1) + { + $element .= substr($_, 0, $i); + $self->{ptr} += $i + $self->{sep_length}; + $self->{queue}->sysseek($self->{ptr}, SEEK_SET); + + last; + } + else + { + ## If seperator isn't found, go back 'sep_length' spaces to ensure we don't miss it between reads + $element .= substr($_, 0, -$self->{sep_length}, ''); + $self->{ptr} += $self->{block_size} - $self->{sep_length}; + $self->{queue}->sysseek($self->{ptr}, SEEK_SET); + } + } + + ## If queue seek pointer is at the EOF, truncate the queue file + if($self->{queue}->sysread($_, 1) == 0) + { + $self->{queue}->truncate(0) or croak "Could not truncate queue: $!"; + $self->{queue}->sysseek($self->{ptr} = 0, SEEK_SET); + } + + ## Set idx file contents to point to the current seek position in queue file + $self->{idx}->truncate(0) or croak "Could not truncate idx: $!"; + $self->{idx}->sysseek(0, SEEK_SET); + $self->{idx}->syswrite($self->{ptr}) or croak "Could not syswrite to idx: $!"; + + return $element; +} + +sub peek +{ + my ($self, $count) = @_; + croak "Invalid argument to peek ($count)" unless $count > 0; + + my $elements; + + $self->{queue}->sysseek($self->{ptr}, SEEK_SET); + + my (@items, $remainder); +GATHER: + while($self->{queue}->sysread($_, $self->{block_size})) + { + if(defined $remainder) + { + $_ = $remainder . $_; + } + + @items = split /$self->{seperator}/, $_, -1; + $remainder = pop @items; + + foreach (@items) + { + push @$elements, $_; + last GATHER if $count == @$elements; + } + } + + return $elements; +} + +sub reset +{ + my $self = shift; + + $self->{idx}->truncate(0) or croak "Could not truncate idx: $!"; + $self->{idx}->sysseek(0, SEEK_SET); + $self->{idx}->syswrite('0') or croak "Could not syswrite to idx: $!"; + + $self->{queue}->sysseek($self->{ptr} = 0, SEEK_SET); +} + +sub close +{ + my $self = shift; + + $self->{idx}->close(); + $self->{queue}->close(); +} + +sub delete +{ + my $self = shift; + + $self->close(); + + unlink $self->{queue_file}; + unlink $self->{idx_file}; +} + +1;