From 51a50b36a85b8e8a236f7e8222c9d62db0b9abba Mon Sep 17 00:00:00 2001 From: timmib <> Date: Thu, 10 Dec 2020 11:33:17 +0000 Subject: [PATCH] 93_InfluxDBLogger.pm: Added git-svn-id: https://svn.fhem.de/fhem/trunk@23322 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/FHEM/93_InfluxDBLogger.pm | 732 +++++++++++++++++++++++++++++++++ fhem/MAINTAINER.txt | 1 + 2 files changed, 733 insertions(+) create mode 100644 fhem/FHEM/93_InfluxDBLogger.pm diff --git a/fhem/FHEM/93_InfluxDBLogger.pm b/fhem/FHEM/93_InfluxDBLogger.pm new file mode 100644 index 000000000..17eba09a7 --- /dev/null +++ b/fhem/FHEM/93_InfluxDBLogger.pm @@ -0,0 +1,732 @@ +# $Id$ +# 93_InfluxDBLogger.pm +# +package main; + +use strict; +use warnings; +use HttpUtils; + +my $total_writes_name = "total_writes"; +my $succeeded_writes_name = "succeeded_writes"; +my $failed_writes_name = "failed_writes"; +my $dropped_writes_name = "dropped_writes"; +my $droppeed_writes_last_message_name = "dropped_writes_last_message"; +my $failed_writes_last_error_name = "failed_writes_last_error"; + +# FHEM Modulfunktionen + +sub InfluxDBLogger_Initialize($) { + my ($hash) = @_; + $hash->{DefFn} = "InfluxDBLogger_Define"; + $hash->{NotifyFn} = "InfluxDBLogger_Notify"; + $hash->{SetFn} = "InfluxDBLogger_Set"; + $hash->{RenameFn} = "InfluxDBLogger_Rename"; + $hash->{AttrList} = "disable:1,0 security:basic_auth,none,token username readingInclude readingExclude conversions deviceTagName measurement tags fields"; + Log3 undef, 2, "InfluxDBLogger: Initialized new"; +} + +sub InfluxDBLogger_Define($$) { + my ( $hash, $def ) = @_; + my $name = $hash->{NAME}; + + my @a = split("[ \t][ \t]*", $def); + + return "Usage: define devname InfluxDBLogger [http|https]://IP_or_Hostname:port dbname devspec" if (scalar(@a) < 4); + + $hash->{URL} = $a[2]; + $hash->{DATABASE} = $a[3]; + $hash->{NOTIFYDEV} = $a[4]; + + if ( "THIS_WONT_USUALY_MATCH" =~ /$hash->{NOTIFYDEV}/ && $init_done) { + $attr{$name}{disable} = "1"; + Log3 $name, 2, "InfluxDBLogger: [$name] You specified a very loose device spec. To avoid a lot of events in your influx database this module is disabled on default. You might want to use the readingRegEx-attribute and enable the device afterwards."; + } + + Log3 $name, 3, "InfluxDBLogger: [$name] defined with server ".$hash->{URL}." database ".$hash->{DATABASE}." notifydev ".$hash->{NOTIFYDEV}; +} + +sub InfluxDBLogger_Notify($$) +{ + my ($own_hash, $dev_hash) = @_; + my $name = $own_hash->{NAME}; # own name / hash + + return "" if(IsDisabled($name)); # Return without any further action if the module is disabled + return "" if(!$init_done); + + my $devName = $dev_hash->{NAME}; # Device that created the events + my $events = deviceEvents($dev_hash, 1); + + return "" if($devName eq "global" && grep(m/^INITIALIZED|REREADCFG$/, @{$events})); + return "" if($own_hash->{TYPE} eq $dev_hash->{TYPE}); # avoid endless loops from logger to logger + + Log3 $name, 4, "InfluxDBLogger: [$name] notified from device $devName"; + my %map = InfluxDBLogger_BuildMap($own_hash, $dev_hash, $events); + my @incompatible = (); + my ($data,$rows) = InfluxDBLogger_BuildData($own_hash,$dev_hash,\%map,\@incompatible); + + InfluxDBLogger_Send($own_hash,$data,$rows); + + if (scalar(@incompatible) > 0 ) { + InfluxDBLogger_DroppedIncompatibleValues($own_hash,$name,@incompatible); + } +} + +sub InfluxDBLogger_Send($$$) +{ + my ($own_hash, $data, $rows) = @_; + my $name = $own_hash->{NAME}; + + if ( $data ne "" ) { + my $total_writes = ReadingsVal($name,$total_writes_name,0); + $total_writes+=$rows; + my $rv = readingsSingleUpdate($own_hash, $total_writes_name, $total_writes, 1); + InfluxDBLogger_UpdateState($own_hash,$name); + + my $reqpar = { + url => InfluxDBLogger_BuildUrl($own_hash), + method => "POST", + data => $data, + hideurl => 1, + hash => $own_hash, + callback => \&InfluxDBLogger_HttpCallback + }; + InfluxDBLogger_AddSecurity($own_hash,$name,$reqpar); + + Log3 $name, 4, "InfluxDBLogger: [$name] Sending data ".$reqpar->{data}." to ".$reqpar->{url}; + HttpUtils_NonblockingGet($reqpar); + } +} + +sub InfluxDBLogger_AddSecurity($$$) +{ + my ($own_hash, $name, $reqpar) = @_; + + my $security = AttrVal($name, "security", ""); + if ( $security eq "basic_auth" ) + { + my $user = AttrVal($name, "username", undef); + $reqpar->{user} = $user; + $reqpar->{pwd} = InfluxDBLogger_GetPassword($own_hash,$name); + } + elsif ( $security eq "token") + { + my $token = InfluxDBLogger_ReadSecret($own_hash,$name,"token"); + $reqpar->{header} = { "Authorization" => "Token " . $token}; + } +} + +sub InfluxDBLogger_BuildMap($$$) +{ + my ($own_hash, $dev_hash, $events) = @_; + my $name = $own_hash->{NAME}; + my $devName = $dev_hash->{NAME}; + my %map = (); + + foreach my $event (@{$events}) { + $event = "" if(!defined($event)); + my $readingInclude = AttrVal($name, "readingInclude", undef); + my $readingExclude = AttrVal($name, "readingExclude", undef); + if ( (!defined($readingInclude) || $event =~ /$readingInclude/) && (!defined($readingExclude) || !($event =~ /$readingExclude/)) ) { + Log3 $name, 4, "InfluxDBLogger: [$name] notified from device $devName about $event"; + InfluxDBLogger_Map($own_hash, $dev_hash, $event, \%map); + } + } + + return %map; +} + +sub InfluxDBLogger_BuildData($$$$) +{ + my ($own_hash, $dev_hash, $map, $incompatible) = @_; + my $name = $own_hash->{NAME}; + my $data = ""; + + my $rows = 0; + my %m = %{$map}; + my %measuremnts = (); + foreach my $device (keys %m) { + my $readings = $m{$device}; + my %r = %{$readings}; + foreach my $reading (keys %r) { + my $value_map = $r{$reading}; + my $value = $value_map->{"value"}; + my $numeric = $value_map->{"numeric"}; + if ($numeric) { + my ($measurementAndTagSet,$fieldset) = InfluxDBLogger_BuildDataDynamic($own_hash, $dev_hash, $device, $reading, $value); + if (defined $measuremnts{$measurementAndTagSet}) { + $measuremnts{$measurementAndTagSet} .= "," . $fieldset; + } else { + $measuremnts{$measurementAndTagSet} = $fieldset; + } + $rows++; + } + else { + push(@{$incompatible}, $device ." ". $reading . " " . $value); + } + } + } + foreach my $measurementAndTagSet ( keys %measuremnts ) { + $data .= $measurementAndTagSet . " " . $measuremnts{$measurementAndTagSet} . "\n"; + } + + return $data, $rows; +} + +sub InfluxDBLogger_BuildDataDynamic($$$$$) +{ + my ($hash, $dev_hash, $device, $reading, $value) = @_; + my $name = $hash->{NAME}; + + my $measurement = InfluxDBLogger_GetMeasurement($hash, $dev_hash, $device, $reading, $value); + my $tag_set = InfluxDBLogger_GetTagSet($hash, $dev_hash, $device, $reading, $value); + my $field_set = InfluxDBLogger_GetFieldSet($hash, $dev_hash, $device, $reading, $value); + + my $measurementAndTagSet = defined $tag_set ? $measurement . "," . $tag_set : $measurement; + return ($measurementAndTagSet,$field_set); +} + +sub InfluxDBLogger_GetMeasurement($$$) +{ + my ($hash, $dev_hash, $device, $reading, $value) = @_; + my $name = $hash->{NAME}; + + my $measurement = AttrVal($name, "measurement", $reading); + $measurement =~ s/\$DEVICE/$device/ei; + $measurement =~ s/\$READINGNAME/$reading/ei; + + return $measurement; +} + +sub InfluxDBLogger_GetTagSet($$$) +{ + my ($hash, $dev_hash, $device, $reading, $value) = @_; + my $name = $hash->{NAME}; + my $tags_set = AttrVal($name, "tags", undef); + if (defined $tags_set) { + if ( $tags_set eq "-" ) { + $tags_set = undef; + } else { + $tags_set =~ s/\{(.*)\}/eval($1) /ei; + $tags_set =~ s/\$DEVICE/$device /ei; + } + } else { + $tags_set = AttrVal($name, "deviceTagName", "site_name")."=".$device; + } + + return $tags_set; +} + +sub InfluxDBLogger_GetFieldSet($$$) +{ + my ($hash, $dev_hash, $device, $reading, $value) = @_; + my $name = $hash->{NAME}; + + my $field_set = AttrVal($name, "fields", "value=\$READINGVALUE"); + $field_set =~ s/\$READINGNAME/$reading/ei; + $field_set =~ s/\$READINGVALUE/$value/ei; + + return $field_set; +} + +sub InfluxDBLogger_BuildDataClassic($$$) +{ + my ($hash, $dev_hash, $device, $reading, $value) = @_; + my $name = $hash->{NAME}; + + my $measurement = $reading; + my $tag_set = AttrVal($name, "deviceTagName", "site_name")."=".$device; + my $field_set = "value=".$value; + + return "$measurement,$tag_set $field_set"; +} + +sub InfluxDBLogger_BuildUrl($) +{ + my ($hash) = @_; + return $hash->{URL}."/write?db=".urlEncode($hash->{DATABASE}); +} + +sub InfluxDBLogger_Map($$$$) +{ + my ($hash, $dev_hash, $event, $map) = @_; + my $name = $hash->{NAME}; + my $deviceName = $dev_hash->{NAME}; + my @readingAndValue = split(":[ \t]*", $event); + my $readingName = $readingAndValue[0]; + my $readingValue = $readingAndValue[1]; + + my $conversions = AttrVal($name, "conversions", undef); + if ( defined($conversions)) { + my @conversions = split(",", $conversions); + foreach ( @conversions ) { + my @ab = split("=", $_); + $readingValue =~ s/$ab[0]/eval($ab[1])/ei; + } + } + + $map->{$deviceName}->{$readingName}->{"value"} = $readingValue; + $map->{$deviceName}->{$readingName}->{"numeric"} = $readingValue =~ /^[0-9,.E-]+$/; +} +sub InfluxDBLogger_HttpCallback($$$) +{ + my ($param, $err, $data) = @_; + my $hash = $param->{hash}; + my $name = $hash->{NAME}; + + if($err ne "") { + InfluxDBLogger_HttpCallback_Error($hash,$name,$err); + } + else { + my $header = $param->{httpheader}; + my $influx_db_error = undef; + my $http_error = undef; + while ($header =~ /X-Influxdb-Error:\s*(.*+)/g) { + $influx_db_error = $1; + } + while ($header =~ /HTTP\/1\.0\s*([4|5]\d\d\s*.*+)/g) { + $http_error = $1; + } + if ( defined($influx_db_error) ) + { + InfluxDBLogger_HttpCallback_Error($hash,$name,$influx_db_error); + } + elsif ( defined($http_error) ) + { + InfluxDBLogger_HttpCallback_Error($hash,$name,$http_error); + } + else + { + my $succeeded_writes = ReadingsVal($name,$succeeded_writes_name,0); + $succeeded_writes++; + my $rv = readingsSingleUpdate($hash, $succeeded_writes_name, $succeeded_writes, 1); + Log3 $name, 4, "InfluxDBLogger: [$name] HTTP Succeeded ".$rv; + } + } + InfluxDBLogger_UpdateState($hash,$name); +} +sub InfluxDBLogger_UpdateState($$) +{ + my ($hash, $name) = @_; + my $new_state = "Statistics: t=".ReadingsVal($name,$total_writes_name,0)." s=".ReadingsVal($name,$succeeded_writes_name,0)." f=".ReadingsVal($name,$failed_writes_name,0); + readingsSingleUpdate($hash,"state",$new_state,1); +} + + +sub InfluxDBLogger_HttpCallback_Error($$$) +{ + my ($hash, $name, $err) = @_; + + + Log3 $name, 1,"InfluxDBLogger: [$name] Error = $err"; + readingsBeginUpdate($hash); + readingsBulkUpdate($hash, $failed_writes_last_error_name, $err); + + my $failed_writes = ReadingsVal($name,$failed_writes_name,0); + $failed_writes++; + readingsBulkUpdate($hash, $failed_writes_name, $failed_writes); + + readingsEndUpdate($hash, 1); +} + +sub InfluxDBLogger_DroppedIncompatibleValues($$@) +{ + my ($hash, $name, @warnings) = @_; + + readingsBeginUpdate($hash); + my $dropped_writes = ReadingsVal($name,$dropped_writes_name,0); + my $warn = ""; + foreach (@warnings) { + $warn = $_; + Log3 $name, 4, "InfluxDBLogger: [$name] Warning, incompatible non numeric value: $warn"; + } + readingsBulkUpdate($hash, $droppeed_writes_last_message_name, $warn); + + $dropped_writes+=scalar(@warnings); + readingsBulkUpdate($hash, $dropped_writes_name, $dropped_writes); + + readingsEndUpdate($hash, 1); +} + +sub InfluxDBLogger_Set($$@) +{ + my ( $hash, $name, $cmd, @args ) = @_; + Log3 $name, 5, "InfluxDBLogger: [$name] set $cmd"; + + if ( lc $cmd eq 'password' ) { + my $pwd = $args[0]; + InfluxDBLogger_StoreSecret($hash, $name,"passwd", $pwd); + return (undef,1); + } + elsif ( lc $cmd eq 'token' ) { + my $token = $args[0]; + InfluxDBLogger_StoreSecret($hash, $name,"token", $token); + return (undef,1); + } + elsif ( lc $cmd eq 'resetstatistics' ) { + InfluxDBLogger_ResetStatistics($hash, $name); + return (undef,1); + } + else { + return "Unknown argument $cmd, choose one of resetStatistics:noArg password token"; + } +} + +sub InfluxDBLogger_ResetStatistics($) +{ + my ( $hash, $name ) = @_; + readingsBeginUpdate($hash); + readingsBulkUpdate($hash, $total_writes_name, 0); + readingsBulkUpdate($hash, $succeeded_writes_name, 0); + readingsBulkUpdate($hash, $failed_writes_name, 0); + readingsBulkUpdate($hash, $dropped_writes_name, 0); + readingsBulkUpdate($hash, $droppeed_writes_last_message_name, ""); + readingsBulkUpdate($hash, $failed_writes_last_error_name, ""); + readingsEndUpdate($hash, 1); + + InfluxDBLogger_UpdateState($hash, $name); +} + +sub InfluxDBLogger_IsBasicAuth($) +{ + my $name = shift; + return AttrVal($name, "security", "") eq "basic_auth"; +} + +sub InfluxDBLogger_GetPassword() +{ + my $hash = shift; + my $name = shift; + + if( InfluxDBLogger_IsBasicAuth($name) ) + { + return InfluxDBLogger_ReadSecret($hash,$name,"passwd"); + } + else + { + return undef; + } +} + +sub InfluxDBLogger_StoreSecret { + my $hash = shift; + my $name = shift; + my $ref = shift; + my $password = shift; + + my $index = $hash->{TYPE} . "_" . $name . "_" . $ref; + my $key = getUniqueId() . $index; + my $enc_pwd = ""; + + $hash->{$ref} = undef; + + if ( eval "use Digest::MD5;1" ) { + $key = Digest::MD5::md5_hex( unpack "H*", $key ); + $key .= Digest::MD5::md5_hex($key); + } + + for my $char ( split //, $password ) { + my $encode = chop($key); + $enc_pwd .= sprintf( "%.2x", ord($char) ^ ord($encode) ); + $key = $encode . $key; + } + Log3 $name, 5, "InfluxDBLogger: [$name] storing new $ref"; + my $err = setKeyValue( $index, $enc_pwd ); + return "error while saving the $ref - $err" if ( defined($err) ); + + $hash->{$ref} = "saved"; + + return "$ref successfully saved"; +} + +sub InfluxDBLogger_ReadSecret { + my $hash = shift; + my $name = shift; + my $ref = shift; + + my $index = $hash->{TYPE} . "_" . $name . "_" . $ref; + my $key = getUniqueId() . $index; + my ( $password, $err ); + + Log3 $name, 4, "InfluxDBLogger [$name] - Read $ref from file"; + + ( $err, $password ) = getKeyValue($index); + + if ( defined($err) ) { + + Log3 $name, 3, "InfluxDBLogger [$name] - unable to read $ref from file: $err"; + return undef; + + } + + if ( defined($password) ) { + if ( eval "use Digest::MD5;1" ) { + + $key = Digest::MD5::md5_hex( unpack "H*", $key ); + $key .= Digest::MD5::md5_hex($key); + } + + my $dec_pwd = ''; + + for my $char ( map { pack( 'C', hex($_) ) } ( $password =~ /(..)/g ) ) { + + my $decode = chop($key); + $dec_pwd .= chr( ord($char) ^ ord($decode) ); + $key = $decode . $key; + } + + return $dec_pwd; + + } + else { + + Log3 $name, 3, "InfluxDBLogger [$name] - No $ref in file"; + return undef; + } + + return; +} + +sub InfluxDBLogger_Rename($$) { + my ( $new_name, $old_name) = @_; + + my $hash = $defs{$new_name}; + + InfluxDBLogger_StorePassword( $hash, $new_name, InfluxDBLogger_ReadPassword( $hash, $old_name ) ); + setKeyValue( $hash->{TYPE} . "_" . $old_name . "_passwd", undef ); + + return; +} + +# Eval-Rückgabewert für erfolgreiches +# Laden des Moduls +1; + + +# Beginn der Commandref + +=pod +=item helper +=item summary Logs numeric readings into InfluxDB time-series databases +=item summary_DE Schreibt numerische Readings in eine InfluxDB Zeitreihendatenbank + +=begin html + + +

InfluxDBLogger

+ + +=end html + +=begin html_DE + + +

InfluxDBLogger

+ + +=end html_DE + +# Ende der Commandref +=cut \ No newline at end of file diff --git a/fhem/MAINTAINER.txt b/fhem/MAINTAINER.txt index 24b119a36..1cc7c9d55 100644 --- a/fhem/MAINTAINER.txt +++ b/fhem/MAINTAINER.txt @@ -441,6 +441,7 @@ FHEM/93_DbLog.pm DS_Starter Automatisierung FHEM/93_DbRep.pm DS_Starter Sonstiges FHEM/93_FHEM2FHEM.pm rudolfkoenig Automatisierung FHEM/93_Log2Syslog.pm DS_Starter Automatisierung +FHEM/93_InfluxDBLogger.pm timmib Unterstützende Dienste FHEM/93_PWMR.pm jamesgo Heizungssteuerung/Raumklima FHEM/94_PWM.pm jamesgo Heizungssteuerung/Raumklima FHEM/95_Alarm.pm pahenning Unterstützende Dienste