Uebertrag aller geloggten Daten auf Postgres

Der Titel sagt im Prinzip alles, was es dazu zu sagen gibt…


<?php
//
//  Script by AAG 
// 
//  This script creates a PostgreSQL copy of all archived IPS variables.  
//  For performance reasons, variables are stored in 4 distinct tables depending on their type. 
//  A further table is created to host all variable definitions. Outer joins to the value tables are created as appropriate. 
// 
//  Note 1: if the script is run more than once, any tables created by previous iterations will be automatically removed 
//  and replaced with the new tables. 
// 
//  Note 2: the primary key of each table combines each variable's ID with its timestamp. The underlying assumption is 
//  that one and the same variable cannot be assigned two values at exactly the same time (even if the two values are identical). 
//  If such records are encountered, the script will log an error to "TransferSqLiteToPostgresLog.html". 
//  In my experience, IPS can produce such implausible values, but they occur in <0.01% of records. 
// 


$ArchiveHandlerId = 53419; //replace number with ID of your archive handler

//--------------------------------------------
// do not change anything below here

// Redirect all further ECHO output into an HTML log file next to the script.
// The first message is emitted before buffering starts, so it still reaches the normal output.
echo "output redirected to file";

// fopen() in 'w' mode creates the file or truncates an existing one, so no separate
// unlink() is needed (unlink() raised a warning whenever the log did not exist yet).
$ob_file = fopen('TransferSqLiteToPostgresLog.html','w');
if ($ob_file !== false) {
    // From here on every echo is handed to ob_file_callback().
    ob_start('ob_file_callback');
    echo "Start of debug file <br>";
} else {
    // Best effort: keep running with output on the normal channel instead of
    // buffering into a handle that could not be opened.
    echo "log file could not be opened, output is not redirected <br>";
}

// First mirror the list of IPS objects into the IpsObjects table.
EnumerateIpsObjects();

// The transfer below is slow: allow the script to run for 10 minutes from this point.
set_time_limit(600);

// Connection used for the value tables; FetchArchivedData() reads it as a global.
$dbconn = pg_connect("host=localhost 
					  port=5432 
					  dbname=IpsLogging 
					  user=IPS 
					  password=ips");
if (!$dbconn) {
	die('connection aborted: ' . pg_last_error().chr(13));
}


// Remove the logging tables left over from a previous run.
// IF EXISTS lets the batch succeed on the very first run (or after a partial run):
// without it a single missing table makes the whole multi-statement query fail,
// so none of the existing tables would be dropped either.
$QueryResult = pg_query($dbconn, 
				  "DROP TABLE IF EXISTS loggingdb_ips_boolean;
				   DROP TABLE IF EXISTS loggingdb_ips_integer;
				   DROP TABLE IF EXISTS loggingdb_ips_float;
				   DROP TABLE IF EXISTS loggingdb_ips_string;");
if (!$QueryResult) {
	echo "PG Error: ". pg_last_error($dbconn).chr(13);
} else {
	echo "tables successfully dropped! <br>";
}

// DDL for the four value tables (integer, float, boolean, string), one per IPS variable type.
// All four share the same layout; only the type of IPSVALUE differs.
// (IPSTIMESTAMP, VARID) is the primary key, so a second value for the same variable
// at the same timestamp is rejected by PostgreSQL.
$createTransferTables = "    
CREATE TABLE loggingdb_ips_integer (
	IPSTIMESTAMP timestamp without time zone, 
	VARID integer not null, 
	IPSLASTTIME timestamp without time zone, 
	IPSVALUE INTEGER, 
	PRIMARY KEY(IPSTIMESTAMP, VARID) 
    );
CREATE TABLE loggingdb_ips_float (
	IPSTIMESTAMP timestamp without time zone, 
	VARID integer not null, 
	IPSLASTTIME timestamp without time zone, 
	IPSVALUE NUMERIC, 
	PRIMARY KEY(IPSTIMESTAMP, VARID) 
    );
CREATE TABLE loggingdb_ips_boolean (
	IPSTIMESTAMP timestamp without time zone, 
	VARID integer not null, 
	IPSLASTTIME timestamp without time zone, 
	IPSVALUE BOOLEAN,
	PRIMARY KEY(IPSTIMESTAMP, VARID) 
	);
CREATE TABLE loggingdb_ips_string (
	IPSTIMESTAMP timestamp without time zone, 
	VARID integer not null, 
	IPSLASTTIME timestamp without time zone, 
	IPSVALUE TEXT, 
	PRIMARY KEY(IPSTIMESTAMP, VARID) 
	);";

// DDL for the secondary indexes: for each value table one btree index on ipstimestamp
// (suffix 1) and one on varid (suffix 2).
$createIndexes = "
CREATE INDEX  idx_float1
	ON loggingdb_ips_float
	USING btree
	(ipstimestamp );
CREATE INDEX idx_float2
	ON loggingdb_ips_float
	USING btree
	(varid );

CREATE INDEX  idx_integer1
	ON loggingdb_ips_integer
	USING btree
	(ipstimestamp );
CREATE INDEX idx_integer2
	ON loggingdb_ips_integer
	USING btree
	(varid );

CREATE INDEX  idx_boolean1
	ON loggingdb_ips_boolean
	USING btree
	(ipstimestamp );
CREATE INDEX idx_boolean2
	ON loggingdb_ips_boolean
	USING btree
	(varid );

CREATE INDEX  idx_string1
	ON loggingdb_ips_string
	USING btree
	(ipstimestamp );
CREATE INDEX idx_string2
	ON loggingdb_ips_string
	USING btree
	(varid );";


 // Create the value tables first, then their indexes; a failure is logged but does not stop the script.
 foreach (array($createTransferTables, $createIndexes) as $SchemaStatements) {
 	$QueryResult = pg_query($dbconn, $SchemaStatements);
 	if (!$QueryResult) {
 		echo "PG Error: ". pg_last_error().chr(13);
 	}
 }

// Copy the archived values into the new tables, then release the connection.
FetchArchivedData($ArchiveHandlerId);
pg_close($dbconn);


//-----------------------------------------------------


/**
 * Copies all logged values of every archived IPS variable into the PostgreSQL value tables
 * (script modified from khc).
 *
 * The target table is chosen by the variable's value type
 * (0: boolean, 1: integer, 2: float, 3: string). Each record is inserted with a
 * parameterised statement, so string values need no manual quoting and booleans
 * are sent in a form PostgreSQL accepts. Failed inserts (for example a duplicate
 * primary key) are echoed to the log and the transfer continues.
 *
 * Uses the global connection $dbconn opened by the main script.
 *
 * @param int $ArchiveHandlerId ID of the IPS archive handler instance to read from
 */
Function FetchArchivedData($ArchiveHandlerId)
{
	global $dbconn;

	$EndTime = time()+(24*60*60);           // tomorrow, to be on the safe side
	$StartTime = $EndTime - 3650*24*60*60;  // past 10 years
	$VarCounter = 0;
	$RecordCounter = 0;

	// value type => target table
	$TablesByType = array(
		0 => "loggingdb_ips_boolean",
		1 => "loggingdb_ips_integer",
		2 => "loggingdb_ips_float",
		3 => "loggingdb_ips_string",
	);

	// get all logged variables
	$VariablesArray = AC_GetAggregationVariables($ArchiveHandlerId, false);
	foreach ($VariablesArray as $Var) {
		$VarCounter = $VarCounter + 1;
		$VariableId = $Var["VariableID"];
		$variable = IPS_GetVariable($VariableId);

		// NOTE(review): assumes IPS_GetVariable() exposes the type under
		// ['VariableValue']['ValueType'] - confirm for the installed IPS version.
		$varType = isset($variable['VariableValue']['ValueType'])
			? $variable['VariableValue']['ValueType']
			: null;

		// An unknown type used to fall through and reuse the table of the previous
		// variable; skip such variables instead of writing to the wrong table.
		if ($varType === null || !isset($TablesByType[$varType])) {
			echo "Variable #$VarCounter (ID $VariableId) skipped: unsupported value type".chr(13);
			continue;
		}
		$IpsTable = $TablesByType[$varType];

		// read from loggingdb database
		$LoggedData = AC_GetLoggedValues($ArchiveHandlerId, $VariableId, $StartTime, $EndTime, 0);
		if ($LoggedData == false) {
			continue;
		}

		// The table name cannot be a bind parameter; it comes from the fixed map above.
		$InsertSql = 'INSERT INTO ' . $IpsTable
			. ' (IPSTIMESTAMP, VARID, IPSLASTTIME, IPSVALUE)'
			. ' VALUES (to_timestamp($1), $2, to_timestamp($3), $4)';

		// transpose to postgres
		$SqlErrors = 0;
		foreach ($LoggedData as $V) {
			$RecordCounter = $RecordCounter + 1;

			$Value = $V['Value'];
			if (is_bool($Value)) {
				// PHP renders false as an empty string, which is no valid boolean input.
				$Value = $Value ? 't' : 'f';
			}

			$QueryResult = pg_query_params(
				$dbconn,
				$InsertSql,
				array($V['TimeStamp'], $VariableId, $V['LastTime'], $Value)
			);
			if (!$QueryResult) {
				$SqlErrors = $SqlErrors + 1;
				echo "PG Error on record #$RecordCounter, error #$SqlErrors: ". pg_last_error($dbconn).chr(13)."<br>";
			}
		}

		echo "Variable #$VarCounter appended".chr(13);
	}
}
// Push the remaining buffered output through the buffer callback, then close the
// log file handle so its contents are flushed to disk and the handle is released.
ob_end_flush();
if (isset($ob_file) && is_resource($ob_file)) {
	fclose($ob_file);
}

//------------------------------------------
/**
 * Output-buffer callback that redirects ECHO output to the log file.
 *
 * Writes the buffered text to the global file handle $ob_file and returns an empty
 * string, so nothing is passed on to the regular output. If $ob_file is not an open
 * handle, the buffer is returned unchanged so the output is not lost.
 *
 * @param string $buffer text collected by the output buffer
 * @return string text to pass on to the regular output
 */
function ob_file_callback($buffer)
{
	global $ob_file;

	if (!is_resource($ob_file)) {
		return $buffer;
	}

	fwrite($ob_file, $buffer);
	return '';
}


//------------------------------------------


/**
 * Mirrors all IPS objects into the PostgreSQL table IpsObjects.
 *
 * The table is created if it does not exist yet. Every object returned by
 * IPS_GetObjectList() is then written with an update-or-insert keyed on OBJECTID:
 * the row is updated first and inserted only when the update matched no row.
 * Finally the index idx_IpsObject on ObjectID is rebuilt.
 *
 * Values are passed as bind parameters, so text containing quotes is stored
 * correctly. Booleans are sent as 't'/'f'. Empty or missing values are stored as
 * SQL NULL, except ObjectName, which is stored as given.
 *
 * Opens and closes its own database connection.
 */
function EnumerateIpsObjects()
{
	$IpsObjectList = IPS_GetObjectList();
	$CountIpsObjects = count($IpsObjectList);
	echo $CountIpsObjects. " ips objects"."<br>";

	$dbconn = pg_connect("host=localhost port=5432 dbname=IpsLogging user=IPS password=ips") or die ('Verbindungsaufbau fehlgeschlagen: ' . pg_last_error().chr(13));

	//create table only if it does not yet exist, Primary key is ObjectID
	$createTable = "    CREATE TABLE IF NOT EXISTS IpsObjects (
					CHILDRENIDS TEXT, 
					HASCHILDREN BOOLEAN, 
					ISPERSISTENT BOOLEAN, 
					OBJECTICON TEXT, 
					OBJECTID INTEGER UNIQUE, 
					OBJECTIDENT TEXT, 
					OBJECTINFO TEXT, 
					OBJECTISHIDDEN BOOLEAN, 
					OBJECTISREADONLY BOOLEAN, 
					OBJECTNAME TEXT, 
					OBJECTPOSITION TEXT, 
					OBJECTSUMMARY TEXT, 
					OBJECTTYPE INTEGER, 
					PARENTID INTEGER,
					ID SERIAL, 
                    PRIMARY KEY(OBJECTID)
                    );
					";
	$QueryResult = pg_query($dbconn, $createTable);
	if (!$QueryResult) echo "Postgres error:". pg_last_error($dbconn).chr(13);

	// Columns written for every object; the names double as keys of the IPS_GetObject() array.
	$Columns = array(
		'ChildrenIDs',
		'HasChildren',
		'IsPersistent',
		'ObjectIcon',
		'ObjectIdent',
		'ObjectInfo',
		'ObjectIsHidden',
		'ObjectIsReadOnly',
		'ObjectName',
		'ObjectPosition',
		'ObjectSummary',
		'ObjectType',
		'ParentID',
	);

	// Build both statements once: $1..$13 carry the column values, $14 the object ID.
	$Assignments = array();
	$Placeholders = array();
	foreach ($Columns as $Index => $Column) {
		$Placeholder = '$' . ($Index + 1);
		$Assignments[] = $Column . ' = ' . $Placeholder;
		$Placeholders[] = $Placeholder;
	}
	$IdPlaceholder = '$' . (count($Columns) + 1);
	$UpdateSql = 'UPDATE IpsObjects SET ' . implode(', ', $Assignments)
		. ' WHERE OBJECTID = ' . $IdPlaceholder;
	$InsertSql = 'INSERT INTO IpsObjects (' . implode(', ', $Columns) . ', OBJECTID)'
		. ' VALUES (' . implode(', ', $Placeholders) . ', ' . $IdPlaceholder . ')';

	$IpsObjectCounter = 0;
	foreach ($IpsObjectList as $key1 => $value1) {
		$IpsObjectCounter = $IpsObjectCounter + 1;
		echo "<br> [$key1] => $value1   ";
		$IpsObject = IPS_GetObject($value1);

		$Params = array();
		foreach ($Columns as $Column) {
			$Value = $IpsObject[$Column];
			if (is_array($Value)) {
				// ChildrenIDs is stored as a comma-separated list
				$Value = implode(",", $Value);
			}
			if (is_bool($Value)) {
				// PHP would render true/false as "1"/"", neither of which is usable here
				$Value = $Value ? 't' : 'f';
			} elseif (is_string($Value)) {
				// NOTE(review): utf8_encode() treats the input as ISO-8859-1, as the
				// original script did - confirm the encoding IPS actually delivers.
				$Value = utf8_encode($Value);
				if ($Value === '' && $Column !== 'ObjectName') {
					$Value = null; // stored as SQL NULL
				}
			}
			$Params[] = $Value;
		}
		$Params[] = $IpsObject['ObjectID'];

		echo "record #$IpsObjectCounter processed ";

		// Update first; insert only when no row with this OBJECTID existed.
		$QueryResult = pg_query_params($dbconn, $UpdateSql, $Params);
		if ($QueryResult && pg_affected_rows($QueryResult) === 0) {
			$QueryResult = pg_query_params($dbconn, $InsertSql, $Params);
		}
		if (!$QueryResult) echo "PG Error: ". pg_last_error($dbconn).chr(13);
	}

	$makeIpsObjectIndex = "
    DROP INDEX IF EXISTS idx_IpsObject;
	CREATE INDEX idx_IpsObject
	ON IpsObjects 
	USING btree
	(ObjectID);";

	$QueryResult = pg_query($dbconn, $makeIpsObjectIndex);
	if (!$QueryResult) echo "PG Error: ". pg_last_error($dbconn).chr(13);

	pg_close($dbconn);
	echo "<br> <br> -------------------------- <br> <br> ";
}

//----------------------------------------------


/**
 * Replaces an "empty" value by the string 'NULL' (kept for compatibility with Access SQL).
 *
 * A value counts as empty when its string form has length zero; this is the case
 * for '', null and false. Every other value is returned unchanged.
 *
 * @param mixed $SqlValue value about to be placed into an SQL statement
 * @return mixed the string 'NULL' for empty values, otherwise $SqlValue itself
 */
function SqlNull($SqlValue) 
{
	$IsEmpty = (strlen((string)$SqlValue) === 0);

	return $IsEmpty ? 'NULL' : $SqlValue;
}


?>

Cool! Auf wieviel Stunden man wohl die Max Execution Time für Scripts hochsetzen muß? :stuck_out_tongue:

Die üblichen 30 Sekunden reichen sicher nicht aus, aber 3 Tage müssen es auch nicht sein! Ich habe set_time_limit() auf 10 Minuten eingestellt, was bei mir (SSD, Core i5) völlig ausreichend war.

Ich spiele noch mit den Indizes herum, und werde ggf. eine updated Version des Scripts liefern.

Das Problem mit den Variablen, die zeitgleich zwei Werte aufweisen, ist sehr selten, kam aber effektiv vor bei ca. 20 von 50’000 Records. Ich denke, das kommt wahrscheinlich durch die etwas gröbere Zeitauflösung des Timestamps von SQLite. Bei Postgres kann man mikrosekundengenau aufzeichnen, da sollte so etwas nicht mehr passieren.

Und hier kommt ein Leckerbissen! Nämlich: die vollständige Auswertung von Helligkeit, Ladezuständen und Bewegungstriggern einer beliebigen Anzahl von EnOcean-Sensoren mittels Postgres. Das Script ist so eingestellt, dass es zu Test- und Demonstrationszwecken auf meinen Server zugreift (ich werde dies aber nach ein paar Tagen wieder kappen!). Wenn Ihr die ganze Geschichte bei Euch installiert, müsst Ihr die Contrib-Erweiterung „tablefunc“ auf Postgres installieren, damit die Pivot-Queries laufen. Voraussetzung ist die Postgres-Installation - das kostet 10 Minuten, und dazu gibt es im IPS-Forum eine sehr anschauliche und detaillierte Anleitung.

Enjoy!


<?php
// Switch for redirecting ECHO output to a file. It is only read by the
// commented-out block below, so it currently has no effect.
$RedirectEchoToFile = false;

//--------------------------------------------
// do not change anything below here

// Disabled: redirect of ECHO output to 'PostgresCrosstabs.html'.
// It refers to the callback 'ob_file_callback', which this script does not define.
/*
if ($RedirectEchoToFile == true)
{echo "output redirected to file";
unlink('PostgresCrosstabs.html');
$ob_file = fopen('PostgresCrosstabs.html','w');
ob_start('ob_file_callback');
echo "Start of PostgresCrosstabs file <br>";}
*/



 
// Disabled debugging output: column listings and table dumps of sensor-name results.
/*
echo implode(",<br>", pg_fetch_all_columns($LightSensorResult,2));
echo "<strong>".implode(",<br>" , pg_fetch_all_columns($ChargeSensorResult,2))."</strong>";
echo implode(",<br>", pg_fetch_all_columns($MovementSensorResult,2));
*/
//Echo BetterTable($LightSensorResult);
//echo BetterTable($ChargeSensorResult);
//echo BetterTable($MovementSensorResult);


// Print one crosstab per sensor variable name: row counts per hour from the boolean
// table for "DigitalInput 2", hourly averages from the integer table for "Analog Value 2".
EnoceanCrossTab("DigitalInput 2", "boolean");
EnoceanCrossTab("Analog Value 2", "integer");


//-----------------------------------------------------

/**
 * Prints an HTML crosstab (rows: hours, columns: sensors) for all logged variables
 * whose object name matches $SensorString.
 *
 * For "boolean" the cell value is the number of logged records in that hour, for
 * "integer" and "float" it is the hourly average of ipsvalue. The column list of
 * the crosstab is built from the sensor names returned by GetSensorNames().
 * The query uses the crosstab() function, so the tablefunc extension must be
 * installed in the database.
 *
 * Problems (unsupported table type, failed lookup, failed query) are echoed and
 * end the function without printing a table.
 *
 * @param string $SensorString LIKE pattern matched against ips.objectname
 * @param string $TableType    "boolean", "integer" or "float"
 */
Function EnoceanCrossTab($SensorString, $TableType)
{
	$dbconn = pg_connect("host=aag.sytes.net 
					  port=5432 
					  dbname=IpsLogging 
					  user=guest 
					  password=guest") 
		or die ('connection aborted: ' . pg_last_error().chr(13));

	// $SensorColumn selects which GetSensorNames() result column supplies the
	// crosstab column definitions (2: "... integer", 3: "... numeric(5,2)").
	switch ($TableType) {
		case "boolean":
			$IpsTable = "loggingdb_ips_boolean";
			$AggregateFunctions = "COUNT(*)::integer As value ";
			$SensorColumn = 2;
			break;
		case "integer":
			$IpsTable = "loggingdb_ips_integer";
			$AggregateFunctions = "AVG(ipsvalue) As value ";
			$SensorColumn = 3;
			break;
		case "float":
			$IpsTable = "loggingdb_ips_float";
			$AggregateFunctions = "AVG(ipsvalue) As value ";
			$SensorColumn = 3;
			break;
		default:
			// "string" never had an aggregate or a sensor column defined, so the
			// query could not be built for it; report this instead of failing later.
			echo "EnoceanCrossTab: no crosstab defined for table type '$TableType' <br>";
			return;
	}

	$SensorResult = GetSensorNames($SensorString, $TableType, "", "", 0, 1);
	if (!$SensorResult) {
		echo "EnoceanCrossTab: sensor names could not be retrieved <br>";
		return;
	}
	$ColumnDefinitions = pg_fetch_all_columns($SensorResult, $SensorColumn);
	if (!$ColumnDefinitions) {
		// without columns the "As CountsPerHour(...)" clause would be invalid SQL
		echo "EnoceanCrossTab: no logged sensors match '$SensorString' <br>";
		return;
	}
	$CategoryString = implode(", ", $ColumnDefinitions);

	// The pattern sits inside a literal that is itself inside the crosstab() source
	// literal, so it has to be escaped once per nesting level.
	$QuotedSensor = pg_escape_string($dbconn, pg_escape_string($dbconn, $SensorString));

	$CrossTabMovementSensors = 
	"SELECT *
		FROM crosstab
		('SELECT 
		to_char(ipstimestamp, ''mon DD HH24h'') As row_name, 
		varid As category, " 
		.$AggregateFunctions
		." FROM $IpsTable As log 
		JOIN ipsobjects_with_parent ips 
		ON log.varid = ips.objectid
		WHERE (ips.objectname LIKE ''".$QuotedSensor."'')
	GROUP BY to_char(ipstimestamp, ''yyyy MM DD HH24h''), row_name, objectid, category 	
	ORDER BY to_char(ipstimestamp, ''yyyy MM DD HH24h''), row_name, objectid, category', 

	'SELECT DISTINCT varid
	FROM $IpsTable As log 
	JOIN ipsobjects_with_parent ips 
	ON log.varid = ips.objectid
	WHERE (ips.objectname LIKE ''".$QuotedSensor."'')
	ORDER BY 1;' 
	)

As CountsPerHour(row_name text,"  
.$CategoryString.")";

	$QueryResult = pg_query($dbconn, $CrossTabMovementSensors);
	if (!$QueryResult) {
		// check before rendering: BetterTable() needs a valid result
		echo "PG Error: ". pg_last_error($dbconn).chr(13);
		return;
	}

	BetterTable($QueryResult);
}



//---------------------------------------

/**
 * Echoes a PostgreSQL query result as an HTML table.
 *
 * The first row holds the field names, preceded by a "Line #" column. Data rows
 * are numbered from 0 and alternate between two background colours. Field names
 * and cell values are HTML-escaped; SQL NULL is shown as an empty cell.
 *
 * @param resource|object|false $result result of pg_query(); a false value prints a notice only
 */
function BetterTable($result)
{
	if (!$result) {
		echo "BetterTable: no query result to display <br>";
		return;
	}

	$EscapeFlags = ENT_QUOTES | ENT_SUBSTITUTE;

	// a single opening tag: the former "<table><table border='1'>" was never balanced
	echo "<html><body><table border='1'>";

	// header row
	echo "<tr>";
	echo '<td>Line #</td>';
	$FieldCount = pg_num_fields($result);
	for ($Column = 0; $Column < $FieldCount; $Column++) {
		echo '<td>' . htmlspecialchars(pg_field_name($result, $Column), $EscapeFlags, 'UTF-8') . '</td>';
	}
	echo '</tr>';

	// data rows
	$LineNumber = 0;
	while ($row = pg_fetch_row($result)) {
		$Background = ($LineNumber % 2 == 0) ? "#d0d0d0" : "#eeeeee";
		echo "<tr bgcolor=\"$Background\">";
		echo '<td>'.$LineNumber. '</td>';
		foreach ($row as $Cell) {
			// (string) turns SQL NULL into an empty cell
			echo '<td>' . htmlspecialchars((string)$Cell, $EscapeFlags, 'UTF-8') . '</td>';
		}
		echo '</tr>';
		$LineNumber = $LineNumber + 1;
	}

	echo '</table><br><br></body></html>';
}

//----------------------------------------------------

/**
 * Retrieves the logged variables whose object name matches $SensorType, together
 * with column definitions ready for use in a crosstab() column list.
 *
 * Result columns (0-based):
 *   0 integername       "<varid>" integer
 *   1 nakedvariable     varid
 *   2 fullname          "<parentname> - <objectname> (<objectid>)" integer
 *   3 fullnamenumeric   "<parentname> - <objectname> (<objectid>)" numeric(5,2)
 *   4 decoratedvariable varid, concatenated with $prefix / $postfix when given
 *
 * @param string $SensorType LIKE pattern matched against ips.objectname
 * @param string $TableType  "boolean", "integer", "float" or "string"; selects the value table
 * @param string $prefix     SQL expression placed before varid in column 4 ("" for none)
 * @param string $postfix    SQL expression placed after varid in column 4 ("" for none)
 * @param mixed  $QueryType  accepted for compatibility; not used by this function
 * @param mixed  $column     accepted for compatibility; not used by this function
 * @return resource|object|false query result, or false when the table type is unknown or the query fails
 */
Function GetSensorNames($SensorType, $TableType, $prefix, $postfix, $QueryType, $column)
{
	$dbconn = pg_connect("host=aag.sytes.net 
					  port=5432 
					  dbname=IpsLogging 
					  user=guest 
					  password=guest") 
		or die ('connection aborted: ' . pg_last_error().chr(13));

	switch ($TableType) {
		case "boolean":
			$IpsTable = "loggingdb_ips_boolean";
			break;
		case "integer":
			$IpsTable = "loggingdb_ips_integer";
			break;
		case "float":
			$IpsTable = "loggingdb_ips_float";
			break;
		case "string":
			$IpsTable = "loggingdb_ips_string";
			break;
		default:
			// an unknown type used to leave $IpsTable undefined and produce broken SQL
			echo "GetSensorNames: unknown table type '$TableType' <br>";
			return false;
	}

	// $prefix / $postfix are SQL fragments and are joined with the concatenation operator
	if ($prefix!="") $prefix = $prefix." || ";
	if ($postfix!="")$postfix = " || ". $postfix;

	// the pattern is data, not SQL: escape it so quotes in a name cannot break the statement
	$QuotedSensorType = pg_escape_string($dbconn, $SensorType);

	$GetSensorNamesString = 
"SELECT 
DISTINCT '\"' || varid ||'\"' || ' integer ' as integername,
varid as NakedVariable, 
'\"' || ips.parentname || ' - ' || ips.objectname || ' (' || ips.objectid || ')\"' || ' integer '  AS fullname,
'\"' || ips.parentname || ' - ' || ips.objectname || ' (' || ips.objectid || ')\"' || ' numeric(5,2) '  AS fullnamenumeric,
$prefix varid $postfix as DecoratedVariable

FROM $IpsTable As log 
JOIN ipsobjects_with_parent ips 
ON log.varid = ips.objectid
WHERE (ips.objectname LIKE '$QuotedSensorType')
ORDER BY 1;
";

	$QueryResult = pg_query($dbconn, $GetSensorNamesString);
	if (!$QueryResult) {
		echo "PG Error: ". pg_last_error($dbconn).chr(13);
		return false;
	}

	return $QueryResult;
}

?>

Hallo,

ich habe meine IPS DB auch gerade nach Postgresql kopiert, mit insert kommt man aber nicht weit. Nach 24h und nur 10% übertragener Daten habe ich aufgehört und habe von der IPS DB ein CSV Dump gemacht und diesen dann in Postgresql eingelesen. Meine float csv Datei hatte eine Größe von 2800 MB bei einer Übertragungsrate von 6 MB/min brauchte ich selbst so noch 8 Stunden für den Import.

Schöne Grüße
Thomas