From 8bb473879ff33194bb81c3f9ab1fee3c77d0100e Mon Sep 17 00:00:00 2001 From: nasseeder1 Date: Wed, 30 Nov 2022 09:36:26 +0000 Subject: [PATCH] 93_DbLog: contrib 5.0.0 git-svn-id: https://svn.fhem.de/fhem/trunk@26757 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/contrib/DS_Starter/93_DbLog.pm | 1209 ++++++++------------------- 1 file changed, 341 insertions(+), 868 deletions(-) diff --git a/fhem/contrib/DS_Starter/93_DbLog.pm b/fhem/contrib/DS_Starter/93_DbLog.pm index 301e540a8..cb5841ba4 100644 --- a/fhem/contrib/DS_Starter/93_DbLog.pm +++ b/fhem/contrib/DS_Starter/93_DbLog.pm @@ -1,5 +1,5 @@ ############################################################################################################################################ -# $Id: 93_DbLog.pm 26750 2022-11-26 16:38:54Z DS_Starter $ +# $Id: 93_DbLog.pm 26750 2022-11-29 16:38:54Z DS_Starter $ # # 93_DbLog.pm # written by Dr. Boris Neubert 2007-12-30 @@ -39,7 +39,7 @@ no if $] >= 5.017011, warnings => 'experimental::smartmatch'; # Version History intern by DS_Starter: my %DbLog_vNotesIntern = ( - "5.0.0" => "28.11.2022 Test subprocess ", + "5.0.0" => "29.11.2022 Test subprocess ", "4.13.3" => "26.11.2022 revise commandref ", "4.13.2" => "06.11.2022 Patch Delta calculation (delta-d,delta-h) https://forum.fhem.de/index.php/topic,129975.msg1242272.html#msg1242272 ", "4.13.1" => "16.10.2022 edit commandref ", @@ -360,40 +360,30 @@ sub DbLog_Define { return "Bad regexp: $@" if($@); $hash->{REGEXP} = $regexp; - $hash->{MODE} = AttrVal($hash->{NAME}, "asyncMode", undef)?"asynchronous":"synchronous"; # Mode setzen Forum:#76213 + $hash->{MODE} = AttrVal($name, "asyncMode", undef) ? "asynchronous" : "synchronous"; # Mode setzen Forum:#76213 $hash->{HELPER}{OLDSTATE} = "initialized"; $hash->{HELPER}{MODMETAABSENT} = 1 if($modMetaAbsent); # Modul Meta.pm nicht vorhanden $hash->{HELPER}{TH} = "history"; # Tabelle history (wird ggf. durch Datenbankschema ergänzt) $hash->{HELPER}{TC} = "current"; # Tabelle current (wird ggf. durch Datenbankschema ergänzt) - # Versionsinformationen setzen - DbLog_setVersionInfo($hash); + DbLog_setVersionInfo ($hash); # Versionsinformationen setzen + notifyRegexpChanged ($hash, $regexp); # nur Events dieser Devices an NotifyFn weiterleiten, NOTIFYDEV wird gesetzt wenn möglich + + $hash->{PID} = $$; # remember PID for plotfork + $data{DbLog}{$name}{cache}{index} = 0; # CacheIndex für Events zum asynchronen Schreiben in DB - # nur Events dieser Devices an NotifyFn weiterleiten, NOTIFYDEV wird gesetzt wenn möglich - notifyRegexpChanged($hash, $regexp); - - #remember PID for plotfork - $hash->{PID} = $$; - - # CacheIndex für Events zum asynchronen Schreiben in DB - $data{DbLog}{$name}{cache}{index} = 0; - - # read configuration data - my $ret = DbLog_readCfg($hash); + my $ret = DbLog_readCfg($hash); # read configuration data if ($ret) { # return on error while reading configuration Log3($name, 1, "DbLog $name - Error while reading $hash->{CONFIGURATION}: '$ret' "); return $ret; } - # set used COLUMNS - InternalTimer(gettimeofday()+2, "DbLog_setinternalcols", $hash, 0); + InternalTimer(gettimeofday()+2, "DbLog_setinternalcols", $hash, 0); # set used COLUMNS - readingsSingleUpdate($hash, 'state', 'waiting for connection', 1); - _DbLog_ConnectPush($hash); - - # initial execution of DbLog_execmemcache - DbLog_execmemcache($hash); + readingsSingleUpdate ($hash, 'state', 'waiting for connection', 1); + _DbLog_ConnectPush ($hash); + DbLog_execmemcache ($hash); # initial execution of DbLog_execmemcache return; } @@ -413,9 +403,9 @@ sub DbLog_Undef { my $dbh = $hash->{DBHP}; BlockingKill($hash->{HELPER}{".RUNNING_PID"}) if($hash->{HELPER}{".RUNNING_PID"}); - BlockingKill($hash->{HELPER}{REDUCELOG_PID}) if($hash->{HELPER}{REDUCELOG_PID}); - BlockingKill($hash->{HELPER}{COUNT_PID}) if($hash->{HELPER}{COUNT_PID}); - BlockingKill($hash->{HELPER}{DELDAYS_PID}) if($hash->{HELPER}{DELDAYS_PID}); + BlockingKill($hash->{HELPER}{REDUCELOG_PID}) if($hash->{HELPER}{REDUCELOG_PID}); + BlockingKill($hash->{HELPER}{COUNT_PID}) if($hash->{HELPER}{COUNT_PID}); + BlockingKill($hash->{HELPER}{DELDAYS_PID}) if($hash->{HELPER}{DELDAYS_PID}); $dbh->disconnect() if(defined($dbh)); @@ -442,13 +432,28 @@ sub DbLog_DelayedShutdown { return 0 if(IsDisabled($name)); $hash->{HELPER}{SHUTDOWNSEQ} = 1; - # return 0 if(!$async && !$hash->{HELPER}{PUSHISRUNNING}); - Log3($name, 2, "DbLog $name - Last database write cycle due to shutdown ..."); + + Log3 ($name, 2, "DbLog $name - Last database write cycle due to shutdown ..."); DbLog_execmemcache($hash); return 1; } +##################################################### +# DelayedShutdown abschließen +# letzte Aktivitäten vor Freigabe des Shutdowns +##################################################### +sub _DbLog_finishDelayedShutdown { + my $hash = shift; + my $name = $hash->{NAME}; + + DbLog_SBP_CleanUp ($hash); + delete $hash->{HELPER}{SHUTDOWNSEQ}; + CancelDelayedShutdown ($name); + +return; +} + ################################################################ # # Wird bei jeder Aenderung eines Attributes dieser @@ -1416,24 +1421,23 @@ sub DbLog_Log { my $name = $hash->{NAME}; my $dev_name = $dev_hash->{NAME}; my $dev_type = uc($dev_hash->{TYPE}); - my $async = AttrVal($name, "asyncMode", 0 ); - my $clim = AttrVal($name, "cacheLimit", $dblog_cachedef ); - my $ce = AttrVal($name, "cacheEvents", 0 ); + my $async = AttrVal ($name, "asyncMode", 0); + my $clim = AttrVal ($name, "cacheLimit", $dblog_cachedef); + my $ce = AttrVal ($name, "cacheEvents", 0); + + return if(IsDisabled($name) || !$hash->{HELPER}{COLSET} || $init_done != 1); + my ($net,$force); - return if(IsDisabled($name) || !$hash->{HELPER}{COLSET} || $init_done != 1); - - # Notify-Routine Startzeit - my $nst = [gettimeofday]; + my $nst = [gettimeofday]; # Notify-Routine Startzeit my $events = deviceEvents($dev_hash, AttrVal($name, "addStateEvent", 1)); return if(!$events); my $max = int(@{$events}); - # verbose4 Logs nur für Devices in Attr "verbose4Devs" my $vb4show = 0; - my @vb4devs = split(",", AttrVal($name, "verbose4Devs", "")); + my @vb4devs = split(",", AttrVal ($name, 'verbose4Devs', '')); # verbose4 Logs nur für Devices in Attr "verbose4Devs" if (!@vb4devs) { $vb4show = 1; } @@ -1464,24 +1468,21 @@ sub DbLog_Log { my $DbLogSelectionMode = AttrVal($name, "DbLogSelectionMode","Exclude"); my $value_fn = AttrVal($name, "valueFn", ""); - # Funktion aus Device spezifischer DbLogValueFn validieren - if( $DbLogValueFn =~ m/^\s*(\{.*\})\s*$/s ) { + if( $DbLogValueFn =~ m/^\s*(\{.*\})\s*$/s ) { # Funktion aus Device spezifischer DbLogValueFn validieren $DbLogValueFn = $1; } else { $DbLogValueFn = ''; } - # Funktion aus Attr valueFn validieren - if( $value_fn =~ m/^\s*(\{.*\})\s*$/s ) { + if( $value_fn =~ m/^\s*(\{.*\})\s*$/s ) { # Funktion aus Attr valueFn validieren $value_fn = $1; } else { $value_fn = ''; } - #one Transaction - eval { + eval { # one Transaction for (my $i = 0; $i < $max; $i++) { my $next = 0; my $event = $events->[$i]; @@ -1514,7 +1515,7 @@ sub DbLog_Log { } } - $event =~ s/\|/_ESC_/gxs; # escape Pipe "|" + $event =~ s/\|/_ESC_/gxs; # escape Pipe "|" my @r = DbLog_ParseEvent($name,$dev_name, $dev_type, $event); $reading = $r[0]; @@ -1618,8 +1619,7 @@ sub DbLog_Log { next if($DoIt == 0); - # check auf defaultMinInterval - $DoIt = DbLog_checkDefMinInt($name,$dev_name,$now,$reading,$value); + $DoIt = DbLog_checkDefMinInt($name,$dev_name,$now,$reading,$value); # check auf defaultMinInterval if ($DoIt) { my $lastt = $defs{$dev_name}{Helper}{DBLOG}{$reading}{$name}{TIME}; # patch Forum:#111423 @@ -1628,15 +1628,14 @@ sub DbLog_Log { $defs{$dev_name}{Helper}{DBLOG}{$reading}{$name}{TIME} = $now; $defs{$dev_name}{Helper}{DBLOG}{$reading}{$name}{VALUE} = $value; - # Device spezifische DbLogValueFn-Funktion anwenden - if($DbLogValueFn ne '') { + if($DbLogValueFn ne '') { # Device spezifische DbLogValueFn-Funktion anwenden my $TIMESTAMP = $timestamp; my $LASTTIMESTAMP = $lastt // 0; # patch Forum:#111423 my $DEVICE = $dev_name; my $EVENT = $event; my $READING = $reading; my $VALUE = $value; - my $LASTVALUE = $lastv // ""; # patch Forum:#111423 + my $LASTVALUE = $lastv // ""; # patch Forum:#111423 my $UNIT = $unit; my $IGNORE = 0; my $CN = " "; @@ -1670,8 +1669,7 @@ sub DbLog_Log { $unit = $UNIT if(defined $UNIT); } - # zentrale valueFn im DbLog-Device abarbeiten - if($value_fn ne '') { + if($value_fn ne '') { # zentrale valueFn im DbLog-Device abarbeiten my $TIMESTAMP = $timestamp; my $LASTTIMESTAMP = $lastt // 0; # patch Forum:#111423 my $DEVICE = $dev_name; @@ -1736,7 +1734,9 @@ sub DbLog_Log { my $lmlr = $hash->{HELPER}{LASTLIMITRUNTIME}; my $syncival = AttrVal($name, "syncInterval", 30); if(!$lmlr || gettimeofday() > $lmlr+($syncival/2)) { - Log3 $name, 4, "DbLog $name -> Number of cache entries reached cachelimit $clim - start database sync."; + + Log3 ($name, 4, "DbLog $name -> Number of cache entries reached cachelimit $clim - start database sync."); + DbLog_execmemcache ($hash); $hash->{HELPER}{LASTLIMITRUNTIME} = gettimeofday(); } @@ -1750,6 +1750,7 @@ sub DbLog_Log { } } }; + if(!$async) { if(@row_array) { # synchoner Mode return if($hash->{HELPER}{REOPEN_RUNS}); # return wenn "reopen" mit Ablaufzeit gestartet ist @@ -1757,22 +1758,23 @@ sub DbLog_Log { my $error = DbLog_Push($hash, $vb4show, @row_array); Log3 ($name, 5, "DbLog $name -> DbLog_Push Returncode: $error") if($error && $vb4show); - CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - Last database write cycle done") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); - - my $state = $error ? $error : (IsDisabled($name)) ? "disabled" : "connected"; + my $state = $error ? $error : + IsDisabled($name) ? 'disabled' : + 'connected'; + DbLog_setReadingstate ($hash, $state); - # Notify-Routine Laufzeit ermitteln - $net = tv_interval($nst); + $net = tv_interval($nst); # Notify-Routine Laufzeit ermitteln } else { - CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + if($hash->{HELPER}{SHUTDOWNSEQ}) { + Log3 ($name, 2, "DbLog $name - no data for last database write cycle"); + _DbLog_finishDelayedShutdown ($hash); + } } } - if($net && AttrVal($name, "showNotifyTime", undef)) { + if($net && AttrVal($name, 'showNotifyTime', 0)) { readingsSingleUpdate($hash, "notify_processing_time", sprintf("%.4f",$net), 1); } @@ -1919,7 +1921,7 @@ sub DbLog_Push { ($usepkh,$usepkc,$pkh,$pkc) = DbLog_checkUsePK ($params); } else { - Log3 $hash->{NAME}, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"; + Log3 ($name, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"); } my (@timestamp,@device,@type,@event,@reading,@value,@unit); @@ -1941,7 +1943,7 @@ sub DbLog_Push { push(@reading, "$a[4]"); push(@value, "$a[5]"); push(@unit, "$a[6]"); - Log3 $hash->{NAME}, 4, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]" + Log3 ($name, 4, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]") if($vb4show); } use warnings; @@ -1973,7 +1975,7 @@ sub DbLog_Push { for my $row (@row_array) { my @a = split("\\|",$row); s/_ESC_/\|/gxs for @a; # escaped Pipe return to "|" - Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + Log3 ($name, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"); $a[3] =~ s/'/''/g; # escape ' with '' $a[5] =~ s/'/''/g; # escape ' with '' $a[6] =~ s/'/''/g; # escape ' with '' @@ -2003,14 +2005,14 @@ sub DbLog_Push { $ins_hist = 0 if($ins_hist eq "0E0"); if($ins_hist == $ceti) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); + Log3 $name, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); } else { if($usepkh) { - Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table $history due to PK on columns $pkh"; + Log3 $name, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table $history due to PK on columns $pkh"; } else { - Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table $history"; + Log3 $name, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table $history"; } } eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit @@ -2029,7 +2031,7 @@ sub DbLog_Push { if ($@) { $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table $history - $errorh"; + Log3 $name, 2, "DbLog $name -> Error table $history - $errorh"; eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed if ($@) { Log3($name, 2, "DbLog $name -> Error rollback $history - $@"); @@ -2100,7 +2102,7 @@ sub DbLog_Push { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + Log3 $name, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; push(@timestamp_cur, "$timestamp[$tuple]"); push(@device_cur, "$device[$tuple]"); push(@type_cur, "$type[$tuple]"); @@ -2111,10 +2113,10 @@ sub DbLog_Push { $nupd_cur++; } if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); $doins = 1; } @@ -2134,14 +2136,14 @@ sub DbLog_Push { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + Log3 $name, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; $nins_cur++; } if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); } } eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed @@ -2210,19 +2212,19 @@ sub DbLog_Push { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; + Log3 $name, 3, "DbLog $name -> Insert into $history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); $nins_hist++; } if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); + Log3 $name, 4, "DbLog $name -> $ceti of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); } else { if($usepkh) { - Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table $history due to PK on columns $pkh"; + Log3 $name, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table $history due to PK on columns $pkh"; } else { - Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table $history"; + Log3 $name, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table $history"; } } eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit @@ -2241,7 +2243,7 @@ sub DbLog_Push { if ($@) { $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table $history - $errorh"; + Log3 $name, 2, "DbLog $name -> Error table $history - $errorh"; eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed if ($@) { Log3($name, 2, "DbLog $name -> Error rollback $history - $@"); @@ -2312,7 +2314,7 @@ sub DbLog_Push { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + Log3 $name, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; push(@timestamp_cur, "$timestamp[$tuple]"); push(@device_cur, "$device[$tuple]"); push(@type_cur, "$type[$tuple]"); @@ -2323,10 +2325,10 @@ sub DbLog_Push { $nupd_cur++; } if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); $doins = 1; } @@ -2346,14 +2348,14 @@ sub DbLog_Push { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + Log3 $name, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; $nins_cur++; } if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); + Log3 $name, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); } } eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed @@ -2397,17 +2399,17 @@ sub DbLog_Push { return Encode::encode_utf8($error); } -############################################ -# Functions called from subprocess +################################################################# +# SubProcess - Hauptprozess gestartet durch DbLog_SBP_Init +# liest Daten vom Parentprozess mit +# $subprocess->readFromParent() # # my $parent = $subprocess->parent(); -############################################ +################################################################# sub DbLog_SBP_onRun { my $subprocess = shift; my $name = $subprocess->{name}; - # Log3 ($name, 1, "$name - SUBPROCESS: Running..."); - while (1) { my $json = $subprocess->readFromParent(); @@ -2423,6 +2425,7 @@ sub DbLog_SBP_onRun { my $tf = $memc->{tf}; my $bi = $memc->{bi}; my $utf8 = $memc->{utf8}; + my $verbose = $memc->{verbose}; my $history = $memc->{history}; my $current = $memc->{current}; my $model = $memc->{model}; @@ -2430,47 +2433,45 @@ sub DbLog_SBP_onRun { my $cdata = $memc->{cdata}; # Log Daten, z.B.: 3399 => 2022-11-29 09:33:32|SolCast|SOLARFORECAST||nextCycletime|09:33:47| my $errorh = 0; - my $error = 0; + my $error = q{}; my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) my $rowlback = 0; # Eventliste für Rückgabe wenn Fehler my $dbh; + my $params; my @row_array; my $ret; my $retjson; - my $rowhref; + my $rowhref; - for my $key (sort{$a<=>$b} (keys %{$cdata})) { - # Log3 ($name, 1, "$name - SUBPROCESS read from parent: $key -> ".$cdata->{$key}); - push @row_array, $cdata->{$key}; - } + $attr{$name}{verbose} = $verbose; # verbose Level übergeben ###################################################################################################### - Log3 ($name, 5, "DbLog $name -> Start DbLog_PushAsync"); Log3 ($name, 5, "DbLog $name -> DbLogType is: $DbLogType"); my $bst = [gettimeofday]; # Background-Startzeit my ($useac,$useta) = DbLog_commitMode ($name, $cm); - eval { - if (!$useac) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } - elsif ($useac == 1) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } - else { # Server default - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } + $params = { + name => $name, + dbconn => $dbconn, + dbuser => $dbuser, + dbpassword => $dbpassword, + utf8 => $utf8, + useac => $useac, + model => $model }; - if ($@) { - Log3 ($name, 2, "DbLog $name - Error: $@"); + + ($error, $dbh) = _DbLog_SBP_onRun_connectDB ($params); + + if ($error) { + Log3 ($name, 2, "DbLog $name - Error: $error"); $ret = { name => $name, - error => $@, + error => $error, ot => 0, rowlback => $cdata # Rückgabe alle übergebenen Log-Daten }; @@ -2494,7 +2495,7 @@ sub DbLog_SBP_onRun { my ($usepkh,$usepkc,$pkh,$pkc); if (!$nsupk) { - my $params = { + $params = { name => $name, dbh => $dbh, dbconn => $dbconn, @@ -2507,8 +2508,8 @@ sub DbLog_SBP_onRun { else { Log3 ($name, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"); } - - my $ceti = $#row_array+1; + + my $ceti = scalar keys %{$cdata}; my (@timestamp,@device,@type,@event,@reading,@value,@unit); my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); @@ -2517,8 +2518,9 @@ sub DbLog_SBP_onRun { no warnings 'uninitialized'; - for my $row (@row_array) { - my @a = split("\\|",$row); + for my $key (sort {$a<=>$b} keys %{$cdata}) { + my $row = $cdata->{$key}; + my @a = split("\\|",$row); s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" push(@timestamp, "$a[0]"); @@ -2531,6 +2533,7 @@ sub DbLog_SBP_onRun { Log3 ($name, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"); } + use warnings; if($bi) { @@ -2554,10 +2557,12 @@ sub DbLog_SBP_onRun { else { # ohne PK $sqlins = "INSERT INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; } + no warnings 'uninitialized'; - for my $row (@row_array) { - my @a = split("\\|",$row); + for my $key (sort {$a<=>$b} keys %{$cdata}) { + my $row = $cdata->{$key}; + my @a = split("\\|",$row); s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" Log3 ($name, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"); @@ -2586,6 +2591,7 @@ sub DbLog_SBP_onRun { } eval { $sth_ih = $dbh->prepare($sqlins); + if($tl) { # Tracelevel setzen $sth_ih->{TraceLevel} = "$tl|$tf"; } @@ -2624,6 +2630,8 @@ sub DbLog_SBP_onRun { Log3 ($name, 2, "DbLog $name -> Error table $history - $errorh"); + $dbh->disconnect(); + $rowlback = $cdata if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein $ret = { @@ -2658,6 +2666,7 @@ sub DbLog_SBP_onRun { } if ($@) { Log3 ($name, 2, "DbLog $name - Error: $@"); + $dbh->disconnect(); $ret = { @@ -2709,7 +2718,7 @@ sub DbLog_SBP_onRun { ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); my $nupd_cur = 0; - for my $tuple (0..$#row_array) { + for my $tuple (0..$ceti-1) { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn update ok @@ -2801,6 +2810,7 @@ sub DbLog_SBP_onRun { } if ($@) { # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE Log3 ($name, 2, "DbLog $name - Error: $@"); + $dbh->disconnect(); $ret = { @@ -2812,31 +2822,32 @@ sub DbLog_SBP_onRun { $retjson = eval { encode_json($ret) }; $subprocess->writeToParent($retjson); - return; + next; } if($tl) { # Tracelevel setzen $sth_ih->{TraceLevel} = "$tl|$tf"; } - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); + $sth_ih->bind_param_array (1, [@timestamp]); + $sth_ih->bind_param_array (2, [@device]); + $sth_ih->bind_param_array (3, [@type]); + $sth_ih->bind_param_array (4, [@event]); + $sth_ih->bind_param_array (5, [@reading]); + $sth_ih->bind_param_array (6, [@value]); + $sth_ih->bind_param_array (7, [@unit]); eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein if ($@) { Log3 ($name, 2, "DbLog $name -> Error start transaction for $history - $@"); } + eval { ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); my $nins_hist = 0; my @n2hist; - for my $tuple (0..$#row_array) { + for my $tuple (0..$ceti-1) { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn insert ok @@ -2860,6 +2871,7 @@ sub DbLog_SBP_onRun { } my $bkey = 1; + for my $line (@n2hist) { $line =~ s/\|/_ESC_/gxs; # escape Pipe "|" $rowhref->{$bkey} = $line; @@ -2954,11 +2966,12 @@ sub DbLog_SBP_onRun { if ($@) { Log3 ($name, 2, "DbLog $name -> Error start transaction for $current - $@"); } + eval { ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); my $nupd_cur = 0; - for my $tuple (0..$#row_array) { + for my $tuple (0..$ceti-1) { my $status = $tuple_status[$tuple]; $status = 0 if($status eq "0E0"); next if($status); # $status ist "1" wenn update ok @@ -3004,6 +3017,7 @@ sub DbLog_SBP_onRun { $nins_cur++; } + if(!$nins_cur) { Log3 ($name, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc ? " using PK on columns $pkc" : "")); } @@ -3033,8 +3047,6 @@ sub DbLog_SBP_onRun { my $rt = tv_interval($st); # SQL-Laufzeit ermitteln my $brt = tv_interval($bst); # Background-Laufzeit ermitteln my $ot = $rt.",".$brt; - - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); $ret = { name => $name, @@ -3044,26 +3056,88 @@ sub DbLog_SBP_onRun { }; $retjson = eval { encode_json($ret) }; - $subprocess->writeToParent($retjson); - - - ###################################################################################################### - - + $subprocess->writeToParent($retjson); # hier schreiben wir etwas an den übergeordneten Prozess # dies wird über die globale Select-Schleife empfangen # und in der ReadFn ausgewertet. - #$subprocess->writeToParent($json); + # $subprocess->writeToParent($json); } } return; } +################################################################################### +# neue Datenbankverbindung im SubProcess +################################################################################### +sub _DbLog_SBP_onRun_connectDB { + my $paref = shift; + + my $name = $paref->{name}; + my $dbconn = $paref->{dbconn}; + my $dbuser = $paref->{dbuser}; + my $dbpassword = $paref->{dbpassword}; + my $utf8 = $paref->{utf8}; + my $useac = $paref->{useac}; + my $model = $paref->{model}; + + my $dbh = ''; + my $err = ''; + + eval { if (!$useac) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 0, + AutoInactiveDestroy => 1 + } + ); 1; + } + elsif ($useac == 1) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 1, + AutoInactiveDestroy => 1 + } + ); 1; + } + else { # Server default + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoInactiveDestroy => 1 + } + ); 1; + } + } + or do { $err = $@; + Log3 ($name, 2, "DbLog $name - Error: $err"); + return $err; + }; + + if($utf8) { + if($model eq "MYSQL") { + $dbh->{mysql_enable_utf8} = 1; + $dbh->do('set names "UTF8"'); + } + + if($model eq "SQLITE") { + $dbh->do('PRAGMA encoding="UTF-8"'); + } + } + +return ($err, $dbh); +} + +##################################################### +## Subprocess wird beendet +##################################################### sub DbLog_SBP_onExit { my $subprocess = shift; - Log3 undef, 1, "EXITED!"; + my $name = $subprocess->{name}; + + Log3 ($name, 1, "DbLog $name - SubProcess EXITED!"); + +return; } ##################################################### @@ -3075,7 +3149,7 @@ sub DbLog_SBP_Init { return if($hash->{SBP_PID}); - $hash->{fhem}{subprocess} = undef; + $hash->{".fhem"}{subprocess} = undef; my $subprocess = SubProcess->new( { onRun => \&DbLog_SBP_onRun, onExit => \&DbLog_SBP_onExit @@ -3100,14 +3174,14 @@ sub DbLog_SBP_Init { Log3 ($name, 2, qq{DbLog $name - Subprocess "$pid" initialized ... ready for non-blocking operation}); - $hash->{fhem}{subprocess} = $subprocess; - $hash->{FD} = fileno $subprocess->child(); + $hash->{".fhem"}{subprocess} = $subprocess; + $hash->{FD} = fileno $subprocess->child(); delete($readyfnlist{"$name.$pid"}); - $selectlist{"$name.$pid"}= $hash; - - $hash->{SBP_STATE} = "Initialized"; - $hash->{SBP_PID} = $pid; + + $selectlist{"$name.$pid"} = $hash; + $hash->{SBP_PID} = $pid; + $hash->{SBP_STATE} = 'running'; return; } @@ -3119,14 +3193,21 @@ sub DbLog_SBP_CleanUp { my $hash = shift; my $name = $hash->{NAME}; - my $subprocess = $hash->{fhem}{subprocess}; + my $subprocess = $hash->{".fhem"}{subprocess}; return if(!defined $subprocess); my $pid = $subprocess->pid(); return if(!$pid); - $subprocess->terminate(); - $subprocess->wait(); + Log3 ($name, 2, qq{DbLog $name - stopping Subprocess "$pid" ...}); + + #$subprocess->terminate(); + #$subprocess->wait(); + + kill 'SIGKILL', $pid; + waitpid($pid, 0); + + Log3 ($name, 2, qq{DbLog $name - Subprocess "$pid" stopped}); delete($selectlist{"$name.$pid"}); delete $hash->{FD}; @@ -3139,13 +3220,13 @@ return; ################################################################################ # called from the global loop, when the select for hash->{FD} reports data -# entspräche der DbLog_PushAsyncDone -> geschrieben durch "onRun" Funktion +# geschrieben durch "onRun" Funktion ################################################################################ sub DbLog_SBP_Read { my $hash = shift; #my $name = $hash->{NAME}; - my $subprocess = $hash->{fhem}{subprocess}; + my $subprocess = $hash->{".fhem"}{subprocess}; # hier lesen wir aus der globalen Select-Schleife, was # in der onRun-Funktion geschrieben wurde @@ -3200,8 +3281,10 @@ sub DbLog_SBP_Read { delete $hash->{HELPER}{".RUNNING_PID"}; delete $hash->{HELPER}{LASTLIMITRUNTIME} if(!$error); - Log3 ($name, 2, "DbLog $name - Last database write cycle done") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); - CancelDelayedShutdown($name); + if ($hash->{HELPER}{SHUTDOWNSEQ}) { + Log3 ($name, 2, "DbLog $name - Last database write cycle done"); + _DbLog_finishDelayedShutdown ($hash); + } } return; @@ -3220,12 +3303,11 @@ sub DbLog_execmemcache { my $timeout = AttrVal($name, "timeout", 86400 ); my $DbLogType = AttrVal($name, "DbLogType", "History" ); - my $dbconn = $hash->{dbconn}; - my $dbuser = $hash->{dbuser}; - my $dbpassword = $attr{"sec$name"}{secret}; - my $dolog = 1; + my $dbconn = $hash->{dbconn}; + my $dbuser = $hash->{dbuser}; + my $dbpassword = $attr{"sec$name"}{secret}; + my $dolog = 1; - # my @row_array; my ($dbh,$error); RemoveInternalTimer($hash, "DbLog_execmemcache"); @@ -3245,8 +3327,10 @@ sub DbLog_execmemcache { if (kill 0, $pid) { $alive = 1; + $hash->{SBP_STATE} = 'running'; } else { + $hash->{SBP_STATE} = "dead (".$hash->{SBP_PID}.")"; delete $hash->{SBP_PID}; } @@ -3259,7 +3343,7 @@ sub DbLog_execmemcache { return; } - my $subprocess = $hash->{fhem}{subprocess}; + my $subprocess = $hash->{".fhem"}{subprocess}; ################################################ my $memcount = $data{DbLog}{$name}{cache}{memcache} ? scalar(keys %{$data{DbLog}{$name}{cache}{memcache}}) : 0; @@ -3322,11 +3406,8 @@ sub DbLog_execmemcache { my $memc; for my $key (sort(keys %{$data{DbLog}{$name}{cache}{memcache}})) { Log3 ($name, 5, "DbLog $name -> MemCache contains: ".$data{DbLog}{$name}{cache}{memcache}{$key}); - - my $val = delete $data{DbLog}{$name}{cache}{memcache}{$key}; - #push (@row_array, $val); - - $memc->{cdata}{$key} = $val; # Subprocess Daten, z.B.: 2022-11-29 09:33:32|SolCast|SOLARFORECAST||nextCycletime|09:33:47| + + $memc->{cdata}{$key} = delete $data{DbLog}{$name}{cache}{memcache}{$key}; # Subprocess Daten, z.B.: 2022-11-29 09:33:32|SolCast|SOLARFORECAST||nextCycletime|09:33:47| } undef $data{DbLog}{$name}{cache}{memcache}; # sicherheitshalber Memory freigeben: https://perlmaven.com/undef-on-perl-arrays-and-hashes , bzw. https://www.effectiveperlprogramming.com/2018/09/undef-a-scalar-to-release-its-memory/ @@ -3342,6 +3423,7 @@ sub DbLog_execmemcache { $memc->{tf} = AttrVal($name, "traceFlag", 'SQL'); $memc->{bi} = AttrVal($name, "bulkInsert", 0); $memc->{cm} = AttrVal($name, 'commitMode', 'basic_ta:on'); + $memc->{verbose} = AttrVal($name, 'verbose', 3); $memc->{utf8} = defined ($hash->{UTF8}) ? $hash->{UTF8} : 0; $memc->{history} = $hash->{HELPER}{TH}; $memc->{current} = $hash->{HELPER}{TC}; @@ -3356,23 +3438,6 @@ sub DbLog_execmemcache { else { $subprocess->writeToChild($json); } - - ##################################################################### - - #my $rowlist = join('§', @row_array); - #$rowlist = encode_base64($rowlist,""); - - #$hash->{HELPER}{".RUNNING_PID"} = BlockingCall ( - # "DbLog_PushAsync", - # "$name|$rowlist", - # "DbLog_PushAsyncDone", - # $timeout, - # "DbLog_PushAsyncAborted", - # $hash - # ); - - #$hash->{HELPER}{".RUNNING_PID"}{loglevel} = 4; - #Log3 ($hash->{NAME}, 5, "DbLog $name -> DbLog_PushAsync called with timeout: $timeout"); } else { if($hash->{HELPER}{".RUNNING_PID"}) { @@ -3380,8 +3445,10 @@ sub DbLog_execmemcache { DbLog_writeFileIfCacheOverflow ($params); # Cache exportieren bei Overflow } else { - CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + if($hash->{HELPER}{SHUTDOWNSEQ}) { + Log3 ($name, 2, "DbLog $name - no data for last database write cycle"); + _DbLog_finishDelayedShutdown ($hash); + } } } @@ -3389,7 +3456,7 @@ sub DbLog_execmemcache { my $nsdt = FmtDateTime($nextsync); my $se = AttrVal($name, "syncEvents", undef) ? 1 : 0; - readingsSingleUpdate($hash, "NextSync", $nsdt. " or if CacheUsage ".$clim." reached", $se); + readingsSingleUpdate($hash, "NextSync", $nsdt. " or when CacheUsage ".$clim." is reached", $se); DbLog_setReadingstate ($hash, $error); @@ -3464,645 +3531,6 @@ sub DbLog_setReadingstate { return; } -################################################################################################# -# -# Schreibroutine Einfügen Werte in DB asynchron non-blocking -# -################################################################################################# -sub DbLog_PushAsync { - my ($string) = @_; - my ($name,$rowlist) = split("\\|", $string); - my $hash = $defs{$name}; - my $dbconn = $hash->{dbconn}; - my $dbuser = $hash->{dbuser}; - my $dbpassword = $attr{"sec$name"}{secret}; - my $DbLogType = AttrVal($name, "DbLogType", "History"); - my $nsupk = AttrVal($name, "noSupportPK", 0); - my $tl = AttrVal($name, "traceLevel", 0); - my $tf = AttrVal($name, "traceFlag", "SQL"); - my $bi = AttrVal($name, "bulkInsert", 0); - my $utf8 = defined($hash->{UTF8})?$hash->{UTF8}:0; - my $history = $hash->{HELPER}{TH}; - my $current = $hash->{HELPER}{TC}; - my $errorh = 0; - my $error = 0; - my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) - my $dbh; - my $rowlback = 0; # Eventliste für Rückgabe wenn Fehler - - Log3 ($name, 5, "DbLog $name -> Start DbLog_PushAsync"); - Log3 ($name, 5, "DbLog $name -> DbLogType is: $DbLogType"); - - # Background-Startzeit - my $bst = [gettimeofday]; - - my ($useac,$useta) = DbLog_commitMode ($name, AttrVal($name, 'commitMode', 'basic_ta:on')); - - eval { - if(!$useac) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } - elsif($useac == 1) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } - else { - # Server default - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); - } - }; - if ($@) { - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - return "$name|$error|0|$rowlist"; - } - - if($tl) { # Tracelevel setzen - $dbh->{TraceLevel} = "$tl|$tf"; - } - - my $ac = $dbh->{AutoCommit} ? "ON" : "OFF"; - my $tm = $useta ? "ON" : "OFF"; - - Log3 ($name, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"); - Log3 ($name, 4, "DbLog $name -> Insert mode: ".($bi ? "Bulk" : "Array")); - - # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK - my ($usepkh,$usepkc,$pkh,$pkc); - - if (!$nsupk) { - my $params = { - name => $name, - dbh => $dbh, - dbconn => $dbconn, - history => $history, - current => $current - }; - - ($usepkh,$usepkc,$pkh,$pkc) = DbLog_checkUsePK ($params); - } - else { - Log3 $hash->{NAME}, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"; - } - - my $rowldec = decode_base64($rowlist); - my @row_array = split('§', $rowldec); - my $ceti = $#row_array+1; - - my (@timestamp,@device,@type,@event,@reading,@value,@unit); - my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); - my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins); - my ($tuples, $rows); - - no warnings 'uninitialized'; - - for my $row (@row_array) { - my @a = split("\\|",$row); - s/_ESC_/\|/gxs for @a; # escaped Pipe return to "|" - push(@timestamp, "$a[0]"); - push(@device, "$a[1]"); - push(@type, "$a[2]"); - push(@event, "$a[3]"); - push(@reading, "$a[4]"); - push(@value, "$a[5]"); - push(@unit, "$a[6]"); - Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; - } - - use warnings; - - if($bi) { - ####################### - # Bulk-Insert - ####################### - $st = [gettimeofday]; # SQL-Startzeit - - if (lc($DbLogType) =~ m(history)) { - ######################################## - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - $sqlins = "INSERT IGNORE INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; - } - elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - $sqlins = "INSERT OR IGNORE INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; - } - elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - $sqlins = "INSERT INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; - } - else { - # ohne PK - $sqlins = "INSERT INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; - } - no warnings 'uninitialized'; - - for my $row (@row_array) { - my @a = split("\\|",$row); - s/_ESC_/\|/gxs for @a; # escaped Pipe return to "|" - Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; - $a[3] =~ s/'/''/g; # escape ' with '' - $a[5] =~ s/'/''/g; # escape ' with '' - $a[6] =~ s/'/''/g; # escape ' with '' - $a[3] =~ s/\\/\\\\/g; # escape \ with \\ - $a[5] =~ s/\\/\\\\/g; # escape \ with \\ - $a[6] =~ s/\\/\\\\/g; # escape \ with \\ - $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; - } - - use warnings; - - chop($sqlins); - - if ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - $sqlins .= " ON CONFLICT DO NOTHING"; - } - - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for $history - $@"); - } - - eval { $sth_ih = $dbh->prepare($sqlins); - if($tl) { # Tracelevel setzen - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - my $ins_hist = $sth_ih->execute(); - $ins_hist = 0 if($ins_hist eq "0E0"); - - if($ins_hist == $ceti) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); - } - else { - if($usepkh) { - Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table $history due to PK on columns $pkh"; - } - else { - Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table $history"; - } - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit $history - $@"); - } - else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table $history committed"); - } - else { - Log3($name, 4, "DbLog $name -> insert table $history committed by autocommit"); - } - } - }; - - if ($@) { - $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table $history - $errorh"; - $error = encode_base64($errorh,""); - $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - } - } - - if (lc($DbLogType) =~ m(current)) { - ################################################################# - # insert current mit/ohne primary key - # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode - # die nicht upgedateten Sätze nicht identifiziert werden können - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } - else { - # ohne PK - eval { $sth_ic = $dbh->prepare("INSERT INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|"; - } - - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - $sth_uc = $dbh->prepare("REPLACE INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); - } - elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); - } - elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - $sth_uc = $dbh->prepare("INSERT INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - } - else { - $sth_uc = $dbh->prepare("UPDATE $current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - } - - if($tl) { - # Tracelevel setzen - $sth_uc->{TraceLevel} = "$tl|$tf"; - $sth_ic->{TraceLevel} = "$tl|$tf"; - } - - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@type]); - $sth_uc->bind_param_array(3, [@event]); - $sth_uc->bind_param_array(4, [@value]); - $sth_uc->bind_param_array(5, [@unit]); - $sth_uc->bind_param_array(6, [@device]); - $sth_uc->bind_param_array(7, [@reading]); - - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for $current - $@"); - } - eval { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); - } - else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); - } - else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); - } - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table $current - $@"); - } - else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table $current committed"); - } - else { - Log3($name, 4, "DbLog $name -> insert / update table $current committed by autocommit"); - } - } - }; - } - } - else { - ####################### - # Array-Insert - ####################### - - $st = [gettimeofday]; # SQL-Startzeit - - if (lc($DbLogType) =~ m(history)) { - ######################################## - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ih = $dbh->prepare("INSERT INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } - else { - # ohne PK - eval { $sth_ih = $dbh->prepare("INSERT INTO $history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - - if($tl) { - # Tracelevel setzen - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); - - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for $history - $@"); - } - eval { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_hist = 0; - my @n2hist; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; - my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); - push(@n2hist, "$nlh"); - $nins_hist++; - } - if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table $history".($usepkh?" using PK on columns $pkh":""); - } - else { - if($usepkh) { - Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"; - } - else { - Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table $history"; - } - s/\|/_ESC_/gxs for @n2hist; # escape Pipe "|" - $rowlist = join('§', @n2hist); - $rowlist = encode_base64($rowlist,""); - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit $history - $@"); - } - else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table $history committed"); - } - else { - Log3($name, 4, "DbLog $name -> insert table $history committed by autocommit"); - } - } - }; - - if ($@) { - $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table $history - $errorh"; - $error = encode_base64($errorh,""); - $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - } - } - - if (lc($DbLogType) =~ m(current)) { - ######################################## - # insert current mit/ohne primary key - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } - else { - # ohne PK - eval { $sth_ic = $dbh->prepare("INSERT INTO $current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - $sth_uc = $dbh->prepare("REPLACE INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - $sth_uc = $dbh->prepare("INSERT INTO $current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - } - else { - $sth_uc = $dbh->prepare("UPDATE $current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - } - - if($tl) { - # Tracelevel setzen - $sth_uc->{TraceLevel} = "$tl|$tf"; - $sth_ic->{TraceLevel} = "$tl|$tf"; - } - - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@type]); - $sth_uc->bind_param_array(3, [@event]); - $sth_uc->bind_param_array(4, [@value]); - $sth_uc->bind_param_array(5, [@unit]); - $sth_uc->bind_param_array(6, [@device]); - $sth_uc->bind_param_array(7, [@reading]); - - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for $current - $@"); - } - eval { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in $current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table $current".($usepkc?" using PK on columns $pkc":""); - } - else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc?" using PK on columns $pkc":""); - } - else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc?" using PK on columns $pkc":""); - } - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table $current - $@"); - } - else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table $current committed"); - } - else { - Log3($name, 4, "DbLog $name -> insert / update table $current committed by autocommit"); - } - } - }; - } - } - - $dbh->disconnect(); - - # SQL-Laufzeit ermitteln - my $rt = tv_interval($st); - - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - - # Background-Laufzeit ermitteln - my $brt = tv_interval($bst); - - $rt = $rt.",".$brt; - -return "$name|$error|$rt|$rowlback"; -} - -############################################################################################# -# Auswertung non-blocking asynchron DbLog_PushAsync -############################################################################################# -sub DbLog_PushAsyncDone { - my ($string) = @_; - my @a = split("\\|",$string); - my $name = $a[0]; - my $hash = $defs{$name}; - my $error = $a[1] ? decode_base64($a[1]) : 0; - my $bt = $a[2]; - my $rowlist = $a[3]; - my $asyncmode = AttrVal($name, "asyncMode", undef); - my $memcount; - - Log3 ($name, 5, "DbLog $name -> Start DbLog_PushAsyncDone"); - - if($rowlist) { - $rowlist = decode_base64($rowlist); - my @row_array = split('§', $rowlist); - - #one Transaction - eval { - for my $row (@row_array) { # Cache & CacheIndex für Events zum asynchronen Schreiben in DB - $data{DbLog}{$name}{cache}{index}++; - my $index = $data{DbLog}{$name}{cache}{index}; - $data{DbLog}{$name}{cache}{memcache}{$index} = $row; - } - $memcount = scalar(keys %{$data{DbLog}{$name}{cache}{memcache}}); - }; - } - - $memcount = $data{DbLog}{$name}{cache}{memcache}?scalar(keys %{$data{DbLog}{$name}{cache}{memcache}}):0; - readingsSingleUpdate($hash, 'CacheUsage', $memcount, 0); - - if(AttrVal($name, "showproctime", undef) && $bt) { - my ($rt,$brt) = split(",", $bt); - readingsBeginUpdate($hash); - readingsBulkUpdate($hash, "background_processing_time", sprintf("%.4f",$brt)); - readingsBulkUpdate($hash, "sql_processing_time", sprintf("%.4f",$rt)); - readingsEndUpdate($hash, 1); - } - - my $state = $error ? $error : (IsDisabled($name)) ? "disabled" : "connected"; - DbLog_setReadingstate ($hash, $state); - - if(!$asyncmode) { - delete($defs{$name}{READINGS}{NextSync}); - delete($defs{$name}{READINGS}{background_processing_time}); - delete($defs{$name}{READINGS}{sql_processing_time}); - delete($defs{$name}{READINGS}{CacheUsage}); - } - delete $hash->{HELPER}{".RUNNING_PID"}; - delete $hash->{HELPER}{LASTLIMITRUNTIME} if(!$error); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsyncDone finished"); - - Log3 ($name, 2, "DbLog $name - Last database write cycle done") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); - CancelDelayedShutdown($name); - -return; -} - -############################################################################################# -# Abbruchroutine Timeout non-blocking asynchron DbLog_PushAsync -############################################################################################# -sub DbLog_PushAsyncAborted { - my ($hash,$cause) = @_; - my $name = $hash->{NAME}; - $cause = $cause?$cause:"Timeout: process terminated"; - - Log3 ($name, 2, "DbLog $name -> ".$hash->{HELPER}{".RUNNING_PID"}{fn}." ".$cause) if(!$hash->{HELPER}{SHUTDOWNSEQ}); - DbLog_setReadingstate ($hash, $cause); - - delete $hash->{HELPER}{".RUNNING_PID"}; - delete $hash->{HELPER}{LASTLIMITRUNTIME}; - - Log3 ($name, 2, "DbLog $name - Last database write cycle done") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); - CancelDelayedShutdown($name); - -return; -} - ################################################################ # # zerlegt uebergebenes FHEM-Datum in die einzelnen Bestandteile @@ -4143,7 +3571,7 @@ return $retv; # Verbindungen zur DB aufbauen ################################################################################### sub DbLog_readCfg { - my ($hash)= @_; + my $hash = shift; my $name = $hash->{NAME}; my $configfilename= $hash->{CONFIGURATION}; @@ -4177,19 +3605,25 @@ sub DbLog_readCfg { } else { $hash->{MODEL}="unknown"; - Log3 $hash->{NAME}, 1, "Unknown database model found in configuration file $configfilename."; - Log3 $hash->{NAME}, 1, "Only MySQL/MariaDB, PostgreSQL, Oracle, SQLite are fully supported."; + + Log3 $name, 1, "Unknown database model found in configuration file $configfilename."; + Log3 $name, 1, "Only MySQL/MariaDB, PostgreSQL, Oracle, SQLite are fully supported."; + return "unknown database type"; } if($hash->{MODEL} eq "MYSQL") { - $hash->{UTF8} = defined($dbconfig{utf8})?$dbconfig{utf8}:0; + $hash->{UTF8} = defined($dbconfig{utf8}) ? $dbconfig{utf8} : 0; } return; } -sub _DbLog_ConnectPush { # own $dbhp for synchronous logging and dblog_get + +################################################################################### +# own $dbhp for synchronous logging and dblog_get +################################################################################### +sub _DbLog_ConnectPush { my ($hash,$get) = @_; my $name = $hash->{NAME}; my $dbconn = $hash->{dbconn}; @@ -4212,19 +3646,33 @@ sub _DbLog_ConnectPush { # own $ eval { if(!$useac) { - $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 0, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } elsif($useac == 1) { - $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 1, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } else { # Server default - $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } }; if($@) { $err = $@; - Log3 $hash->{NAME}, 2, "DbLog $name - Error: $@"; + Log3 $name, 2, "DbLog $name - Error: $@"; } if(!$dbhp) { @@ -4239,6 +3687,7 @@ sub _DbLog_ConnectPush { # own $ InternalTimer(gettimeofday()+5, '_DbLog_ConnectPush', $hash, 0); Log3 ($name, 4, "DbLog $name - Waiting for database connection"); + return 0; } @@ -4278,7 +3727,10 @@ sub _DbLog_ConnectPush { # own $ return 1; } -sub _DbLog_ConnectNewDBH { # new dbh for common use (except DbLog_Push and get-function) +################################################################################### +# new dbh for common use (except DbLog_Push and get-function) +################################################################################### +sub _DbLog_ConnectNewDBH { my ($hash) = @_; my $name = $hash->{NAME}; my $dbconn = $hash->{dbconn}; @@ -4291,13 +3743,27 @@ sub _DbLog_ConnectNewDBH { # ne eval { if(!$useac) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 0, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } elsif($useac == 1) { - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoCommit => 1, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } else { # Server default - $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoInactiveDestroy => 1, mysql_enable_utf8 => $utf8 }); + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, + RaiseError => 1, + AutoInactiveDestroy => 1, + mysql_enable_utf8 => $utf8 + }); } }; @@ -4386,7 +3852,7 @@ sub DbLog_ExecSQL1 { my $sth; eval { $sth = $dbh->do($sql); }; if($@) { - Log3($name, 2, "DbLog $name - ERROR: $@"); + Log3 ($name, 2, "DbLog $name - ERROR: $@"); return 0; } @@ -4433,7 +3899,7 @@ sub DbLog_Get { if($outf eq "int" && $inf eq "current") { $inf = "history"; - Log3 $hash->{NAME}, 3, "Defining DbLog SVG-Plots with :CURRENT is deprecated. Please define DbLog SVG-Plots with :HISTORY instead of :CURRENT. (define SVG ::HISTORY)"; + Log3 $name, 3, "Defining DbLog SVG-Plots with :CURRENT is deprecated. Please define DbLog SVG-Plots with :HISTORY instead of :CURRENT. (define SVG ::HISTORY)"; } if($outf eq "int") { @@ -4726,9 +4192,11 @@ sub DbLog_Get { my $val = $sql_value; my $ts = $sql_timestamp; eval("$readings[$i]->[4]"); - $sql_value = $val; + $sql_value = $val; $sql_timestamp = $ts; - if($@) {Log3 $hash->{NAME}, 3, "DbLog: Error in inline function: <".$readings[$i]->[4].">, Error: $@";} + if($@) { + Log3 ($name, 3, "DbLog: Error in inline function: <".$readings[$i]->[4].">, Error: $@"); + } } if($sql_timestamp lt $from && $deltacalc) { @@ -4882,7 +4350,7 @@ sub DbLog_Get { } # Wenn Attr SuppressUndef gesetzt ist, dann ausfiltern aller undef-Werte - $writeout = 0 if (!defined($sql_value) && AttrVal($hash->{NAME}, "suppressUndef", 0)); + $writeout = 0 if (!defined($sql_value) && AttrVal($name, 'suppressUndef', 0)); ###################### Ausgabe ########################### if($writeout) { @@ -5931,8 +5399,8 @@ sub DbLog_AddLog { $event = "addLog"; - $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{TIME} = $now; - $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{VALUE} = $read_val; + $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{TIME} = $now; + $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{VALUE} = $read_val; $ts = TimeNow(); @@ -5957,8 +5425,8 @@ sub DbLog_AddLog { # Anwender spezifische Funktion anwenden if($value_fn ne '') { - my $lastt = $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{TIME}; # patch Forum:#111423 - my $lastv = $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{VALUE}; + my $lastt = $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{TIME}; # patch Forum:#111423 + my $lastv = $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{VALUE}; my $TIMESTAMP = $ts; my $LASTTIMESTAMP = $lastt // 0; # patch Forum:#111423 @@ -5976,8 +5444,8 @@ sub DbLog_AddLog { Log3 $name, 2, "DbLog $name -> error valueFn: ".$@ if($@); if($IGNORE) { # aktueller Event wird nicht geloggt wenn $IGNORE=1 gesetzt - $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{TIME} = $lastt if($lastt); # patch Forum:#111423 - $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$hash->{NAME}}{VALUE} = $lastv if(defined $lastv); + $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{TIME} = $lastt if($lastt); # patch Forum:#111423 + $defs{$dev_name}{Helper}{DBLOG}{$dev_reading}{$name}{VALUE} = $lastv if(defined $lastv); next; } @@ -6006,7 +5474,7 @@ sub DbLog_AddLog { } my $row = ($ts."|".$dev_name."|".$dev_type."|".$event."|".$dev_reading."|".$read_val."|".$ut); - Log3 $hash->{NAME}, 3, "DbLog $name -> addLog created - TS: $ts, Device: $dev_name, Type: $dev_type, Event: $event, Reading: $dev_reading, Value: $read_val, Unit: $ut" + Log3 $name, 3, "DbLog $name -> addLog created - TS: $ts, Device: $dev_name, Type: $dev_type, Event: $event, Reading: $dev_reading, Value: $read_val, Unit: $ut" if(!AttrVal($name, "suppressAddLogV3",0)); if($async) { @@ -6074,8 +5542,8 @@ sub DbLog_addCacheLine { my $lastt; my $lastv; if($defs{$i_dev}) { - $lastt = $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$hash->{NAME}}{TIME}; - $lastv = $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$hash->{NAME}}{VALUE}; + $lastt = $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$name}{TIME}; + $lastv = $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$name}{VALUE}; } my $TIMESTAMP = $i_timestamp; @@ -6093,10 +5561,12 @@ sub DbLog_addCacheLine { eval $value_fn; Log3 $name, 2, "DbLog $name -> error valueFn: ".$@ if($@); - if($IGNORE) { # kein add wenn $IGNORE=1 gesetzt - $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$hash->{NAME}}{TIME} = $lastt if($defs{$i_dev} && $lastt); # patch Forum:#111423 - $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$hash->{NAME}}{VALUE} = $lastv if($defs{$i_dev} && defined $lastv); - Log3 $hash->{NAME}, 4, "DbLog $name -> Event ignored by valueFn - TS: $i_timestamp, Device: $i_dev, Type: $i_type, Event: $i_evt, Reading: $i_reading, Value: $i_val, Unit: $i_unit"; + if($IGNORE) { # kein add wenn $IGNORE=1 gesetzt + $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$name}{TIME} = $lastt if($defs{$i_dev} && $lastt); # patch Forum:#111423 + $defs{$i_dev}{Helper}{DBLOG}{$i_reading}{$name}{VALUE} = $lastv if($defs{$i_dev} && defined $lastv); + + Log3 $name, 4, "DbLog $name -> Event ignored by valueFn - TS: $i_timestamp, Device: $i_dev, Type: $i_type, Event: $i_evt, Reading: $i_reading, Value: $i_val, Unit: $i_unit"; + next; } @@ -6122,10 +5592,12 @@ sub DbLog_addCacheLine { my $row = ($i_timestamp."|".$i_dev."|".$i_type."|".$i_evt."|".$i_reading."|".$i_val."|".$i_unit); $row = DbLog_charfilter($row) if(AttrVal($name, "useCharfilter",0)); - Log3 $hash->{NAME}, 3, "DbLog $name -> added by addCacheLine - TS: $i_timestamp, Device: $i_dev, Type: $i_type, Event: $i_evt, Reading: $i_reading, Value: $i_val, Unit: $i_unit"; + + Log3 $name, 3, "DbLog $name -> added by addCacheLine - TS: $i_timestamp, Device: $i_dev, Type: $i_type, Event: $i_evt, Reading: $i_reading, Value: $i_val, Unit: $i_unit"; + use warnings; - eval { # one transaction + eval { # one transaction $data{DbLog}{$name}{cache}{index}++; my $index = $data{DbLog}{$name}{cache}{index}; $data{DbLog}{$name}{cache}{memcache}{$index} = $row; @@ -6328,7 +5800,8 @@ sub DbLog_reduceLog { }; if ($@) { - Log3($hash->{NAME}, 3, "DbLog $name: reduceLog ! FAILED ! for day $processingDay"); + Log3($name, 3, "DbLog $name: reduceLog ! FAILED ! for day $processingDay"); + eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; $ret = 0; } @@ -6350,7 +5823,7 @@ sub DbLog_reduceLog { push(@averageUpd, {%hourlyKnown}) if($day != 00); $c = 0; - for my $hourHash (@averageUpd) { # Only count for logging... + for my $hourHash (@averageUpd) { # Only count for logging... for my $hourKey (keys %$hourHash) { $c++ if ($hourHash->{$hourKey}->[0] && scalar(@{$hourHash->{$hourKey}->[4]}) > 1); } @@ -6391,7 +5864,7 @@ sub DbLog_reduceLog { }; if ($@) { $err = $@; - Log3($hash->{NAME}, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); + Log3($name, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; @averageUpdD = (); } @@ -6468,7 +5941,7 @@ sub DbLog_reduceLog { }; if ($@) { $err = $@; - Log3($hash->{NAME}, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); + Log3 ($name, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; } else { @@ -6597,7 +6070,7 @@ sub DbLog_reduceLogNbl { my $ac = ($dbh->{AutoCommit}) ? "ON" : "OFF"; my $tm = ($useta) ? "ON" : "OFF"; - Log3 $hash->{NAME}, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $name, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; my ($od,$nd) = split(":",$a[2]); # $od - Tage älter als , $nd - Tage neuer als my ($ots,$nts); @@ -6693,7 +6166,7 @@ sub DbLog_reduceLogNbl { }; if ($@) { $err = $@; - Log3($hash->{NAME}, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); + Log3 ($name, 2, "DbLog $name - reduceLogNbl ! FAILED ! for day $processingDay: $err"); eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; if ($@) { Log3 ($name, 2, "DbLog $name -> DbLog_reduceLogNbl - $@"); @@ -6771,7 +6244,7 @@ sub DbLog_reduceLogNbl { if ($@) { $err = $@; - Log3($hash->{NAME}, 2, "DbLog $name - reduceLogNbl average=hour ! FAILED ! for day $processingDay: $err"); + Log3 ($name, 2, "DbLog $name - reduceLogNbl average=hour ! FAILED ! for day $processingDay: $err"); eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; if ($@) { Log3 ($name, 2, "DbLog $name -> DbLog_reduceLogNbl - $@"); @@ -6859,7 +6332,7 @@ sub DbLog_reduceLogNbl { } }; if ($@) { - Log3($hash->{NAME}, 3, "DbLog $name: reduceLogNbl average=day ! FAILED ! for day $processingDay"); + Log3 ($name, 3, "DbLog $name: reduceLogNbl average=day ! FAILED ! for day $processingDay"); eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; if ($@) { Log3 ($name, 2, "DbLog $name -> DbLog_reduceLogNbl - $@"); @@ -7061,7 +6534,7 @@ sub DbLog_deldaysNbl { my $ac = ($dbh->{AutoCommit})?"ON":"OFF"; my $tm = ($useta)?"ON":"OFF"; - Log3 $hash->{NAME}, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $name, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; $cmd = "delete from $history where TIMESTAMP < "; if ($hash->{MODEL} eq 'SQLITE') { @@ -7182,11 +6655,11 @@ sub DbLog_reopen { RemoveInternalTimer($hash, "DbLog_reopen"); - if(_DbLog_ConnectPush($hash)) { - # Statusbit "Kein Schreiben in DB erlauben" löschen - my $delay = delete $hash->{HELPER}{REOPEN_RUNS}; + if (_DbLog_ConnectPush($hash)) { + my $delay = delete $hash->{HELPER}{REOPEN_RUNS}; # Statusbit "Kein Schreiben in DB erlauben" löschen delete $hash->{HELPER}{REOPEN_RUNS_UNTIL}; - Log3($name, 2, "DbLog $name: Database connection reopened (it was $delay seconds closed).") if($delay); + + Log3 ($name, 2, "DbLog $name: Database connection reopened (it was $delay seconds closed).") if($delay); DbLog_setReadingstate ($hash, "reopened"); DbLog_execmemcache ($hash) if($async);