diff --git a/fhem/CHANGED b/fhem/CHANGED index 510b7e7b1..2d43dc312 100644 --- a/fhem/CHANGED +++ b/fhem/CHANGED @@ -1,5 +1,7 @@ # Add changes at the top of the list. Keep it in ASCII, and 80-char wide. # Do not insert empty lines here, update check depends on it. + - feature: 93_DbLog: V4.1.0 from contrib, write function rewritten, new attr + bulkInsert, fix reconnect behavior of MySQL-Forum: #99719 - change: 74_GardenaSmartDevice: change output for illigal year - bugfix: 98_Text2Speech: now working without mp3wrap too thanks to Mumpitzstuff diff --git a/fhem/FHEM/93_DbLog.pm b/fhem/FHEM/93_DbLog.pm index 24553c54c..187cf1154 100644 --- a/fhem/FHEM/93_DbLog.pm +++ b/fhem/FHEM/93_DbLog.pm @@ -28,6 +28,8 @@ no if $] >= 5.017011, warnings => 'experimental::smartmatch'; # Version History intern by DS_Starter: our %DbLog_vNotesIntern = ( + "4.1.0" => "17.04.2019 DbLog_Get: change reconnect for MySQL (Forum: #99719), change index suggestion in DbLog_configcheck ", + "4.0.0" => "14.04.2019 rewrite DbLog_PushAsync / DbLog_Push / DbLog_Connectxx, new attribute \"bulkInsert\" ", "3.14.1" => "12.04.2019 DbLog_Get: change select of MySQL Forum: https://forum.fhem.de/index.php/topic,99280.0.html ", "3.14.0" => "05.04.2019 add support for Meta.pm and X_DelayedShutdownFn, attribute shutdownWait removed, ". "direct attribute help in FHEMWEB ", @@ -225,6 +227,7 @@ sub DbLog_Initialize($) $hash->{SVG_regexpFn} = "DbLog_regexpFn"; $hash->{DelayedShutdownFn} = "DbLog_DelayedShutdown"; $hash->{AttrList} = "addStateEvent:0,1 ". + "bulkInsert:1,0 ". "commitMode:basic_ta:on,basic_ta:off,ac:on_ta:on,ac:on_ta:off,ac:off_ta:on ". "colEvent ". "colReading ". @@ -349,6 +352,7 @@ sub DbLog_DelayedShutdown($) { return 0 if(IsDisabled($name)); $hash->{HELPER}{SHUTDOWNSEQ} = 1; + # return 0 if(!$async && !$hash->{HELPER}{PUSHISRUNNING}); Log3($name, 2, "DbLog $name - Last database write cycle due to shutdown ..."); DbLog_execmemcache($hash); @@ -570,7 +574,10 @@ sub DbLog_Set($@) { } elsif ($a[1] eq 'reopen') { if ($dbh) { - $dbh->commit() if(!$dbh->{AutoCommit}); + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } $dbh->disconnect(); } if (!$a[2]) { @@ -1452,7 +1459,7 @@ sub DbLog_Log($$) { $net = tv_interval($nst); } else { CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for Last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); } } if($net && AttrVal($name, "showNotifyTime", undef)) { @@ -1471,6 +1478,7 @@ sub DbLog_Push(@) { my $supk = AttrVal($name, "noSupportPK", 0); my $tl = AttrVal($name, "traceLevel", 0); my $tf = AttrVal($name, "traceFlag", "SQL"); + my $bi = AttrVal($name, "bulkInsert", 0); my $errorh = 0; my $error = 0; my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) @@ -1501,6 +1509,11 @@ sub DbLog_Push(@) { $dbh->{RaiseError} = 1; $dbh->{PrintError} = 0; + if($tl) { + # Tracelevel setzen + $dbh->{TraceLevel} = "$tl|$tf"; + } + my ($useac,$useta) = DbLog_commitMode($hash); my $ac = ($dbh->{AutoCommit})?"ON":"OFF"; my $tm = ($useta)?"ON":"OFF"; @@ -1510,6 +1523,7 @@ sub DbLog_Push(@) { Log3 $name, 4, "DbLog $name -> ################################################################"; Log3 $name, 4, "DbLog $name -> DbLogType is: $DbLogType"; Log3 $name, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $name, 4, "DbLog $name -> Insert mode: ".($bi?"Bulk":"Array"); # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK my ($usepkh,$usepkc,$pkh,$pkc); @@ -1521,7 +1535,9 @@ sub DbLog_Push(@) { my (@timestamp,@device,@type,@event,@reading,@value,@unit); my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); - my ($sth_ih,$sth_ic,$sth_uc); + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins); + my ($tuples, $rows); + no warnings 'uninitialized'; my $ceti = $#row_array+1; @@ -1540,212 +1556,404 @@ sub DbLog_Push(@) { if($vb4show); } use warnings; - - if (lc($DbLogType) =~ m(history)) { - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - return $@; - } - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); - } - - if (lc($DbLogType) =~ m(current) ) { - # insert current mit/ohne primary key, insert-values für current werden generiert - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - return $@; - } - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } else { - # for update current (ohne PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - $sth_uc->bind_param_array(1, [@timestamp]); + + if($bi) { + ####################### + # Bulk-Insert + ####################### + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + $sqlins = "INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + $sqlins = "INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } else { + # ohne PK + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } + + no warnings 'uninitialized'; + foreach my $row (@row_array) { + my @a = split("\\|",$row); + s/_ESC_/\|/g for @a; # escaped Pipe return to "|" + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + $a[3] =~ s/'/''/g; # escape ' with '' + $a[5] =~ s/'/''/g; # escape ' with '' + $a[6] =~ s/'/''/g; # escape ' with '' + $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; + } + use warnings; + + chop($sqlins); + + if ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins .= " ON CONFLICT DO NOTHING"; + } + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { $sth_ih = $dbh->prepare($sqlins); + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + my $ins_hist = $sth_ih->execute(); + $ins_hist = 0 if($ins_hist eq "0E0"); + + if($ins_hist == $ceti) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error rollback history - $@"); + } else { + Log3($name, 4, "DbLog $name -> insert history rolled back"); + } + } + } + + if (lc($DbLogType) =~ m(current)) { + ################################################################# + # insert current mit/ohne primary key + # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode + # die nicht upgedateten Sätze nicht identifiziert werden können + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); $sth_uc->bind_param_array(2, [@type]); $sth_uc->bind_param_array(3, [@event]); $sth_uc->bind_param_array(4, [@value]); $sth_uc->bind_param_array(5, [@unit]); $sth_uc->bind_param_array(6, [@device]); $sth_uc->bind_param_array(7, [@reading]); - } - } - - if($tl) { - # Tracelevel setzen - $dbh->{TraceLevel} = "$tl|$tf"; - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - - my ($tuples, $rows); - - # insert into history-Tabelle - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(history) ) { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_hist = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; - $nins_hist++; - } - if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($ceti-$nins_hist)." of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein if ($@) { - Log3($name, 2, "DbLog $name -> Error commit history - $@"); - } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table history committed"); - } else { - Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); - } - } - } - }; - - if ($@) { - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $@"; - $errorh = $@; - eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error rollback history - $@"); - } else { - Log3($name, 4, "DbLog $name -> insert history rolled back"); - } - } - - # update or insert current - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(current) ) { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - # Log3 $hash->{NAME}, 2, "DbLog $name -> Rows: $rows, Ceti: $ceti"; - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); - } + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + + } else { + ####################### + # Array-Insert + ####################### + + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table current committed"); - } else { - Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); - } - } - } - }; + # ohne PK + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + + $sth_ih->bind_param_array(1, [@timestamp]); + $sth_ih->bind_param_array(2, [@device]); + $sth_ih->bind_param_array(3, [@type]); + $sth_ih->bind_param_array(4, [@event]); + $sth_ih->bind_param_array(5, [@reading]); + $sth_ih->bind_param_array(6, [@value]); + $sth_ih->bind_param_array(7, [@unit]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { + ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_hist = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; + my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); + $nins_hist++; + } + if(!$nins_hist) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + eval {$dbh->rollback() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error rollback history - $@"); + } else { + Log3($name, 4, "DbLog $name -> insert history rolled back"); + } + } + } + + if (lc($DbLogType) =~ m(current)) { + ######################################## + # insert current mit/ohne primary key + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + return $@; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); + $sth_uc->bind_param_array(2, [@type]); + $sth_uc->bind_param_array(3, [@event]); + $sth_uc->bind_param_array(4, [@value]); + $sth_uc->bind_param_array(5, [@unit]); + $sth_uc->bind_param_array(6, [@device]); + $sth_uc->bind_param_array(7, [@reading]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); + } + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + } + + # SQL-Laufzeit ermitteln + my $rt = tv_interval($st); + + if(AttrVal($name, "showproctime", 0)) { + readingsBeginUpdate($hash); + readingsBulkUpdate($hash, "sql_processing_time", sprintf("%.4f",$rt)); + readingsEndUpdate($hash, 0); + } if ($errorh) { $error = $errorh; @@ -1858,7 +2066,7 @@ sub DbLog_execmemcache ($) { $error = "Commit already running - resync at NextSync"; } else { CancelDelayedShutdown($name) if($hash->{HELPER}{SHUTDOWNSEQ}); - Log3 ($name, 2, "DbLog $name - no data for Last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); + Log3 ($name, 2, "DbLog $name - no data for last database write cycle") if(delete $hash->{HELPER}{SHUTDOWNSEQ}); } } @@ -1899,6 +2107,7 @@ sub DbLog_PushAsync(@) { my $supk = AttrVal($name, "noSupportPK", 0); my $tl = AttrVal($name, "traceLevel", 0); my $tf = AttrVal($name, "traceFlag", "SQL"); + my $bi = AttrVal($name, "bulkInsert", 0); my $utf8 = defined($hash->{UTF8})?$hash->{UTF8}:0; my $errorh = 0; my $error = 0; @@ -1913,14 +2122,16 @@ sub DbLog_PushAsync(@) { my $bst = [gettimeofday]; my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if ($@) { $error = encode_base64($@,""); Log3 ($name, 2, "DbLog $name - Error: $@"); @@ -1928,9 +2139,15 @@ sub DbLog_PushAsync(@) { return "$name|$error|0|$rowlist"; } + if($tl) { + # Tracelevel setzen + $dbh->{TraceLevel} = "$tl|$tf"; + } + my $ac = ($dbh->{AutoCommit})?"ON":"OFF"; my $tm = ($useta)?"ON":"OFF"; Log3 $hash->{NAME}, 4, "DbLog $name -> AutoCommit mode: $ac, Transaction mode: $tm"; + Log3 $hash->{NAME}, 4, "DbLog $name -> Insert mode: ".($bi?"Bulk":"Array"); # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK my ($usepkh,$usepkc,$pkh,$pkc); @@ -1940,249 +2157,428 @@ sub DbLog_PushAsync(@) { Log3 $hash->{NAME}, 5, "DbLog $name -> Primary Key usage suppressed by attribute noSupportPK"; } - my $rowldec = decode_base64($rowlist); + my $rowldec = decode_base64($rowlist); my @row_array = split('§', $rowldec); + my $ceti = $#row_array+1; my (@timestamp,@device,@type,@event,@reading,@value,@unit); my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); - my ($sth_ih,$sth_ic,$sth_uc); + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins); + my ($tuples, $rows); + no warnings 'uninitialized'; - - my $ceti = $#row_array+1; - foreach my $row (@row_array) { my @a = split("\\|",$row); s/_ESC_/\|/g for @a; # escaped Pipe return to "|" - push(@timestamp, "$a[0]"); - push(@device, "$a[1]"); - push(@type, "$a[2]"); - push(@event, "$a[3]"); - push(@reading, "$a[4]"); - push(@value, "$a[5]"); - push(@unit, "$a[6]"); - Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + push(@timestamp, "$a[0]"); + push(@device, "$a[1]"); + push(@type, "$a[2]"); + push(@event, "$a[3]"); + push(@reading, "$a[4]"); + push(@value, "$a[5]"); + push(@unit, "$a[6]"); + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; } - use warnings; + use warnings; - if (lc($DbLogType) =~ m(history)) { - # insert history mit/ohne primary key - if ($usepkh && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. disk I/O error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - $sth_ih->bind_param_array(1, [@timestamp]); - $sth_ih->bind_param_array(2, [@device]); - $sth_ih->bind_param_array(3, [@type]); - $sth_ih->bind_param_array(4, [@event]); - $sth_ih->bind_param_array(5, [@reading]); - $sth_ih->bind_param_array(6, [@value]); - $sth_ih->bind_param_array(7, [@unit]); - } - - if (lc($DbLogType) =~ m(current) ) { - # insert current mit/ohne primary key, insert-values für current werden generiert - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; - } else { - # old behavior - eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; - } - if ($@) { - # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE - $error = encode_base64($@,""); - Log3 ($name, 2, "DbLog $name - Error: $@"); - Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); - $dbh->disconnect(); - return "$name|$error|0|$rowlist"; - } - if ($usepkc && $hash->{MODEL} eq 'MYSQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { - # update current (mit PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) - DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, - VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); - $sth_uc->bind_param_array(1, [@timestamp]); - $sth_uc->bind_param_array(2, [@device]); - $sth_uc->bind_param_array(3, [@type]); - $sth_uc->bind_param_array(4, [@event]); - $sth_uc->bind_param_array(5, [@reading]); - $sth_uc->bind_param_array(6, [@value]); - $sth_uc->bind_param_array(7, [@unit]); - } else { - # update current (ohne PK), insert-values für current wird generiert - $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); - $sth_uc->bind_param_array(1, [@timestamp]); + if($bi) { + ####################### + # Bulk-Insert + ####################### + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + $sqlins = "INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + $sqlins = "INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } else { + # ohne PK + $sqlins = "INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES "; + } + no warnings 'uninitialized'; + foreach my $row (@row_array) { + my @a = split("\\|",$row); + s/_ESC_/\|/g for @a; # escaped Pipe return to "|" + Log3 $hash->{NAME}, 5, "DbLog $name -> processing event Timestamp: $a[0], Device: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Value: $a[5], Unit: $a[6]"; + $a[3] =~ s/'/''/g; # escape ' with '' + $a[5] =~ s/'/''/g; # escape ' with '' + $a[6] =~ s/'/''/g; # escape ' with '' + $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; + } + use warnings; + + chop($sqlins); + + if ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + $sqlins .= " ON CONFLICT DO NOTHING"; + } + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { $sth_ih = $dbh->prepare($sqlins); + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + my $ins_hist = $sth_ih->execute(); + $ins_hist = 0 if($ins_hist eq "0E0"); + + if($ins_hist == $ceti) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ins_hist of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".$ins_hist." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".$ins_hist." of $ceti events inserted into table history"; + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + $error = encode_base64($errorh,""); + $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + } + } + + if (lc($DbLogType) =~ m(current)) { + ################################################################# + # insert current mit/ohne primary key + # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode + # die nicht upgedateten Sätze nicht identifiziert werden können + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|"; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); $sth_uc->bind_param_array(2, [@type]); $sth_uc->bind_param_array(3, [@event]); $sth_uc->bind_param_array(4, [@value]); $sth_uc->bind_param_array(5, [@unit]); $sth_uc->bind_param_array(6, [@device]); $sth_uc->bind_param_array(7, [@reading]); - } - } - - if($tl) { - # Tracelevel setzen - $dbh->{TraceLevel} = "$tl|$tf"; - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - - # SQL-Startzeit - my $st = [gettimeofday]; - - my ($tuples, $rows); - - # insert into history - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); - } - eval { - if (lc($DbLogType) =~ m(history) ) { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_hist = 0; - my @n2hist; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; - my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); - push(@n2hist, "$nlh"); - $nins_hist++; - } - if(!$nins_hist) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($ceti-$nins_hist)." of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); - s/\|/_ESC_/g for @n2hist; # escape Pipe "|" - $rowlist = join('§', @n2hist); - $rowlist = encode_base64($rowlist,""); - } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein if ($@) { - Log3($name, 2, "DbLog $name -> Error commit history - $@"); - } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert table history committed"); - } else { - Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); - } - } - } - }; - - if ($@) { - $errorh = $@; - Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; - $error = encode_base64($errorh,""); - $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - } - - # update or insert current - eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein - if ($@) { - Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); - } - eval { - if (lc($DbLogType) =~ m(current) ) { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nupd_cur = 0; - for my $tuple (0..$#row_array) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn update ok - Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; - push(@timestamp_cur, "$timestamp[$tuple]"); - push(@device_cur, "$device[$tuple]"); - push(@type_cur, "$type[$tuple]"); - push(@event_cur, "$event[$tuple]"); - push(@reading_cur, "$reading[$tuple]"); - push(@value_cur, "$value[$tuple]"); - push(@unit_cur, "$unit[$tuple]"); - $nupd_cur++; - } - if(!$nupd_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); - $doins = 1; - } - - if ($doins) { - # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array(1, [@timestamp_cur]); - $sth_ic->bind_param_array(2, [@device_cur]); - $sth_ic->bind_param_array(3, [@type_cur]); - $sth_ic->bind_param_array(4, [@event_cur]); - $sth_ic->bind_param_array(5, [@reading_cur]); - $sth_ic->bind_param_array(6, [@value_cur]); - $sth_ic->bind_param_array(7, [@unit_cur]); - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); - my $nins_cur = 0; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - next if($status); # $status ist "1" wenn insert ok - Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; - $nins_cur++; - } - if(!$nins_cur) { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); - } else { - Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); - } + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); } - eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed - if ($@) { - Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + + } else { + ####################### + # Array-Insert + ####################### + + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { + ######################################## + # insert history mit/ohne primary key + if ($usepkh && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ih = $dbh->prepare("INSERT IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ih = $dbh->prepare("INSERT OR IGNORE INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkh && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; } else { - if(!$dbh->{AutoCommit}) { - Log3($name, 4, "DbLog $name -> insert / update table current committed"); - } else { - Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); - } - } - } - }; + # ohne PK + eval { $sth_ih = $dbh->prepare("INSERT INTO history (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|$rowlist"; + } + + if($tl) { + # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + + $sth_ih->bind_param_array(1, [@timestamp]); + $sth_ih->bind_param_array(2, [@device]); + $sth_ih->bind_param_array(3, [@type]); + $sth_ih->bind_param_array(4, [@event]); + $sth_ih->bind_param_array(5, [@reading]); + $sth_ih->bind_param_array(6, [@value]); + $sth_ih->bind_param_array(7, [@unit]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for history - $@"); + } + eval { + ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_hist = 0; + my @n2hist; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into history rejected".($usepkh?" (possible PK violation) ":" ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Event: $event[$tuple]"; + my $nlh = ($timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]); + push(@n2hist, "$nlh"); + $nins_hist++; + } + if(!$nins_hist) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events inserted into table history".($usepkh?" using PK on columns $pkh":""); + } else { + if($usepkh) { + Log3 $hash->{NAME}, 3, "DbLog $name -> INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"; + } else { + Log3 $hash->{NAME}, 2, "DbLog $name -> WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table history"; + } + s/\|/_ESC_/g for @n2hist; # escape Pipe "|" + $rowlist = join('§', @n2hist); + $rowlist = encode_base64($rowlist,""); + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # Data commit + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit history - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert table history committed"); + } else { + Log3($name, 4, "DbLog $name -> insert table history committed by autocommit"); + } + } + }; + + if ($@) { + $errorh = $@; + Log3 $hash->{NAME}, 2, "DbLog $name -> Error table history - $errorh"; + $error = encode_base64($errorh,""); + $rowlback = $rowlist if($useta); # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + } + } + + if (lc($DbLogType) =~ m(current)) { + ######################################## + # insert current mit/ohne primary key + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + eval { $sth_ic = $dbh->prepare("INSERT IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + eval { $sth_ic = $dbh->prepare("INSERT OR IGNORE INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?) ON CONFLICT DO NOTHING"); }; + } else { + # ohne PK + eval { $sth_ic = $dbh->prepare("INSERT INTO current (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT) VALUES (?,?,?,?,?,?,?)"); }; + } + if ($@) { + # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE + $error = encode_base64($@,""); + Log3 ($name, 2, "DbLog $name - Error: $@"); + Log3 ($name, 5, "DbLog $name -> DbLog_PushAsync finished"); + $dbh->disconnect(); + return "$name|$error|0|$rowlist"; + } + + if ($usepkc && $hash->{MODEL} eq 'MYSQL') { + $sth_uc = $dbh->prepare("REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'SQLITE') { + $sth_uc = $dbh->prepare("INSERT OR REPLACE INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?)"); + } elsif ($usepkc && $hash->{MODEL} eq 'POSTGRESQL') { + $sth_uc = $dbh->prepare("INSERT INTO current (TIMESTAMP, TYPE, EVENT, VALUE, UNIT, DEVICE, READING) VALUES (?,?,?,?,?,?,?) ON CONFLICT ($pkc) + DO UPDATE SET TIMESTAMP=EXCLUDED.TIMESTAMP, DEVICE=EXCLUDED.DEVICE, TYPE=EXCLUDED.TYPE, EVENT=EXCLUDED.EVENT, READING=EXCLUDED.READING, + VALUE=EXCLUDED.VALUE, UNIT=EXCLUDED.UNIT"); + } else { + $sth_uc = $dbh->prepare("UPDATE current SET TIMESTAMP=?, TYPE=?, EVENT=?, VALUE=?, UNIT=? WHERE (DEVICE=?) AND (READING=?)"); + } + + if($tl) { + # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + + $sth_uc->bind_param_array(1, [@timestamp]); + $sth_uc->bind_param_array(2, [@type]); + $sth_uc->bind_param_array(3, [@event]); + $sth_uc->bind_param_array(4, [@value]); + $sth_uc->bind_param_array(5, [@unit]); + $sth_uc->bind_param_array(6, [@device]); + $sth_uc->bind_param_array(7, [@reading]); + + eval { $dbh->begin_work() if($useta && $dbh->{AutoCommit}); }; # Transaktion wenn gewünscht und autocommit ein + if ($@) { + Log3($name, 2, "DbLog $name -> Error start transaction for current - $@"); + } + eval { + ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nupd_cur = 0; + for my $tuple (0..$#row_array) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn update ok + Log3 $hash->{NAME}, 4, "DbLog $name -> Failed to update in current, try to insert - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"; + push(@timestamp_cur, "$timestamp[$tuple]"); + push(@device_cur, "$device[$tuple]"); + push(@type_cur, "$type[$tuple]"); + push(@event_cur, "$event[$tuple]"); + push(@reading_cur, "$reading[$tuple]"); + push(@value_cur, "$value[$tuple]"); + push(@unit_cur, "$unit[$tuple]"); + $nupd_cur++; + } + if(!$nupd_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> $ceti of $ceti events updated in table current".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> $nupd_cur of $ceti events not updated and try to insert into table current".($usepkc?" using PK on columns $pkc":""); + $doins = 1; + } + + if ($doins) { + # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array(1, [@timestamp_cur]); + $sth_ic->bind_param_array(2, [@device_cur]); + $sth_ic->bind_param_array(3, [@type_cur]); + $sth_ic->bind_param_array(4, [@event_cur]); + $sth_ic->bind_param_array(5, [@reading_cur]); + $sth_ic->bind_param_array(6, [@value_cur]); + $sth_ic->bind_param_array(7, [@unit_cur]); + + ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \my @tuple_status } ); + my $nins_cur = 0; + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + next if($status); # $status ist "1" wenn insert ok + Log3 $hash->{NAME}, 3, "DbLog $name -> Insert into current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"; + $nins_cur++; + } + if(!$nins_cur) { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table current ".($usepkc?" using PK on columns $pkc":""); + } else { + Log3 $hash->{NAME}, 4, "DbLog $name -> ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table current".($usepkc?" using PK on columns $pkc":""); + } + } + eval {$dbh->commit() if(!$dbh->{AutoCommit});}; # issue Turning on AutoCommit failed + if ($@) { + Log3($name, 2, "DbLog $name -> Error commit table current - $@"); + } else { + if(!$dbh->{AutoCommit}) { + Log3($name, 4, "DbLog $name -> insert / update table current committed"); + } else { + Log3($name, 4, "DbLog $name -> insert / update table current committed by autocommit"); + } + } + }; + } + } $dbh->disconnect(); @@ -2384,14 +2780,16 @@ sub DbLog_ConnectPush($;$$) { Log3 $hash->{NAME}, 3, "DbLog $name - Creating Push-Handle to database $dbconn with user $dbuser" if(!$get); my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbhp = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if($@) { $err = $@; @@ -2449,14 +2847,16 @@ sub DbLog_ConnectNewDBH($) { my $dbh; my ($useac,$useta) = DbLog_commitMode($hash); - if(!$useac) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 });}; - } elsif($useac == 1) { - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 });}; - } else { - # Server default - eval {$dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 });}; - } + eval { + if(!$useac) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 0, mysql_enable_utf8 => $utf8 }); + } elsif($useac == 1) { + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, AutoCommit => 1, mysql_enable_utf8 => $utf8 }); + } else { + # Server default + $dbh = DBI->connect("dbi:$dbconn", $dbuser, $dbpassword, { PrintError => 0, RaiseError => 1, mysql_enable_utf8 => $utf8 }); + } + }; if($@) { Log3($name, 2, "DbLog $name - $@"); @@ -2551,7 +2951,7 @@ sub DbLog_Get($@) { " see the #DbLog entries in the .gplot files\n" . " is not used, only for compatibility for FileLog, please use - \n" . " is a prefix, - means stdout\n" - if(int(@a) < 5); + if(int(@a) < 5); shift @a; my $inf = lc(shift @a); my $outf = lc(shift @a); @@ -2571,7 +2971,7 @@ sub DbLog_Get($@) { if($outf eq "int") { $outf = "-"; $internal = 1; - } elsif($outf eq "array"){ + } elsif($outf eq "array") { } elsif(lc($outf) eq "webchart") { # redirect the get request to the DbLog_chartQuery function @@ -2618,25 +3018,47 @@ sub DbLog_Get($@) { $readings[$i][1] = "%" if(!$readings[$i][1] || length($readings[$i][1])==0); #falls Reading nicht gefuellt setze Joker } - + Log3 $name, 4, "DbLog $name -> ################################################################"; Log3 $name, 4, "DbLog $name -> ### new get data for SVG ###"; Log3 $name, 4, "DbLog $name -> ################################################################"; Log3($name, 4, "DbLog $name -> main PID: $hash->{PID}, secondary PID: $$"); - - $dbh = $hash->{DBHP}; - if ( !$dbh || not $dbh->ping ) { - # DB Session dead, try to reopen now ! - return "Can't connect to database." if(!DbLog_ConnectPush($hash,1)); - $dbh = $hash->{DBHP}; - } - if( $hash->{PID} != $$ ) { - #create new connection for plotfork - $dbh->disconnect(); - return "Can't connect to database." if(!DbLog_ConnectPush($hash,1)); - $dbh = $hash->{DBHP}; - } +# $dbh = $hash->{DBHP}; +# if ( !$dbh || not $dbh->ping ) { +# # DB Session dead, try to reopen now ! +# return "Can't connect to database." if(!DbLog_ConnectPush($hash,1)); +# $dbh = $hash->{DBHP}; +# } + +# if( $hash->{PID} != $$ ) { +# #create new connection for plotfork +# $dbh->disconnect(); +# return "Can't connect to database." if(!DbLog_ConnectPush($hash,1)); +# $dbh = $hash->{DBHP}; +# } + + + my $nh = ($hash->{MODEL} ne 'SQLITE')?1:0; + # $hash->{PID} != $$ -> create new connection for plotfork + if ($nh || $hash->{PID} != $$) { # 17.04.2019 Forum: https://forum.fhem.de/index.php/topic,99719.0.html + $dbh = DbLog_ConnectNewDBH($hash); + return "Can't connect to database." if(!$dbh); + } else { + $dbh = $hash->{DBHP}; + eval { + if ( !$dbh || not $dbh->ping ) { + # DB Session dead, try to reopen now ! + DbLog_ConnectPush($hash,1); + } + }; + if ($@) { + Log3($name, 1, "DbLog $name: DBLog_Push - DB Session dead! - $@"); + return $@; + } else { + $dbh = $hash->{DBHP}; + } + } #vorbereiten der DB-Abfrage, DB-Modell-abhaengig if ($hash->{MODEL} eq "POSTGRESQL") { @@ -2783,10 +3205,10 @@ sub DbLog_Get($@) { $retval .= "Timestamp: Device, Type, Event, Reading, Value, Unit\n"; $retval .= "=====================================================\n"; } - + ################################ # Select Auswertung - ################################ + ################################ while($sth->fetch()) { ############ Auswerten des 5. Parameters: Regexp ################### @@ -3054,7 +3476,9 @@ sub DbLog_Get($@) { } #cleanup (plotfork) connection - $dbh->disconnect() if( $hash->{PID} != $$ ); + # $dbh->disconnect() if( $hash->{PID} != $$ ); + + $dbh->disconnect() if($nh || $hash->{PID} != $$); if($internal) { $internal_data = \$retval; @@ -3412,13 +3836,13 @@ sub DbLog_configcheck($) { @six_rdg = DbLog_sqlget($hash,"SHOW INDEX FROM history where Key_name='Search_Idx' and Column_name='READING'"); @six_tsp = DbLog_sqlget($hash,"SHOW INDEX FROM history where Key_name='Search_Idx' and Column_name='TIMESTAMP'"); if (@six_dev && @six_rdg && @six_tsp) { - $check .= "Index 'Search_Idx' exists and contains recommended fields 'DEVICE', 'READING', 'TIMESTAMP'.
"; + $check .= "Index 'Search_Idx' exists and contains recommended fields 'DEVICE', 'TIMESTAMP', 'READING'.
"; $rec = "settings o.k."; } else { $check .= "Index 'Search_Idx' exists but doesn't contain recommended field 'DEVICE'.
" if (!@six_dev); $check .= "Index 'Search_Idx' exists but doesn't contain recommended field 'READING'.
" if (!@six_rdg); $check .= "Index 'Search_Idx' exists but doesn't contain recommended field 'TIMESTAMP'.
" if (!@six_tsp); - $rec = "The index should contain the fields 'DEVICE', 'READING', 'TIMESTAMP'. "; + $rec = "The index should contain the fields 'DEVICE', 'TIMESTAMP', 'READING'. "; $rec .= "You can change the index by executing e.g.
"; $rec .= "'ALTER TABLE `history` DROP INDEX `Search_Idx`, ADD INDEX `Search_Idx` (`DEVICE`, `READING`, `TIMESTAMP`) USING BTREE;'
"; $rec .= "Depending on your database size this command may running a long time.
"; @@ -3507,7 +3931,7 @@ sub DbLog_configcheck($) { @dix = DbLog_sqlget($hash,"SHOW INDEX FROM history where Key_name='Report_Idx'"); if (!@dix) { $check .= "At least one DbRep-device assigned to $name is used, but the recommended index 'Report_Idx' is missing.
"; - $rec = "You can create the index by executing statement 'CREATE INDEX Report_Idx ON `history` (READING, TIMESTAMP) USING BTREE;'
"; + $rec = "You can create the index by executing statement 'CREATE INDEX Report_Idx ON `history` (TIMESTAMP,READING) USING BTREE;'
"; $rec .= "Depending on your database size this command may running a long time.
"; $rec .= "Please make sure the device '$name' is operating in asynchronous mode to avoid FHEM from blocking when creating the index.
"; $rec .= "Note: If you have just created another index which covers the same fields and order as suggested (e.g. a primary key) you don't need to create the 'Report_Idx' as well !
"; @@ -3516,15 +3940,15 @@ sub DbLog_configcheck($) { @dix_tsp = DbLog_sqlget($hash,"SHOW INDEX FROM history where Key_name='Report_Idx' and Column_name='TIMESTAMP'"); if (@dix_rdg && @dix_tsp) { $check .= "At least one DbRep-device assigned to $name is used. "; - $check .= "Index 'Report_Idx' exists and contains recommended fields 'READING', 'TIMESTAMP'.
"; + $check .= "Index 'Report_Idx' exists and contains recommended fields 'TIMESTAMP', 'READING'.
"; $rec = "settings o.k."; } else { $check .= "You use at least one DbRep-device assigned to $name. "; $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'READING'.
" if (!@dix_rdg); $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'TIMESTAMP'.
" if (!@dix_tsp); - $rec = "The index should contain the fields 'READING', 'TIMESTAMP'. "; + $rec = "The index should contain the fields 'TIMESTAMP', 'READING'. "; $rec .= "You can change the index by executing e.g.
"; - $rec .= "'ALTER TABLE `history` DROP INDEX `Report_Idx`, ADD INDEX `Report_Idx` (`READING`, `TIMESTAMP`) USING BTREE'
"; + $rec .= "'ALTER TABLE `history` DROP INDEX `Report_Idx`, ADD INDEX `Report_Idx` (`TIMESTAMP`, `READING`) USING BTREE'
"; $rec .= "Depending on your database size this command may running a long time.
"; } } @@ -3533,7 +3957,7 @@ sub DbLog_configcheck($) { @dix = DbLog_sqlget($hash,"SELECT * FROM pg_indexes WHERE tablename='history' and indexname ='Report_Idx'"); if (!@dix) { $check .= "You use at least one DbRep-device assigned to $name, but the recommended index 'Report_Idx' is missing.
"; - $rec = "You can create the index by executing statement 'CREATE INDEX \"Report_Idx\" ON history USING btree (reading, \"timestamp\")'
"; + $rec = "You can create the index by executing statement 'CREATE INDEX \"Report_Idx\" ON history USING btree (\"timestamp\", reading)'
"; $rec .= "Depending on your database size this command may running a long time.
"; $rec .= "Please make sure the device '$name' is operating in asynchronous mode to avoid FHEM from blocking when creating the index.
"; $rec .= "Note: If you have just created another index which covers the same fields and order as suggested (e.g. a primary key) you don't need to create the 'Report_Idx' as well !
"; @@ -3542,14 +3966,14 @@ sub DbLog_configcheck($) { $irep_rdg = 1 if($irep =~ /reading/); $irep_tsp = 1 if($irep =~ /timestamp/); if ($irep_rdg && $irep_tsp) { - $check .= "Index 'Report_Idx' exists and contains recommended fields 'READING', 'TIMESTAMP'.
"; + $check .= "Index 'Report_Idx' exists and contains recommended fields 'TIMESTAMP', 'READING'.
"; $rec = "settings o.k."; } else { $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'READING'.
" if (!$irep_rdg); $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'TIMESTAMP'.
" if (!$irep_tsp); - $rec = "The index should contain the fields 'READING', 'TIMESTAMP'. "; + $rec = "The index should contain the fields 'TIMESTAMP', 'READING'. "; $rec .= "You can change the index by executing e.g.
"; - $rec .= "'DROP INDEX \"Report_Idx\"; CREATE INDEX \"Report_Idx\" ON history USING btree (reading, \"timestamp\")'
"; + $rec .= "'DROP INDEX \"Report_Idx\"; CREATE INDEX \"Report_Idx\" ON history USING btree (\"timestamp\", reading)'
"; $rec .= "Depending on your database size this command may running a long time.
"; } } @@ -3558,7 +3982,7 @@ sub DbLog_configcheck($) { @dix = DbLog_sqlget($hash,"SELECT name,sql FROM sqlite_master WHERE type='index' AND name='Report_Idx'"); if (!$dix[0]) { $check .= "The index 'Report_Idx' is missing.
"; - $rec = "You can create the index by executing statement 'CREATE INDEX Report_Idx ON `history` (READING, TIMESTAMP)'
"; + $rec = "You can create the index by executing statement 'CREATE INDEX Report_Idx ON `history` (TIMESTAMP,READING)'
"; $rec .= "Depending on your database size this command may running a long time.
"; $rec .= "Please make sure the device '$name' is operating in asynchronous mode to avoid FHEM from blocking when creating the index.
"; $rec .= "Note: If you have just created another index which covers the same fields and order as suggested (e.g. a primary key) you don't need to create the 'Search_Idx' as well !
"; @@ -3567,14 +3991,14 @@ sub DbLog_configcheck($) { $irep_rdg = 1 if(lc($irep) =~ /reading/); $irep_tsp = 1 if(lc($irep) =~ /timestamp/); if ($irep_rdg && $irep_tsp) { - $check .= "Index 'Report_Idx' exists and contains recommended fields 'READING', 'TIMESTAMP'.
"; + $check .= "Index 'Report_Idx' exists and contains recommended fields 'TIMESTAMP', 'READING'.
"; $rec = "settings o.k."; } else { $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'READING'.
" if (!$irep_rdg); $check .= "Index 'Report_Idx' exists but doesn't contain recommended field 'TIMESTAMP'.
" if (!$irep_tsp); - $rec = "The index should contain the fields 'READING', 'TIMESTAMP'. "; + $rec = "The index should contain the fields 'TIMESTAMP', 'READING'. "; $rec .= "You can change the index by executing e.g.
"; - $rec .= "'DROP INDEX \"Report_Idx\"; CREATE INDEX Report_Idx ON `history` (READING, TIMESTAMP)'
"; + $rec .= "'DROP INDEX \"Report_Idx\"; CREATE INDEX Report_Idx ON `history` (TIMESTAMP,READING)'
"; $rec .= "Depending on your database size this command may running a long time.
"; } } @@ -5892,6 +6316,23 @@ return;
+ +
+ + + +
+ +