von ein paar Jahren habe ich mit MariaDB als Datengrab bei Iobroker angefangen und bin dann zu Influx gewechselt. Schön schnell, transfer nach außen geht so, Konstanz in der Abfragesprache, na ja... da ist SQL langlebiger
Mit dem/den Script(en) können die Daten direkt über "sendTo" von einer InfluxDB in eine SQL DB kopiert werden. Da ich selbst gerade erst mit JS anfange gab es Unterstützung einer KI und erwartet nicht unbedingt qualifizierte Antworten auf Fragen.
Voraussetzungen:
eine Influx Instanz
eine SQL instanz
ein Datenpunkt im userdata Bereich als JSON definiert
der name des buckets
namen der SQL Tabelle(n)
wird jeweils im Kopfbereich der scripte konfiguriert
Ablauf
Script 1 holt sich über das DB Schema die Namen aller Datenpunkte aus influx und legt
das als JSON in dem selbst angelegten DP ab
die Einträge in der SQL Tabelle "datapoints" an
Konfiguration:
/**
* Holt alle Measurements aus InfluxDB und schreibt sie in einen ioBroker-State
* Optional: Einträge in SQL-Tabelle "datapoints" anlegen (PostgreSQL, auto-increment)
*/
const influxInstance = "influxdb.0"; // deine Influx Instanz
const sqlInstance = "sql.2"; // deine PostgreSQL Instanz
const bucket = "iobrokerST";
const statePath = "0_userdata.0.Test.statesInflux"; // Ziel-State im ioBroker
const sqlTable = "datapoints"; // SQL-Zieltabelle
async function fetchMeasurements() {
console.log("📡 Hole Measurements aus InfluxDB …");
const flux = `
import "influxdata/influxdb/schema"
schema.measurements(bucket: "${bucket}")
`;
const result = await new Promise((resolve) => {
sendTo(influxInstance, "query", flux, (res) => {
if (!res || res.error) {
console.error("❌ Fehler beim Abrufen der Measurements:", res?.error);
resolve(null);
} else {
resolve(res);
}
});
});
if (!result || !result.result || !result.result[0]) {
console.error("❌ Keine Measurements gefunden.");
return;
}
const measurements = result.result[0].map((m, index) => ({
name: m._value,
id: index + 1
}));
console.log(`✅ Gefundene Measurements: ${measurements.length}`);
// 1️⃣ In ioBroker-State schreiben
setState(statePath, JSON.stringify(measurements), true);
// 2️⃣ Optional: SQL-Einträge erzeugen (auto-increment id)
for (const m of measurements) {
const sql = `INSERT INTO ${sqlTable} (name) VALUES ('${m.name}');`;
sendTo(sqlInstance, "query", sql, (res) => {
if (res && typeof res === "object" && res.error) {
console.error(`❌ Fehler beim SQL-Eintrag für ${m.name}:`, res.error);
} else {
console.log(`✅ SQL-Eintrag für "${m.name}" erstellt.`);
}
});
}
console.log("🏁 Fertig: Measurements in State und SQL eingetragen.");
}
// ======================================================
// Aufruf
// ======================================================
fetchMeasurements();
anschließend weiter mit script 2
holt die Datensätze aller measurements aus dem zuvor erzeugten JSON beginnend mit dem aktuellen Monat und dann rückwärts bis keine weiteren Daten kommen
ermittelt die "id" des measurements in der Tabelle "datapoints" über den Namen des Wertes
schreibt die Daten über Inserts in kleinen Portionen (10000 Werte) in die SQL DB (nur ts_number, also numerische Werte)
die Anzahl der Datensätze und die erfolgte Migration wird im Json eingetragen
pausieren/fortsetzen lässt sich das script mit Abschalten/Einschalten der SQL Instanz
/**
* Migration Influx → PostgreSQL aus JSON-State mit Measurement-Namen + IDs
* Features:
* - Zählt migrierte Datensätze pro DP
* - Gesamtzahl aller migrierten Datensätze
* - Vor jedem Batch: prüft ob sql.2 alive, wartet ggf.
* - SQL-ID aus datapoints holen und als sqlId ins JSON schreiben
* - Sicherheitsprüfung: gültige SQL-ID erforderlich
* - Boolean korrekt, Zahlen konvertiert
* - Ungültige Zeilen übersprungen
* - Fehlerdetails geloggt
* - Monatsabschluss mit Measurement-Name
* - Migration pro Measurement markiert: "migrate": true
*/
const influxInstance = "influxdb.0";
const sqlInstance = "sql.2";
const bucket = "iobrokerST";
const measurementState = "0_userdata.0.Test.statesInflux"; // JSON-State mit {name, id}
const sqlTable = "ts_number"; // Ziel-Tabelle
const batchSize = 10000;
// automatisierte Umschaltung Influx/SQL Instanz nicht getetstet
const switchAdaptersAfterMigration = false; // bei true: schaltet Adapter am Ende um Influx aus, SQL ein
// 🔹 Prüft, ob SQL-Adapter alive ist
async function waitForSqlAlive(interval = 5000) {
while (true) {
const state = await getStateAsync("system.adapter.sql.2.alive");
if (state?.val === true) return; // alive → weiter
console.log("⏳ SQL-Adapter sql.2 nicht verfügbar, warte 5 Sekunden …");
await new Promise(resolve => setTimeout(resolve, interval));
}
}
// -----------------------------------------------------------
// Adapter-Umschaltung (optional)
async function switchDpAdapter(dpName) {
try {
const obj = await getObjectAsync(dpName);
if (!obj) return;
if (switchAdaptersAfterMigration) {
if (obj.native && obj.native.influx !== undefined) obj.native.influx = false;
if (obj.native && obj.native.sql !== undefined) obj.native.sql = true;
await setObjectAsync(dpName, obj);
console.log(`✅ Adapter für ${dpName} umgeschaltet: Influx deaktiviert, SQL aktiviert`);
}
} catch (e) {
console.error(`❌ Fehler beim Umschalten der Adapter für ${dpName}:`, e);
}
}
async function migrateFromJsonState() {
// 1️⃣ JSON-State einlesen
const state = await getStateAsync(measurementState);
if (!state || !state.val) {
console.error("❌ State nicht gefunden oder leer:", measurementState);
return;
}
let measurements;
try {
measurements = JSON.parse(state.val);
} catch (e) {
console.error("❌ Fehler beim Parsen des JSON:", e);
return;
}
if (!Array.isArray(measurements) || measurements.length === 0) {
console.error("❌ Keine Measurements im JSON gefunden.");
return;
}
console.log(`📦 Gefundene Measurements: ${measurements.length}`);
let totalMigratedAll = measurements.totalMigratedAll || 0;
// 2️⃣ SQL-ID für jedes Measurement holen und ins JSON schreiben
for (const m of measurements) {
m.migratedCount = m.migratedCount || 0; // neu: Anzahl der migrierten Datensätze pro DP
await new Promise((resolve) => {
const sqlCheck = `SELECT id FROM datapoints WHERE name='${m.name}' LIMIT 1;`;
sendTo(sqlInstance, "query", sqlCheck, async (res) => {
try {
const row = res?.result?.[0];
if (!row || !row.id) {
console.error(`❌ Keine ID in 'datapoints' gefunden für ${m.name}. Dieses Measurement wird übersprungen.`);
m.sqlId = null;
resolve();
return;
}
m.sqlId = row.id;
// SQL-ID ins JSON-State schreiben
try {
const state = await getStateAsync(measurementState);
if (state && state.val) {
const json = JSON.parse(state.val);
const item = json.find(x => x.name === m.name);
if (item) {
item.sqlId = m.sqlId;
item.migratedCount = m.migratedCount; // jetzt persistiert
await setStateAsync(measurementState, JSON.stringify(json), true);
console.log(`ℹ️ sqlId ${m.sqlId} für "${m.name}" ins JSON geschrieben.`);
}
}
} catch (e) {
console.error(`❌ Fehler beim Eintragen von sqlId für ${m.name}:`, e);
}
console.log(`ℹ️ Measurement ${m.name} → SQL-ID ${m.sqlId}`);
} catch (e) {
console.error("❌ Fehler beim Auslesen der SQL-ID:", e);
}
resolve();
});
});
}
// 3️⃣ Migration monatsweise rückwärts
for (const m of measurements) {
console.log(`\n📡 Starte Migration für Measurement ${m.name}`);
if (!m.sqlId) {
console.warn(`⚠️ Keine gültige SQL-ID für ${m.name}, Migration übersprungen.`);
continue;
}
if (m.migrate) {
console.log(`ℹ️ Measurement ${m.name} wurde bereits migriert, wird übersprungen.`);
continue;
}
let current = new Date();
let finished = false;
while (!finished) {
const end = new Date(current.getFullYear(), current.getMonth() + 1, 0, 23, 59, 59);
const start = new Date(current.getFullYear(), current.getMonth(), 1, 0, 0, 0);
const rangeStart = start.toISOString().replace(".000", "");
const rangeStop = end.toISOString().replace(".000", "");
console.log(`⏳ Zeitraum: ${rangeStart} → ${rangeStop}`);
// Flux-Abfrage
const flux = `
from(bucket: "${bucket}")
|> range(start: ${rangeStart}, stop: ${rangeStop})
|> filter(fn: (r) =>
r._measurement == "${m.name}" and
(r._field == "value" or r._field == "ack" or r._field == "q" or r._field == "_from")
)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "value", "ack", "_from", "q", "ts"])
|> sort(columns: ["_time"])
`;
const influxResult = await new Promise((resolve) => {
sendTo(influxInstance, "query", flux, (res) => resolve(res));
});
const records = influxResult?.result?.[0];
if (!records || records.length === 0) {
console.log(`✅ Keine weiteren Daten für ${m.name} im Monat ${start.toLocaleDateString()}`);
finished = true;
break;
}
// Batchweise Insert in SQL
const totalBatches = Math.ceil(records.length / batchSize);
let inserted = 0;
for (let i = 0; i < records.length; i += batchSize) {
await waitForSqlAlive(); // SQL alive check
const batch = records.slice(i, i + batchSize);
const values = [];
for (let j = 0; j < batch.length; j++) {
const r = batch[j];
try {
const ts = BigInt(r.ts || new Date(r._time).getTime());
const val = Number(r.value ?? 0);
const ack = r.ack === true ? true : false;
const from = Number(r._from ?? 0);
const q = Number(r.q ?? 0);
values.push(`(${m.sqlId}, ${ts}, ${val}, ${ack}, ${from}, ${q})`);
} catch (e) {
console.error(`❌ Ungültige Daten in Batch, Zeile ${j + 1} für "${m.name}":`, r, e);
}
}
if (values.length === 0) continue;
const sql = `INSERT INTO ${sqlTable} (id, ts, val, ack, _from, q) VALUES ${values.join(",")};`;
await new Promise(resolve => {
sendTo(sqlInstance, "query", sql, (res) => {
if (res?.error) {
console.error(`❌ Fehler beim Batch ${Math.floor(i/batchSize)+1}/${totalBatches} für "${m.name}":`, res.error);
} else {
inserted += values.length;
m.migratedCount += values.length;
totalMigratedAll += values.length;
console.log(`✅ Batch ${Math.floor(i/batchSize)+1}/${totalBatches} für "${m.name}" erfolgreich (${values.length} Zeilen).`);
}
resolve();
});
});
}
console.log(`📊 Monat abgeschlossen für "${m.name}": ${inserted} Datensätze in SQL übertragen.`);
// Nächster Monat rückwärts
current.setMonth(current.getMonth() - 1);
}
// 4️⃣ Nach erfolgreicher Migration: Attribut "migrate" = true setzen
try {
const state = await getStateAsync(measurementState);
if (state && state.val) {
const json = JSON.parse(state.val);
const item = json.find(x => x.name === m.name);
if (item) {
item.migrate = true;
item.migratedCount = m.migratedCount;
}
json.totalMigratedAll = totalMigratedAll; // Gesamtanzahl aller Datensätze
await setStateAsync(measurementState, JSON.stringify(json), true);
console.log(`✅ Migration für "${m.name}" abgeschlossen – Attribut "migrate" gesetzt, migrierte Datensätze: ${m.migratedCount}`);
}
} catch (e) {
console.error(`❌ Fehler beim Setzen von "migrate" für "${m.name}":`, e);
}
// 🔹 Umschalten der Adapter für den DP
await switchDpAdapter(m.name);
}
console.log("🏁 Alle Measurements migriert.");
}
// ======================================================
// Aufruf
// ======================================================
migrateFromJsonState();
Es sind relativ viele Konsolenausgaben enthalten, die bei sehr vielen Daten das Log stark aufblasen, ggf. bei Bedarf auskommentieren. Auch ist die Flux Abfrage mit dem pivot ziemlich zeitintensiv.
Beim Test ist das script in einer eigenen Instanz gelaufen, Ziel DB ist Postgres (ggf. muss die Syntax für den Insert und für den Select bei der id angepasst werden).
Geschrieben wurden ca. 30T Werte /s. Stichprobenartige Prüfung einiger Datensätze passte für Anzahl und Wert, Gesamtanzahl passte auch exakt (ca. 297Mio)
Das Json sieht am Ende etwa so aus
gut finde ich die "inline" Migration, die scheinbar sehr schnelle und simple sendTo Methode und das es jederzeit wiederholbar ist.
Jedem der es nutzen will sei ein vorheriges Backup nahe gelegt, auch wenn das script lediglich eine Kopie der Daten erzeugt.
Hoffe der eine andere kann es brauchen