From 785ab692dc70f7f3943d1840f5f45cd9e117772e Mon Sep 17 00:00:00 2001 From: nasseeder1 Date: Mon, 15 Apr 2019 18:33:26 +0000 Subject: [PATCH] 93_DbLog: contrib V 4.4.0 git-svn-id: https://svn.fhem.de/fhem/trunk@19194 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- fhem/contrib/DS_Starter/93_DbLog.pm | 1362 ++++++++++++++++++--------- 1 file changed, 902 insertions(+), 460 deletions(-) diff --git a/fhem/contrib/DS_Starter/93_DbLog.pm b/fhem/contrib/DS_Starter/93_DbLog.pm index 6ec8f14a8..06d8e93bc 100644 --- a/fhem/contrib/DS_Starter/93_DbLog.pm +++ b/fhem/contrib/DS_Starter/93_DbLog.pm @@ -28,6 +28,7 @@ no if $] >= 5.017011, warnings => 'experimental::smartmatch'; # Version History intern by DS_Starter: our %DbLog_vNotesIntern = ( + "4.0.0" => "14.04.2019 rewrite DbLog_PushAsync / DbLog_Push / DbLog_Connectxx, new attribute \"bulkInsert\", ", "3.14.1" => "12.04.2019 DbLog_Get: change select of MySQL Forum: https://forum.fhem.de/index.php/topic,99280.0.html ", "3.14.0" => "05.04.2019 add support for Meta.pm and X_DelayedShutdownFn, attribute shutdownWait removed, ". "direct attribute help in FHEMWEB ", @@ -225,6 +226,7 @@ sub DbLog_Initialize($) $hash->{SVG_regexpFn} = "DbLog_regexpFn"; $hash->{DelayedShutdownFn} = "DbLog_DelayedShutdown"; $hash->{AttrList} = "addStateEvent:0,1 ". + "bulkInsert:1,0 ". "commitMode:basic_ta:on,basic_ta:off,ac:on_ta:on,ac:on_ta:off,ac:off_ta:on ". "colEvent ". "colReading ". @@ -349,6 +351,7 @@ sub DbLog_DelayedShutdown($) { return 0 if(IsDisabled($name)); $hash->{HELPER}{SHUTDOWNSEQ} = 1; + # return 0 if(!$async && !$hash->{HELPER}{PUSHISRUNNING}); Log3($name, 2, "DbLog $name - Last database write cycle due to shutdown ..."); DbLog_execmemcache($hash); @@ -570,7 +573,10 @@ sub DbLog_Set($@) { } elsif ($a[1] eq 'reopen') { if ($dbh) { - $dbh->commit() if(!$dbh->{AutoCommit}); + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } $dbh->disconnect(); } if (!$a[2]) { @@ -1452,7 +1458,7 @@ sub DbLog_Log($$) { $net = tv_interval($nst); } else { CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for Last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); } } if($net && AttrVal($name, "showNotifyTime", undef)) { @@ -1471,6 +1477,7 @@ sub DbLog_Push(@) { my $supk = AttrVal($name, "noSupportPK", 0); my $tl = AttrVal($name, "traceLevel", 0); my $tf = AttrVal($name, "traceFlag", "SQL"); + my $bi = AttrVal($name, "bulkInsert", 0); my $errorh = 0; my $error = 0; my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) @@ -1501,6 +1508,11 @@ sub DbLog_Push(@) { $dbh->{RaiseError} = 1; $dbh->{PrintError} = 0; + if($tl) { + # Tracelevel setzen + $dbh->{TraceLevel} = "$tl|$tf"; + } + my ($useac,$useta) = DbLog_commitMode($hash); my $ac = ($dbh->{AutoCommit})?"ON":"OFF"; my $tm = ($useta)?"ON":"OFF"; @@ -1510,6 +1522,7 @@ sub DbLog_Push(@) { Log3 $name, 4, "DbLog $name -> ################################################################"; Log3 $name, 4, "DbLog $name -> DbLogType is: $DbLogType"; Log3 $name, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $name, 4, "DbLog $name -> Insert mode: ".($bi?"Bulk":"Array"); # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK my ($usepkh,$usepkc,$pkh,$pkc); @@ -1521,7 +1534,9 @@ sub DbLog_Push(@) { my (@timestamp,@device,@type,@event,@reading,@value,@unit); my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); - my ($sth_ih,$sth_ic,$sth_uc); + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins); + my ($tuples, $rows); + no warnings 'uninitialized'; my $ceti = $#row_array+1; @@ -1540,212 +1555,404 @@ sub DbLog_Push(@) { if($vb4show); } use warnings; - - if (lc($DbLogType) =~ m(history)) { - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - return $@; - } - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); - } - - if (lc($DbLogType) =~ m(current) ) { - # insert current mit/ohne primary key, insert-values für current werden generiert - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - return $@; - } - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } else { - # for update current (ohne PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - $sth_uc->bind_param_array(1, [@timestamp]); + + if($bi) { + ####################### + # Bulk-Insert + ####################### + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + $sqlins = "INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + $sqlins = "INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } else { + # ohne PK + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } + + no warnings 'uninitialized'; + foreach my $row (@row_array) { + my @a = split("\\|",$row); + s/_ESC_/\|/g for @a; # escaped Pipe return to "|" + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + $a[3] =~ s/'/''/g; # escape ' with '' + $a[5] =~ s/'/''/g; # escape ' with '' + $a[6] =~ s/'/''/g; # escape ' with '' + $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; + } + use warnings; + + chop($sqlins); + + if ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins .= " ON CONFLICT DO NOTHING"; + } + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { $sth_ih = $dbh->prepare($sqlins); + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + my $ins_hist = $sth_ih->execute(); + $ins_hist = 0 if($ins_hist eq "0E0"); + + if($ins_hist == $ceti) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error rollback history - $@"); + } else { + Log3($name, 4, "DbLog $name -> insert history rolled back"); + } + } + } + + if (lc($DbLogType) =~ m(current)) { + ################################################################# + # insert current mit/ohne primary key + # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode + # die nicht upgedateten Sätze nicht identifiziert werden können + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); $sth_uc->bind_param_array(2, [@type]); $sth_uc->bind_param_array(3, [@event]); $sth_uc->bind_param_array(4, [@value]); $sth_uc->bind_param_array(5, [@unit]); $sth_uc->bind_param_array(6, [@device]); $sth_uc->bind_param_array(7, [@reading]); - } - } - - if($tl) { - # Tracelevel setzen - $dbh->{TraceLevel} = "$tl|$tf"; - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - - my ($tuples, $rows); - - # insert into history-Tabelle - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(history) ) { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_hist = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; - $nins_hist++; - } - if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($ceti-$nins_hist)." of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein if ($@) { - Log3($name, 2, "DbLog $name -> Error commit history - $@"); - } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table history committed"); - } else { - Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); - } - } - } - }; - - if ($@) { - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $@"; - $errorh = $@; - eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error rollback history - $@"); - } else { - Log3($name, 4, "DbLog $name -> insert history rolled back"); - } - } - - # update or insert current - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(current) ) { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - # Log3 $hash->{NAME}, 2, "DbLog $name -> Rows: $rows, Ceti: $ceti"; - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); - } + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + + } else { + ####################### + # Array-Insert + ####################### + + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table current committed"); - } else { - Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); - } - } - } - }; + # ohne PK + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + + $sth_ih->bind_param_array(1, [@timestamp]); + $sth_ih->bind_param_array(2, [@device]); + $sth_ih->bind_param_array(3, [@type]); + $sth_ih->bind_param_array(4, [@event]); + $sth_ih->bind_param_array(5, [@reading]); + $sth_ih->bind_param_array(6, [@value]); + $sth_ih->bind_param_array(7, [@unit]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { + ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_hist = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; + my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); + $nins_hist++; + } + if(!$nins_hist) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error rollback history - $@"); + } else { + Log3($name, 4, "DbLog $name -> insert history rolled back"); + } + } + } + + if (lc($DbLogType) =~ m(current)) { + ######################################## + # insert current mit/ohne primary key + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); + $sth_uc->bind_param_array(2, [@type]); + $sth_uc->bind_param_array(3, [@event]); + $sth_uc->bind_param_array(4, [@value]); + $sth_uc->bind_param_array(5, [@unit]); + $sth_uc->bind_param_array(6, [@device]); + $sth_uc->bind_param_array(7, [@reading]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); + } + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + } + + # SQL-Laufzeit ermitteln + my $rt = tv_interval($st); + + if(AttrVal($name, "showproctime", 0)) { + readingsBeginUpdate($hash); + readingsBulkUpdate($hash, "sql_processing_time", sprintf("%.4f",$rt)); + readingsEndUpdate($hash, 0); + } if ($errorh) { $error = $errorh; @@ -1858,7 +2065,7 @@ sub DbLog_execmemcache ($) { $error = "Commit already running - resync at NextSync"; } else { CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for Last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); } } @@ -1899,6 +2106,7 @@ sub DbLog_PushAsync(@) { my $supk = AttrVal($name, "noSupportPK", 0); my $tl = AttrVal($name, "traceLevel", 0); my $tf = AttrVal($name, "traceFlag", "SQL"); + my $bi = AttrVal($name, "bulkInsert", 0); my $utf8 = defined($hash->{UTF8})?$hash->{UTF8}:0; my $errorh = 0; my $error = 0; @@ -1913,14 +2121,16 @@ sub DbLog_PushAsync(@) { my $bst = [gettimeofday]; my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if ($@) { $error = encode_base64($@,""); Log3 ($name, 2, "DbLog $name - Error: $@"); @@ -1928,9 +2138,15 @@ sub DbLog_PushAsync(@) { return "$name|$error|0|$rowlist"; } + if($tl) { + # Tracelevel setzen + $dbh->{TraceLevel} = "$tl|$tf"; + } + my $ac = ($dbh->{AutoCommit})?"ON":"OFF"; my $tm = ($useta)?"ON":"OFF"; Log3 $hash->{NAME}, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $hash->{NAME}, 4, "DbLog $name -> Insert mode: ".($bi?"Bulk":"Array"); # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK my ($usepkh,$usepkc,$pkh,$pkc); @@ -1940,249 +2156,428 @@ sub DbLog_PushAsync(@) { Log3 $hash->{NAME}, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"; } - my $rowldec = decode_base64($rowlist); + my $rowldec = decode_base64($rowlist); my @row_array = split('§', $rowldec); + my $ceti = $#row_array+1; my (@timestamp,@device,@type,@event,@reading,@value,@unit); my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); - my ($sth_ih,$sth_ic,$sth_uc); + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins); + my ($tuples, $rows); + no warnings 'uninitialized'; - - my $ceti = $#row_array+1; - foreach my $row (@row_array) { my @a = split("\\|",$row); s/_ESC_/\|/g for @a; # escaped Pipe return to "|" - push(@timestamp, "$a[0]"); - push(@device, "$a[1]"); - push(@type, "$a[2]"); - push(@event, "$a[3]"); - push(@reading, "$a[4]"); - push(@value, "$a[5]"); - push(@unit, "$a[6]"); - Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + push(@timestamp, "$a[0]"); + push(@device, "$a[1]"); + push(@type, "$a[2]"); + push(@event, "$a[3]"); + push(@reading, "$a[4]"); + push(@value, "$a[5]"); + push(@unit, "$a[6]"); + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; } - use warnings; + use warnings; - if (lc($DbLogType) =~ m(history)) { - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. disk I/O error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); - } - - if (lc($DbLogType) =~ m(current) ) { - # insert current mit/ohne primary key, insert-values für current werden generiert - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } else { - # update current (ohne PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - $sth_uc->bind_param_array(1, [@timestamp]); + if($bi) { + ####################### + # Bulk-Insert + ####################### + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + $sqlins = "INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + $sqlins = "INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } else { + # ohne PK + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } + no warnings 'uninitialized'; + foreach my $row (@row_array) { + my @a = split("\\|",$row); + s/_ESC_/\|/g for @a; # escaped Pipe return to "|" + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + $a[3] =~ s/'/''/g; # escape ' with '' + $a[5] =~ s/'/''/g; # escape ' with '' + $a[6] =~ s/'/''/g; # escape ' with '' + $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; + } + use warnings; + + chop($sqlins); + + if ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins .= " ON CONFLICT DO NOTHING"; + } + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { $sth_ih = $dbh->prepare($sqlins); + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + my $ins_hist = $sth_ih->execute(); + $ins_hist = 0 if($ins_hist eq "0E0"); + + if($ins_hist == $ceti) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + $error = encode_base64($errorh,""); + $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + } + } + + if (lc($DbLogType) =~ m(current)) { + ################################################################# + # insert current mit/ohne primary key + # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode + # die nicht upgedateten Sätze nicht identifiziert werden können + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|"; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); $sth_uc->bind_param_array(2, [@type]); $sth_uc->bind_param_array(3, [@event]); $sth_uc->bind_param_array(4, [@value]); $sth_uc->bind_param_array(5, [@unit]); $sth_uc->bind_param_array(6, [@device]); $sth_uc->bind_param_array(7, [@reading]); - } - } - - if($tl) { - # Tracelevel setzen - $dbh->{TraceLevel} = "$tl|$tf"; - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - - # SQL-Startzeit - my $st = [gettimeofday]; - - my ($tuples, $rows); - - # insert into history - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(history) ) { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_hist = 0; - my @n2hist; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; - my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); - push(@n2hist, "$nlh"); - $nins_hist++; - } - if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($ceti-$nins_hist)." of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - s/\|/_ESC_/g for @n2hist; # escape Pipe "|" - $rowlist = join('§', @n2hist); - $rowlist = encode_base64($rowlist,""); - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein if ($@) { - Log3($name, 2, "DbLog $name -> Error commit history - $@"); - } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table history committed"); - } else { - Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); - } - } - } - }; - - if ($@) { - $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; - $error = encode_base64($errorh,""); - $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - } - - # update or insert current - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); - } - eval { - if (lc($DbLogType) =~ m(current) ) { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); - } + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + + } else { + ####################### + # Array-Insert + ####################### + + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table current committed"); - } else { - Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); - } - } - } - }; + # ohne PK + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|$rowlist"; + } + + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + + $sth_ih->bind_param_array(1, [@timestamp]); + $sth_ih->bind_param_array(2, [@device]); + $sth_ih->bind_param_array(3, [@type]); + $sth_ih->bind_param_array(4, [@event]); + $sth_ih->bind_param_array(5, [@reading]); + $sth_ih->bind_param_array(6, [@value]); + $sth_ih->bind_param_array(7, [@unit]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { + ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_hist = 0; + my @n2hist; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; + my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); + push(@n2hist, "$nlh"); + $nins_hist++; + } + if(!$nins_hist) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table history"; + } + s/\|/_ESC_/g for @n2hist; # escape Pipe "|" + $rowlist = join('§', @n2hist); + $rowlist = encode_base64($rowlist,""); + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + $error = encode_base64($errorh,""); + $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + } + } + + if (lc($DbLogType) =~ m(current)) { + ######################################## + # insert current mit/ohne primary key + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|$rowlist"; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); + $sth_uc->bind_param_array(2, [@type]); + $sth_uc->bind_param_array(3, [@event]); + $sth_uc->bind_param_array(4, [@value]); + $sth_uc->bind_param_array(5, [@unit]); + $sth_uc->bind_param_array(6, [@device]); + $sth_uc->bind_param_array(7, [@reading]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); + } + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + } $dbh->disconnect(); @@ -2384,14 +2779,16 @@ sub DbLog_ConnectPush($;$$) { Log3 $hash->{NAME}, 3, "DbLog $name - Creating Push-Handle to database $dbconn with user $dbuser" if(!$get); my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if($@) { $err = $@; @@ -2449,14 +2846,16 @@ sub DbLog_ConnectNewDBH($) { my $dbh; my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if($@) { Log3($name, 2, "DbLog $name - $@"); @@ -2551,7 +2950,7 @@ sub DbLog_Get($@) { " see the #DbLog entries in the .gplot files\n" . " is not used, only for compatibility for FileLog, please use - \n" . " is a prefix, - means stdout\n" - if(int(@a) < 5); + if(int(@a) < 5); shift @a; my $inf = lc(shift @a); my $outf = lc(shift @a); @@ -2571,7 +2970,7 @@ sub DbLog_Get($@) { if($outf eq "int") { $outf = "-"; $internal = 1; - } elsif($outf eq "array"){ + } elsif($outf eq "array") { } elsif(lc($outf) eq "webchart") { # redirect the get request to the DbLog_chartQuery function @@ -2618,12 +3017,12 @@ sub DbLog_Get($@) { $readings[$i][1] = "%" if(!$readings[$i][1] || length($readings[$i][1])==0); #falls Reading nicht gefuellt setze Joker } - + Log3 $name, 4, "DbLog $name -> ################################################################"; Log3 $name, 4, "DbLog $name -> ### new get data for SVG ###"; Log3 $name, 4, "DbLog $name -> ################################################################"; Log3($name, 4, "DbLog $name -> main PID: $hash->{PID}, secondary PID: $$"); - + $dbh = $hash->{DBHP}; if ( !$dbh || not $dbh->ping ) { # DB Session dead, try to reopen now ! @@ -2783,10 +3182,10 @@ sub DbLog_Get($@) { $retval .= "Timestamp: Device, Type, Event, Reading, Value, Unit\n"; $retval .= "=====================================================\n"; } - + ################################ # Select Auswertung - ################################ + ################################ while($sth->fetch()) { ############ Auswerten des 5. Parameters: Regexp ################### @@ -5892,6 +6291,23 @@ return;
+
    + +
  • bulkInsert +
      + attr <device> bulkInsert [1|0] +
      + + Toggles the Insert mode between Array (default) and Bulk. This Bulk insert mode increase the write performance + into the history table significant in case of plenty of data to insert, especially if asynchronous mode is + used. + To get the whole improved performance, the attribute "DbLogType" should not contain the current table + in this use case.
      +
    +
  • +
+
+
  • commitMode @@ -7146,7 +7562,23 @@ return; Im asynchronen Mode werden die Daten nicht blockierend mit einem separaten Hintergrundprozess in die Datenbank geschrieben. Det Timeout-Wert für diesen Hintergrundprozess kann mit dem Attribut "timeout" (Default 86400s) eingestellt werden. Im synchronen Modus (Normalmodus) werden die Events nicht gecacht und sofort in die Datenbank geschrieben. Ist die Datenbank nicht - verfügbar gehen sie verloren.
    + verfügbar, gehen sie verloren.
    +
+ + +
+ +
    + +
  • bulkInsert +
      + attr <device> bulkInsert [1|0] +
      + + Schaltet den Insert-Modus zwischen "Array" (default) und "Bulk" um. Der Bulk Modus führt beim Insert von sehr + vielen Datensätzen in die history-Tabelle zu einer erheblichen Performancesteigerung vor allem im asynchronen + Mode. Um die volle Performancesteigerung zu erhalten, sollte in diesem Fall das Attribut "DbLogType" + nicht die current-Tabelle enthalten.
@@ -7762,18 +8194,18 @@ return; "perl": 5.014, "Data::Dumper": 0, "DBI": 0, - "DBD::mysql" :0, - "DBD::SQLite" :0, "Blocking": 0, "Time::HiRes": 0, "Time::Local": 0, "Encode": 0 }, "recommends": { - "FHEM::Meta": 0, - "DBD::Pg" :0 + "FHEM::Meta": 0 }, "suggests": { + "DBD::Pg" :0, + "DBD::mysql" :0, + "DBD::SQLite" :0 } } }, @@ -7781,6 +8213,16 @@ return; "x_wiki": { "web": "https://wiki.fhem.de/wiki/DbLog", "title": "DbLog" + }, + "repository": { + "x_dev": { + "type": "svn", + "url": "https://svn.fhem.de/trac/browser/trunk/fhem/contrib/DS_Starter", + "web": "https://svn.fhem.de/trac/browser/trunk/fhem/contrib/DS_Starter/93_DbLog.pm", + "x_branch": "dev", + "x_filepath": "fhem/contrib/", + "x_raw": "https://svn.fhem.de/fhem/trunk/fhem/contrib/DS_Starter/93_DbLog.pm" + } } } }