# $Id$
# 93_InfluxDBLogger.pm
#
package main;

use strict;
use warnings;

use HttpUtils;

# Names of the statistics readings this module maintains on its own device.
my $total_writes_name                = "total_writes";
my $total_events_name                = "total_events";
my $succeeded_writes_name            = "succeeded_writes";
my $failed_writes_name               = "failed_writes";
my $dropped_writes_name              = "dropped_writes";
my $dropped_writes_last_message_name = "dropped_writes_last_message";
my $failed_writes_last_error_name    = "failed_writes_last_error";

# FHEM module functions

# Registers the module's callback function names and its attribute list in
# the module hash handed in by the framework.
sub InfluxDBLogger_Initialize($) {
    my ($hash) = @_;

    $hash->{DefFn}    = "InfluxDBLogger_Define";
    $hash->{NotifyFn} = "InfluxDBLogger_Notify";
    $hash->{SetFn}    = "InfluxDBLogger_Set";
    $hash->{RenameFn} = "InfluxDBLogger_Rename";
    $hash->{AttrList} =
        "readingTimeStamps:1,0 stringValuesAllowed:1,0 disable:1,0 "
      . "security:basic_auth,none,token username "
      . "readingInclude readingExclude conversions "
      . "deviceTagName measurement tags fields "
      . "api:v1,v2 org precision:ms,s,us,ns "
      . $readingFnAttributes;
    Log3 undef, 2, "InfluxDBLogger: Initialized new";
}

# Handles "define <name> InfluxDBLogger <url> <dbname> <devspec>".
#
# Stores URL, database name and device spec in the device hash. When the
# device spec is loose enough to match an arbitrary string and the definition
# happens after startup ($init_done), the device is created disabled.
#
# Returns a usage string when arguments are missing, undef otherwise.
sub InfluxDBLogger_Define($$) {
    my ( $hash, $def ) = @_;
    my $name = $hash->{NAME};

    my @a = split( "[ \t][ \t]*", $def );

    # name, type, url, dbname and devspec are all required: $a[4] is used
    # unconditionally below, so fewer than five tokens is a usage error.
    return "Usage: define devname InfluxDBLogger [http|https]://IP_or_Hostname:port dbname devspec"
      if ( scalar(@a) < 5 );

    $hash->{URL}       = $a[2];
    $hash->{DATABASE}  = $a[3];
    $hash->{NOTIFYDEV} = $a[4];

    # A spec that matches this unlikely probe string is treated as "too loose".
    if ( "THIS_WONT_USUALY_MATCH" =~ /$hash->{NOTIFYDEV}/ && $init_done ) {
        $attr{$name}{disable} = "1";
        Log3 $name, 2, "InfluxDBLogger: [$name] You specified a very loose device spec. To avoid a lot of events in your influx database this module is disabled on default. You might want to use the readingRegEx-attribute and enable the device afterwards.";
    }

    Log3 $name, 3,
        "InfluxDBLogger: [$name] defined with server "
      . $hash->{URL}
      . " database "
      . $hash->{DATABASE}
      . " notifydev "
      . $hash->{NOTIFYDEV};

    return undef;
}

# Notification callback: forwards the events of $dev_hash to InfluxDB.
#
# Does nothing while the logger is disabled, before $init_done is set, for
# the "global" INITIALIZED / REREADCFG events, or when the event source has
# the same TYPE as this device.
sub InfluxDBLogger_Notify($$) {
    my ( $own_hash, $dev_hash ) = @_;
    my $name = $own_hash->{NAME};    # own name

    # Return without any further action if the module is disabled
    return "" if ( IsDisabled($name) );
    return "" if ( !$init_done );

    my $devName = $dev_hash->{NAME};    # Device that created the events
    my $events  = deviceEvents( $dev_hash, 1 );

    return "" if ( $devName eq "global" && grep( m/^INITIALIZED|REREADCFG$/, @{$events} ) );

    # avoid endless loops from logger to logger
    return "" if ( $own_hash->{TYPE} eq $dev_hash->{TYPE} );

    Log3 $name, 4, "InfluxDBLogger: [$name] notified from device $devName";
    return InfluxDBLogger_BuildAndSend( $own_hash, $dev_hash, $events );
}

# Converts a list of "reading: value" events of one device into line protocol
# data, sends it, and records any values that had to be dropped.
sub InfluxDBLogger_BuildAndSend($$$) {
    my ( $own_hash, $dev_hash, $events ) = @_;
    my $name = $own_hash->{NAME};    # own name

    my %map          = InfluxDBLogger_BuildMap( $own_hash, $dev_hash, $events );
    my @incompatible = ();
    my ( $data, $rows ) = InfluxDBLogger_BuildData( $own_hash, $dev_hash, \%map, \@incompatible );

    InfluxDBLogger_Send( $own_hash, $data, $rows );

    if ( scalar(@incompatible) > 0 ) {
        InfluxDBLogger_DroppedIncompatibleValues( $own_hash, $name, @incompatible );
    }
}

# Handler for "set <name> write": sends the current value of every reading of
# every device matched by the configured device spec.
sub InfluxDBLogger_Write($$) {
    my ( $hash, $name ) = @_;

    # Return without any further action if the module is disabled
    return "" if ( IsDisabled($name) );

    my @devices = devspec2array( $hash->{NOTIFYDEV} );
    foreach my $deviceName (@devices) {
        Log3 $name, 4, "DEVNAME $deviceName";

        # Skip names that have no entry in %defs; dereferencing such an entry
        # would only autovivify an empty hash and emit warnings.
        my $dev_hash = $defs{$deviceName};
        next if ( !defined($dev_hash) );
        Log3 $name, 4, "DEVHASH $dev_hash";

        my $readings = $dev_hash->{READINGS};
        Log3 $name, 4, "BEFORE READING $readings";

        # Synthesize one "reading: value" event per reading.
        my @events = ();
        foreach my $key ( keys %{$readings} ) {
            Log3 $name, 4, "READING $key";
            my $value = ReadingsVal( $deviceName, $key, undef );
            push( @events, $key . ": " . $value );
        }

        InfluxDBLogger_BuildAndSend( $hash, $dev_hash, \@events );
    }
}

# Posts $data to the write URL without blocking and updates the write/event
# counters. Nothing happens when $data is empty.
sub InfluxDBLogger_Send($$$) {
    my ( $own_hash, $data, $rows ) = @_;
    my $name = $own_hash->{NAME};

    return if ( $data eq "" );

    my $total_writes = ReadingsVal( $name, $total_writes_name, 0 );
    my $total_events = ReadingsVal( $name, $total_events_name, 0 );
    $total_writes += 1;
    $total_events += $rows;

    readingsBeginUpdate($own_hash);
    readingsBulkUpdate( $own_hash, $total_writes_name, $total_writes );
    readingsBulkUpdate( $own_hash, $total_events_name, $total_events );
    readingsBulkUpdate( $own_hash, "state", InfluxDBLogger_BuildState( $own_hash, $name ) );
    readingsEndUpdate( $own_hash, 1 );

    my $reqpar = {
        url      => InfluxDBLogger_BuildUrl($own_hash),
        method   => "POST",
        data     => $data,
        hideurl  => 1,
        hash     => $own_hash,
        callback => \&InfluxDBLogger_HttpCallback
    };

    InfluxDBLogger_AddSecurity( $own_hash, $name, $reqpar );

    Log3 $name, 4, "InfluxDBLogger: [$name] Sending data " . $reqpar->{data} . " to " . $reqpar->{url};
    HttpUtils_NonblockingGet($reqpar);
}

# Adds credentials to the request parameters according to the "security"
# attribute: user/pwd for "basic_auth", an Authorization header for "token".
sub InfluxDBLogger_AddSecurity($$$) {
    my ( $own_hash, $name, $reqpar ) = @_;
    my $security = AttrVal( $name, "security", "" );

    if ( $security eq "basic_auth" ) {
        my $user = AttrVal( $name, "username", undef );
        $reqpar->{user} = $user;
        $reqpar->{pwd}  = InfluxDBLogger_GetPassword( $own_hash, $name );
    }
    elsif ( $security eq "token" ) {
        my $token = InfluxDBLogger_ReadSecret( $own_hash, $name, "token" );
        $reqpar->{header} = { "Authorization" => "Token " . $token };
    }
}

# Filters the events through the readingInclude / readingExclude regular
# expressions and returns a nested hash
#   device name -> reading name -> { value, numeric }.
sub InfluxDBLogger_BuildMap($$$) {
    my ( $own_hash, $dev_hash, $events ) = @_;
    my $name    = $own_hash->{NAME};
    my $devName = $dev_hash->{NAME};
    my %map     = ();

    my $readingInclude = AttrVal( $name, "readingInclude", undef );
    my $readingExclude = AttrVal( $name, "readingExclude", undef );

    foreach my $event ( @{$events} ) {
        $event = "" if ( !defined($event) );

        my $included = !defined($readingInclude) || $event =~ /$readingInclude/;
        my $excluded = defined($readingExclude) && $event =~ /$readingExclude/;

        if ( $included && !$excluded ) {
            Log3 $name, 4, "InfluxDBLogger: [$name] notified from device $devName about $event";
            InfluxDBLogger_Map( $own_hash, $dev_hash, $event, \%map );
        }
    }

    return %map;
}

# Renders the map built by InfluxDBLogger_BuildMap as line protocol text.
#
# With readingTimeStamps set, every reading becomes its own line carrying the
# reading's timestamp. Otherwise all field sets that share the same
# measurement and tag set are merged into one line.
# Non-numeric values are appended to @{$incompatible} unless the
# stringValuesAllowed attribute is set.
#
# Returns ($data, $rows), $rows being the number of readings written.
sub InfluxDBLogger_BuildData($$$$) {
    my ( $own_hash, $dev_hash, $map, $incompatible ) = @_;
    my $name = $own_hash->{NAME};

    my $stringValuesAllowed = AttrVal( $name, "stringValuesAllowed", 0 );
    my $readingTimeStamps   = AttrVal( $name, "readingTimeStamps",   0 );

    my $data         = "";
    my $rows         = 0;
    my %fieldsets_of = ();    # measurement+tag set -> merged field sets

    foreach my $device ( keys %{$map} ) {
        my $readings = $map->{$device};

        foreach my $reading ( keys %{$readings} ) {
            my $value_map = $readings->{$reading};
            my $value     = $value_map->{"value"};
            my $numeric   = $value_map->{"numeric"};

            if ( !$numeric && !$stringValuesAllowed ) {
                push( @{$incompatible}, $device . " " . $reading . " " . $value );
                next;
            }

            my ( $measurementAndTagSet, $fieldset, $timestamp ) =
              InfluxDBLogger_BuildDataDynamic( $own_hash, $dev_hash, $device, $reading, $value, $numeric );
            $rows++;

            if ($readingTimeStamps) {
                $data .= $measurementAndTagSet . " " . $fieldset;

                # The timestamp is padded to nanoseconds.
                # NOTE(review): this does not take the "precision" attribute
                # into account - confirm the two are not combined.
                if ( defined($timestamp) ) {
                    $data .= " " . $timestamp . "000000000";
                }
                $data .= "\n";
            }
            elsif ( defined $fieldsets_of{$measurementAndTagSet} ) {
                $fieldsets_of{$measurementAndTagSet} .= "," . $fieldset;
            }
            else {
                $fieldsets_of{$measurementAndTagSet} = $fieldset;
            }
        }
    }

    # Only populated when readingTimeStamps is off.
    foreach my $measurementAndTagSet ( keys %fieldsets_of ) {
        $data .= $measurementAndTagSet . " " . $fieldsets_of{$measurementAndTagSet} . "\n";
    }

    return ( $data, $rows );
}

# Builds the parts of one line for a single reading.
# Returns ("measurement[,tag_set]", field_set, timestamp-or-undef).
sub InfluxDBLogger_BuildDataDynamic($$$$$$) {
    my ( $hash, $dev_hash, $device, $reading, $value, $numeric ) = @_;

    my $measurement = InfluxDBLogger_GetMeasurement( $hash, $dev_hash, $device, $reading, $value );
    my $tag_set     = InfluxDBLogger_GetTagSet( $hash, $dev_hash, $device, $reading, $value );
    my $field_set   = InfluxDBLogger_GetFieldSet( $hash, $dev_hash, $device, $reading, $value, $numeric );
    my $timestamp   = InfluxDBLogger_GetTimeStamp( $hash, $dev_hash, $device, $reading, $value );

    my $measurementAndTagSet = $measurement;
    if ( defined $tag_set ) {
        $measurementAndTagSet .= "," . $tag_set;
    }

    return ( $measurementAndTagSet, $field_set, $timestamp );
}

# Returns the measurement name: the "measurement" attribute with an optional
# {perl expression} evaluated and $DEVICE / $READINGNAME substituted, or the
# reading name when the attribute is not set.
sub InfluxDBLogger_GetMeasurement($$$$$) {
    my ( $hash, $dev_hash, $device, $reading, $value ) = @_;
    my $name = $hash->{NAME};

    my $measurement = AttrVal( $name, "measurement", undef );
    return $reading if ( !defined $measurement );

    # NOTE(review): the attribute text is eval'ed as Perl code; this relies on
    # attributes being settable by trusted users only.
    $measurement =~ s/\{(.*)\}/eval($1)/ei;
    $measurement =~ s/\$DEVICE/$device/ei;
    $measurement =~ s/\$READINGNAME/$reading/ei;

    return $measurement;
}

# Returns the tag set: undef when the "tags" attribute is "-", the processed
# "tags" attribute when set, otherwise "<deviceTagName>=<device>" with
# deviceTagName defaulting to "site_name".
sub InfluxDBLogger_GetTagSet($$$$$) {
    my ( $hash, $dev_hash, $device, $reading, $value ) = @_;
    my $name = $hash->{NAME};

    my $tags_set = AttrVal( $name, "tags", undef );

    if ( !defined $tags_set ) {
        return AttrVal( $name, "deviceTagName", "site_name" ) . "=" . $device;
    }

    # "-" explicitly requests a line without any tags.
    return undef if ( $tags_set eq "-" );

    # NOTE(review): the attribute text is eval'ed as Perl code; this relies on
    # attributes being settable by trusted users only.
    $tags_set =~ s/\{(.*)\}/eval($1)/ei;
    $tags_set =~ s/\$DEVICE/$device/ei;

    return $tags_set;
}

# Returns the field set built from the "fields" attribute (default
# "value=$READINGVALUE"). Non-numeric values are wrapped in double quotes.
sub InfluxDBLogger_GetFieldSet($$$$$$) {
    my ( $hash, $dev_hash, $device, $reading, $value, $numeric ) = @_;
    my $name = $hash->{NAME};

    my $field_set = AttrVal( $name, "fields", "value=\$READINGVALUE" );
    $field_set =~ s/\$READINGNAME/$reading/ei;

    if ( !$numeric ) {
        $value = "\"" . $value . "\"";
    }
    $field_set =~ s/\$READINGVALUE/$value/ei;

    return $field_set;
}

# Returns the numeric timestamp of the reading when the readingTimeStamps
# attribute is set and the reading has a timestamp, undef otherwise.
sub InfluxDBLogger_GetTimeStamp($$$$$) {
    my ( $hash, $dev_hash, $device, $reading, $value ) = @_;
    my $name = $hash->{NAME};

    my $readingTimeStamps = AttrVal( $name, "readingTimeStamps", 0 );
    my $timeStamp         = undef;

    if ($readingTimeStamps) {
        my $readingsTimestamp = ReadingsTimestamp( $device, $reading, undef );
        if ( defined($readingsTimestamp) ) {
            my $readingsTimestampNum = time_str2num($readingsTimestamp);

            # ?? $timeStamp= $readingsTimestampNum+-2208992400
            $timeStamp = $readingsTimestampNum;
        }
    }

    return $timeStamp;
}

# Builds the write URL for the configured API version ("v1" by default) and
# appends the optional precision parameter. Returns undef for an unsupported
# "api" attribute value.
sub InfluxDBLogger_BuildUrl($) {
    my ($hash) = @_;
    my $name   = $hash->{NAME};
    my $url    = $hash->{URL};
    my $api    = AttrVal( $name, "api", "v1" );

    if ( $api eq "v1" ) {
        $url .= "/write?db=" . urlEncode( $hash->{DATABASE} );
    }
    elsif ( $api eq "v2" ) {
        my $org    = AttrVal( $name, "org", "privat" );
        my $bucket = $hash->{DATABASE};
        $url .= "/api/v2/write?org=" . urlEncode($org) . "&bucket=" . urlEncode($bucket);
    }
    else {
        Log3 $name, 1, "InfluxDBLogger: [$name] unsupported api";
        $url = undef;
    }

    my $precision = AttrVal( $name, "precision", undef );
    if ( defined($url) && defined($precision) ) {
        $url .= "&precision=" . $precision;
    }

    return $url;
}

# Splits one "reading: value" event, applies the comma-separated
# "regex=perl expression" pairs of the "conversions" attribute to the value
# and stores value plus a numeric flag in
# $map->{device}->{reading}.
sub InfluxDBLogger_Map($$$$) {
    my ( $hash, $dev_hash, $event, $map ) = @_;
    my $name       = $hash->{NAME};
    my $deviceName = $dev_hash->{NAME};

    my ( $readingName, $readingValue ) = split( ":[ \t]*", $event, 2 );

    # Events without a "reading: value" separator carry no value. Treat it as
    # an empty string so the code below does not operate on undef.
    $readingValue = "" if ( !defined($readingValue) );

    my $conversions = AttrVal( $name, "conversions", undef );
    if ( defined($conversions) ) {
        foreach my $conversion ( split( ",", $conversions ) ) {
            my @ab = split( "=", $conversion );

            # NOTE(review): the right-hand side is eval'ed as Perl code; this
            # relies on attributes being settable by trusted users only.
            $readingValue =~ s/$ab[0]/eval($ab[1])/ei;
        }
    }

    $map->{$deviceName}->{$readingName}->{"value"} = $readingValue;
    $map->{$deviceName}->{$readingName}->{"numeric"} =
      $readingValue =~ /^[-+]?[0-9]*[\.\,]?[0-9]+([eE][-+]?[0-9]+)?$/;
}

# Callback of the non-blocking HTTP request started in InfluxDBLogger_Send.
#
# Counts the write as failed when the transport reports an error, when the
# response carries an X-Influxdb-Error header, or when the status line shows
# a 4xx/5xx code; otherwise it is counted as succeeded. The state reading is
# refreshed in every case.
sub InfluxDBLogger_HttpCallback($$$) {
    my ( $param, $err, $data ) = @_;
    my $hash = $param->{hash};
    my $name = $hash->{NAME};

    if ( $err ne "" ) {
        InfluxDBLogger_HttpCallback_Error( $hash, $name, $err );
    }
    else {
        my $header = $param->{httpheader};
        $header = "" if ( !defined($header) );

        my $influx_db_error = undef;
        my $http_error      = undef;

        # When several matches exist, the last one wins.
        while ( $header =~ /X-Influxdb-Error:\s*(.*+)/g ) {
            $influx_db_error = $1;
        }

        # Accept HTTP/1.0 as well as HTTP/1.1 status lines with a 4xx or 5xx
        # status code.
        while ( $header =~ /HTTP\/1\.[01]\s*([45]\d\d\s*.*+)/g ) {
            $http_error = $1;
        }

        if ( defined($influx_db_error) ) {
            InfluxDBLogger_HttpCallback_Error( $hash, $name, $influx_db_error );
        }
        elsif ( defined($http_error) ) {
            InfluxDBLogger_HttpCallback_Error( $hash, $name, $http_error );
        }
        else {
            my $succeeded_writes = ReadingsVal( $name, $succeeded_writes_name, 0 );
            $succeeded_writes++;
            my $rv = readingsSingleUpdate( $hash, $succeeded_writes_name, $succeeded_writes, 1 );
            Log3 $name, 4, "InfluxDBLogger: [$name] HTTP Succeeded " . $rv;
        }
    }

    InfluxDBLogger_UpdateState( $hash, $name );
}

# Returns the statistics summary used as the device state
# (t = total writes, s = succeeded, f = failed, e = total events).
sub InfluxDBLogger_BuildState($$) {
    my ( $hash, $name ) = @_;

    return "Statistics: t=" . ReadingsVal( $name, $total_writes_name, 0 )
      . " s=" . ReadingsVal( $name, $succeeded_writes_name, 0 )
      . " f=" . ReadingsVal( $name, $failed_writes_name, 0 )
      . " e=" . ReadingsVal( $name, $total_events_name, 0 );
}

# Writes the current statistics summary to the "state" reading.
sub InfluxDBLogger_UpdateState($$) {
    my ( $hash, $name ) = @_;

    my $new_state = InfluxDBLogger_BuildState( $hash, $name );
    readingsSingleUpdate( $hash, "state", $new_state, 1 );
}

# Logs a failed write, stores the error text and increments the failed
# writes counter.
sub InfluxDBLogger_HttpCallback_Error($$$) {
    my ( $hash, $name, $err ) = @_;

    Log3 $name, 1, "InfluxDBLogger: [$name] Error = $err";

    readingsBeginUpdate($hash);
    readingsBulkUpdate( $hash, $failed_writes_last_error_name, $err );
    my $failed_writes = ReadingsVal( $name, $failed_writes_name, 0 );
    $failed_writes++;
    readingsBulkUpdate( $hash, $failed_writes_name, $failed_writes );
    readingsEndUpdate( $hash, 1 );
}

# Logs every dropped value, stores the last one as reading and adds the
# number of dropped values to the dropped writes counter.
sub InfluxDBLogger_DroppedIncompatibleValues($$@) {
    my ( $hash, $name, @warnings ) = @_;

    readingsBeginUpdate($hash);
    my $dropped_writes = ReadingsVal( $name, $dropped_writes_name, 0 );

    my $warn = "";
    foreach my $warning (@warnings) {
        $warn = $warning;
        Log3 $name, 4, "InfluxDBLogger: [$name] Warning, incompatible non numeric value: $warn";
    }

    readingsBulkUpdate( $hash, $dropped_writes_last_message_name, $warn );
    $dropped_writes += scalar(@warnings);
    readingsBulkUpdate( $hash, $dropped_writes_name, $dropped_writes );
    readingsEndUpdate( $hash, 1 );
}

# Handles the set commands password, token, resetStatistics and write
# (command names are matched case-insensitively). Unknown commands return
# the list of supported commands.
sub InfluxDBLogger_Set($$$@) {
    my ( $hash, $name, $cmd, @args ) = @_;
    Log3 $name, 5, "InfluxDBLogger: [$name] set $cmd";

    my $command = lc $cmd;

    if ( $command eq 'password' ) {
        my $pwd = $args[0];
        InfluxDBLogger_StoreSecret( $hash, $name, "passwd", $pwd );
        return ( undef, 1 );
    }
    elsif ( $command eq 'token' ) {
        my $token = $args[0];
        InfluxDBLogger_StoreSecret( $hash, $name, "token", $token );
        return ( undef, 1 );
    }
    elsif ( $command eq 'resetstatistics' ) {
        InfluxDBLogger_ResetStatistics( $hash, $name );
        return ( undef, 1 );
    }
    elsif ( $command eq 'write' ) {
        InfluxDBLogger_Write( $hash, $name );
        return ( undef, 1 );
    }
    else {
        return "Unknown argument $cmd, choose one of resetStatistics:noArg password token write:noArg";
    }
}

# Resets all counters to 0, clears the last message/error readings and
# refreshes the state reading.
sub InfluxDBLogger_ResetStatistics($$) {
    my ( $hash, $name ) = @_;

    readingsBeginUpdate($hash);
    readingsBulkUpdate( $hash, $total_writes_name,                0 );
    readingsBulkUpdate( $hash, $total_events_name,                0 );
    readingsBulkUpdate( $hash, $succeeded_writes_name,            0 );
    readingsBulkUpdate( $hash, $failed_writes_name,               0 );
    readingsBulkUpdate( $hash, $dropped_writes_name,              0 );
    readingsBulkUpdate( $hash, $dropped_writes_last_message_name, "" );
    readingsBulkUpdate( $hash, $failed_writes_last_error_name,    "" );
    readingsBulkUpdate( $hash, "state", InfluxDBLogger_BuildState( $hash, $name ) );
    readingsEndUpdate( $hash, 1 );
}

# True when the "security" attribute of $name is "basic_auth".
sub InfluxDBLogger_IsBasicAuth($) {
    my $name = shift;
    return AttrVal( $name, "security", "" ) eq "basic_auth";
}

# Returns the stored password when basic auth is configured, undef otherwise.
sub InfluxDBLogger_GetPassword($$) {
    my $hash = shift;
    my $name = shift;

    if ( InfluxDBLogger_IsBasicAuth($name) ) {
        return InfluxDBLogger_ReadSecret( $hash, $name, "passwd" );
    }
    return undef;
}

# Obfuscates $password by XOR-ing it with a key derived from getUniqueId()
# and the storage index, then stores the hex string with setKeyValue under
# "<TYPE>_<name>_<ref>".
#
# Returns a status message: "... successfully saved" or
# "error while saving ...".
sub InfluxDBLogger_StoreSecret($$$$) {
    my $hash     = shift;
    my $name     = shift;
    my $ref      = shift;
    my $password = shift;

    my $index   = $hash->{TYPE} . "_" . $name . "_" . $ref;
    my $key     = getUniqueId() . $index;
    my $enc_pwd = "";

    $hash->{$ref} = undef;

    # Strengthen the key when Digest::MD5 is available.
    if ( eval { require Digest::MD5; 1 } ) {
        $key = Digest::MD5::md5_hex( unpack "H*", $key );
        $key .= Digest::MD5::md5_hex($key);
    }

    # XOR each character with the last key character, then rotate the key.
    for my $char ( split //, $password ) {
        my $encode = chop($key);
        $enc_pwd .= sprintf( "%.2x", ord($char) ^ ord($encode) );
        $key = $encode . $key;
    }

    Log3 $name, 5, "InfluxDBLogger: [$name] storing new $ref";
    my $err = setKeyValue( $index, $enc_pwd );
    return "error while saving the $ref - $err" if ( defined($err) );

    $hash->{$ref} = "saved";
    return "$ref successfully saved";
}

# Reads the secret stored under "<TYPE>_<name>_<ref>" with getKeyValue and
# reverses the obfuscation applied by InfluxDBLogger_StoreSecret.
#
# Returns the clear text, or undef when reading fails or nothing is stored.
sub InfluxDBLogger_ReadSecret($$$) {
    my $hash = shift;
    my $name = shift;
    my $ref  = shift;

    my $index = $hash->{TYPE} . "_" . $name . "_" . $ref;
    my $key   = getUniqueId() . $index;

    Log3 $name, 4, "InfluxDBLogger [$name] - Read $ref from file";
    my ( $err, $password ) = getKeyValue($index);

    if ( defined($err) ) {
        Log3 $name, 3, "InfluxDBLogger [$name] - unable to read $ref from file: $err";
        return undef;
    }

    if ( !defined($password) ) {
        Log3 $name, 3, "InfluxDBLogger [$name] - No $ref in file";
        return undef;
    }

    # Must derive the key exactly as InfluxDBLogger_StoreSecret does.
    if ( eval { require Digest::MD5; 1 } ) {
        $key = Digest::MD5::md5_hex( unpack "H*", $key );
        $key .= Digest::MD5::md5_hex($key);
    }

    my $dec_pwd = '';
    for my $char ( map { pack( 'C', hex($_) ) } ( $password =~ /(..)/g ) ) {
        my $decode = chop($key);
        $dec_pwd .= chr( ord($char) ^ ord($decode) );
        $key = $decode . $key;
    }

    return $dec_pwd;
}

# Rename callback: moves the stored secrets from the old to the new name.
#
# The storage index and the obfuscation key both contain the device name, so
# each secret is decoded with the old name and stored again under the new
# one before the old entry is removed.
sub InfluxDBLogger_Rename($$) {
    my ( $new_name, $old_name ) = @_;
    my $hash = $defs{$new_name};

    foreach my $ref ( "passwd", "token" ) {
        my $secret = InfluxDBLogger_ReadSecret( $hash, $old_name, $ref );
        next if ( !defined($secret) );

        InfluxDBLogger_StoreSecret( $hash, $new_name, $ref, $secret );
        setKeyValue( $hash->{TYPE} . "_" . $old_name . "_" . $ref, undef );
    }

    return;
}

# Return value for the eval that loads the module: signals success.
1;

# Start of the command reference

=pod

=item helper

=item summary Logs numeric readings into InfluxDB time-series databases

=item summary_DE Schreibt numerische Readings in eine InfluxDB Zeitreihendatenbank

=begin html

InfluxDBLogger

=end html

=begin html_DE

InfluxDBLogger

=end html_DE

# Ende der Commandref

=cut