Skip to content
  • Home
  • Aktuell
  • Tags
  • 0 Ungelesen 0
  • Kategorien
  • Unreplied
  • Beliebt
  • GitHub
  • Docu
  • Hilfe
Skins
  • Light
  • Brite
  • Cerulean
  • Cosmo
  • Flatly
  • Journal
  • Litera
  • Lumen
  • Lux
  • Materia
  • Minty
  • Morph
  • Pulse
  • Sandstone
  • Simplex
  • Sketchy
  • Spacelab
  • United
  • Yeti
  • Zephyr
  • Dark
  • Cyborg
  • Darkly
  • Quartz
  • Slate
  • Solar
  • Superhero
  • Vapor

  • Standard: (Kein Skin)
  • Kein Skin
Einklappen
ioBroker Logo

Community Forum

donate donate
  1. ioBroker Community Home
  2. Deutsch
  3. ioBroker Allgemein
  4. migration Influxdb2 nach SQL

NEWS

  • Jahresrückblick 2025 – unser neuer Blogbeitrag ist online! ✨
    BluefoxB
    Bluefox
    17
    1
    3.6k

  • Neuer Blogbeitrag: Monatsrückblick - Dezember 2025 🎄
    BluefoxB
    Bluefox
    13
    1
    1.1k

  • Weihnachtsangebot 2025! 🎄
    BluefoxB
    Bluefox
    25
    1
    2.5k

migration Influxdb2 nach SQL

Geplant Angeheftet Gesperrt Verschoben ioBroker Allgemein
influxdb2sqljavascriptdatentransfer
1 Beiträge 1 Kommentatoren 101 Aufrufe
  • Älteste zuerst
  • Neuste zuerst
  • Meiste Stimmen
Antworten
  • In einem neuen Thema antworten
Anmelden zum Antworten
Dieses Thema wurde gelöscht. Nur Nutzer mit entsprechenden Rechten können es sehen.
  • U Offline
    U Offline
    UlliJ
    schrieb am zuletzt editiert von
    #1

    von ein paar Jahren habe ich mit MariaDB als Datengrab bei Iobroker angefangen und bin dann zu Influx gewechselt. Schön schnell, transfer nach außen geht so, Konstanz in der Abfragesprache, na ja... da ist SQL langlebiger
    Mit dem/den Script(en) können die Daten direkt über "sendTo" von einer InfluxDB in eine SQL DB kopiert werden. Da ich selbst gerade erst mit JS anfange gab es Unterstützung einer KI und erwartet nicht unbedingt qualifizierte Antworten auf Fragen.

    Voraussetzungen:

    • eine Influx Instanz
    • eine SQL instanz
    • ein Datenpunkt im userdata Bereich als JSON definiert
    • der name des buckets
    • namen der SQL Tabelle(n)
      wird jeweils im Kopfbereich der scripte konfiguriert

    Ablauf
    Script 1 holt sich über das DB Schema die Namen aller Datenpunkte aus influx und legt
    das als JSON in dem selbst angelegten DP ab
    die Einträge in der SQL Tabelle "datapoints" an
    Konfiguration:

    const influxInstance = "influxdb.0"; // deine Influx Instanz
    const sqlInstance    = "sql.2";      // deine PostgreSQL Instanz
    const bucket         = "iobrokerST";
    const statePath      = "0_userdata.0.Test.statesInflux"; // Ziel-State im ioBroker
    const sqlTable       = "datapoints";  // SQL-Zieltabelle
    

    /**
    * Holt alle Measurements aus InfluxDB und schreibt sie in einen ioBroker-State
    * Optional: Einträge in SQL-Tabelle "datapoints" anlegen (PostgreSQL, auto-increment)
    */
    
    const influxInstance = "influxdb.0"; // deine Influx Instanz
    const sqlInstance    = "sql.2";      // deine PostgreSQL Instanz
    const bucket         = "iobrokerST";
    const statePath      = "0_userdata.0.Test.statesInflux"; // Ziel-State im ioBroker
    const sqlTable       = "datapoints";  // SQL-Zieltabelle
    
    async function fetchMeasurements() {
       console.log("📡 Hole Measurements aus InfluxDB …");
    
       const flux = `
    import "influxdata/influxdb/schema"
    schema.measurements(bucket: "${bucket}")
    `;
    
       const result = await new Promise((resolve) => {
           sendTo(influxInstance, "query", flux, (res) => {
               if (!res || res.error) {
                   console.error("❌ Fehler beim Abrufen der Measurements:", res?.error);
                   resolve(null);
               } else {
                   resolve(res);
               }
           });
       });
    
       if (!result || !result.result || !result.result[0]) {
           console.error("❌ Keine Measurements gefunden.");
           return;
       }
    
       const measurements = result.result[0].map((m, index) => ({
           name: m._value,
           id: index + 1
       }));
    
       console.log(`✅ Gefundene Measurements: ${measurements.length}`);
    
       // 1️⃣ In ioBroker-State schreiben
       setState(statePath, JSON.stringify(measurements), true);
    
       // 2️⃣ Optional: SQL-Einträge erzeugen (auto-increment id)
       for (const m of measurements) {
           const sql = `INSERT INTO ${sqlTable} (name) VALUES ('${m.name}');`;
           sendTo(sqlInstance, "query", sql, (res) => {
               if (res && typeof res === "object" && res.error) {
                   console.error(`❌ Fehler beim SQL-Eintrag für ${m.name}:`, res.error);
               } else {
                   console.log(`✅ SQL-Eintrag für "${m.name}" erstellt.`);
               }
           });
       }
    
       console.log("🏁 Fertig: Measurements in State und SQL eingetragen.");
    }
    
    // ======================================================
    // Aufruf
    // ======================================================
    fetchMeasurements();
    
    


    anschließend weiter mit script 2
    holt die Datensätze aller measurements aus dem zuvor erzeugten JSON beginnend mit dem aktuellen Monat und dann rückwärts bis keine weiteren Daten kommen
    ermittelt die "id" des measurements in der Tabelle "datapoints" über den Namen des Wertes
    schreibt die Daten über Inserts in kleinen Portionen (10000 Werte) in die SQL DB (nur ts_number, also numerische Werte)
    die Anzahl der Datensätze und die erfolgte Migration wird im Json eingetragen
    pausieren/fortsetzen lässt sich das script mit Abschalten/Einschalten der SQL Instanz

    Konfiguration

    const influxInstance = "influxdb.0";
    const sqlInstance    = "sql.2";
    const bucket         = "iobrokerST";
    const measurementState = "0_userdata.0.Test.statesInflux"; // JSON-State mit {name, id}
    const sqlTable       = "ts_number";   // Ziel-Tabelle
    const batchSize      = 10000;
    

    /**
    * Migration Influx → PostgreSQL aus JSON-State mit Measurement-Namen + IDs
    * Features:
    * - Zählt migrierte Datensätze pro DP
    * - Gesamtzahl aller migrierten Datensätze
    * - Vor jedem Batch: prüft ob sql.2 alive, wartet ggf.
    * - SQL-ID aus datapoints holen und als sqlId ins JSON schreiben
    * - Sicherheitsprüfung: gültige SQL-ID erforderlich
    * - Boolean korrekt, Zahlen konvertiert
    * - Ungültige Zeilen übersprungen
    * - Fehlerdetails geloggt
    * - Monatsabschluss mit Measurement-Name
    * - Migration pro Measurement markiert: "migrate": true
    */
    
    const influxInstance = "influxdb.0";
    const sqlInstance    = "sql.2";
    const bucket         = "iobrokerST";
    const measurementState = "0_userdata.0.Test.statesInflux"; // JSON-State mit {name, id}
    const sqlTable       = "ts_number";   // Ziel-Tabelle
    const batchSize      = 10000;
    
    // automatisierte Umschaltung Influx/SQL Instanz nicht getetstet
    const switchAdaptersAfterMigration = false; // bei true: schaltet Adapter am Ende um Influx aus, SQL ein
    
    // 🔹 Prüft, ob SQL-Adapter alive ist
    async function waitForSqlAlive(interval = 5000) {
       while (true) {
           const state = await getStateAsync("system.adapter.sql.2.alive");
           if (state?.val === true) return; // alive → weiter
           console.log("⏳ SQL-Adapter sql.2 nicht verfügbar, warte 5 Sekunden …");
           await new Promise(resolve => setTimeout(resolve, interval));
       }
    }
    
    // -----------------------------------------------------------
    // Adapter-Umschaltung (optional)
    async function switchDpAdapter(dpName) {
       try {
           const obj = await getObjectAsync(dpName);
           if (!obj) return;
    
           if (switchAdaptersAfterMigration) {
               if (obj.native && obj.native.influx !== undefined) obj.native.influx = false;
               if (obj.native && obj.native.sql !== undefined) obj.native.sql = true;
               await setObjectAsync(dpName, obj);
               console.log(`✅ Adapter für ${dpName} umgeschaltet: Influx deaktiviert, SQL aktiviert`);
           }
       } catch (e) {
           console.error(`❌ Fehler beim Umschalten der Adapter für ${dpName}:`, e);
       }
    }
    
    async function migrateFromJsonState() {
       // 1️⃣ JSON-State einlesen
       const state = await getStateAsync(measurementState);
       if (!state || !state.val) {
           console.error("❌ State nicht gefunden oder leer:", measurementState);
           return;
       }
    
       let measurements;
       try {
           measurements = JSON.parse(state.val);
       } catch (e) {
           console.error("❌ Fehler beim Parsen des JSON:", e);
           return;
       }
    
       if (!Array.isArray(measurements) || measurements.length === 0) {
           console.error("❌ Keine Measurements im JSON gefunden.");
           return;
       }
    
       console.log(`📦 Gefundene Measurements: ${measurements.length}`);
    
       let totalMigratedAll = measurements.totalMigratedAll || 0;
    
       // 2️⃣ SQL-ID für jedes Measurement holen und ins JSON schreiben
       for (const m of measurements) {
           m.migratedCount = m.migratedCount || 0; // neu: Anzahl der migrierten Datensätze pro DP
    
           await new Promise((resolve) => {
               const sqlCheck = `SELECT id FROM datapoints WHERE name='${m.name}' LIMIT 1;`;
               sendTo(sqlInstance, "query", sqlCheck, async (res) => {
                   try {
                       const row = res?.result?.[0];
                       if (!row || !row.id) {
                           console.error(`❌ Keine ID in 'datapoints' gefunden für ${m.name}. Dieses Measurement wird übersprungen.`);
                           m.sqlId = null;
                           resolve();
                           return;
                       }
                       m.sqlId = row.id;
    
                       // SQL-ID ins JSON-State schreiben
                       try {
                           const state = await getStateAsync(measurementState);
                           if (state && state.val) {
                               const json = JSON.parse(state.val);
                               const item = json.find(x => x.name === m.name);
                               if (item) {
                                   item.sqlId = m.sqlId;
                                   item.migratedCount = m.migratedCount; // jetzt persistiert
                                   await setStateAsync(measurementState, JSON.stringify(json), true);
                                   console.log(`ℹ️ sqlId ${m.sqlId} für "${m.name}" ins JSON geschrieben.`);
                               }
                           }
                       } catch (e) {
                           console.error(`❌ Fehler beim Eintragen von sqlId für ${m.name}:`, e);
                       }
    
                       console.log(`ℹ️ Measurement ${m.name} → SQL-ID ${m.sqlId}`);
                   } catch (e) {
                       console.error("❌ Fehler beim Auslesen der SQL-ID:", e);
                   }
                   resolve();
               });
           });
       }
    
       // 3️⃣ Migration monatsweise rückwärts
       for (const m of measurements) {
           console.log(`\n📡 Starte Migration für Measurement ${m.name}`);
    
           if (!m.sqlId) {
               console.warn(`⚠️ Keine gültige SQL-ID für ${m.name}, Migration übersprungen.`);
               continue;
           }
    
           if (m.migrate) {
               console.log(`ℹ️ Measurement ${m.name} wurde bereits migriert, wird übersprungen.`);
               continue;
           }
    
           let current = new Date();
           let finished = false;
    
           while (!finished) {
               const end = new Date(current.getFullYear(), current.getMonth() + 1, 0, 23, 59, 59);
               const start = new Date(current.getFullYear(), current.getMonth(), 1, 0, 0, 0);
    
               const rangeStart = start.toISOString().replace(".000", "");
               const rangeStop  = end.toISOString().replace(".000", "");
    
               console.log(`⏳ Zeitraum: ${rangeStart} → ${rangeStop}`);
    
               // Flux-Abfrage
               const flux = `
                   from(bucket: "${bucket}")
                   |> range(start: ${rangeStart}, stop: ${rangeStop})
                   |> filter(fn: (r) =>
                       r._measurement == "${m.name}" and
                       (r._field == "value" or r._field == "ack" or r._field == "q" or r._field == "_from")
                   )
                   |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
                   |> keep(columns: ["_time", "value", "ack", "_from", "q", "ts"])
                   |> sort(columns: ["_time"])
                   `;
    
               const influxResult = await new Promise((resolve) => {
                   sendTo(influxInstance, "query", flux, (res) => resolve(res));
               });
    
               const records = influxResult?.result?.[0];
               if (!records || records.length === 0) {
                   console.log(`✅ Keine weiteren Daten für ${m.name} im Monat ${start.toLocaleDateString()}`);
                   finished = true;
                   break;
               }
    
               // Batchweise Insert in SQL
               const totalBatches = Math.ceil(records.length / batchSize);
               let inserted = 0;
    
               for (let i = 0; i < records.length; i += batchSize) {
                   await waitForSqlAlive(); // SQL alive check
    
                   const batch = records.slice(i, i + batchSize);
                   const values = [];
    
                   for (let j = 0; j < batch.length; j++) {
                       const r = batch[j];
                       try {
                           const ts = BigInt(r.ts || new Date(r._time).getTime());
                           const val = Number(r.value ?? 0);
                           const ack = r.ack === true ? true : false;
                           const from = Number(r._from ?? 0);
                           const q = Number(r.q ?? 0);
    
                           values.push(`(${m.sqlId}, ${ts}, ${val}, ${ack}, ${from}, ${q})`);
                       } catch (e) {
                           console.error(`❌ Ungültige Daten in Batch, Zeile ${j + 1} für "${m.name}":`, r, e);
                       }
                   }
    
                   if (values.length === 0) continue;
    
                   const sql = `INSERT INTO ${sqlTable} (id, ts, val, ack, _from, q) VALUES ${values.join(",")};`;
    
                   await new Promise(resolve => {
                       sendTo(sqlInstance, "query", sql, (res) => {
                           if (res?.error) {
                               console.error(`❌ Fehler beim Batch ${Math.floor(i/batchSize)+1}/${totalBatches} für "${m.name}":`, res.error);
                           } else {
                               inserted += values.length;
                               m.migratedCount += values.length;
                               totalMigratedAll += values.length;
                               console.log(`✅ Batch ${Math.floor(i/batchSize)+1}/${totalBatches} für "${m.name}" erfolgreich (${values.length} Zeilen).`);
                           }
                           resolve();
                       });
                   });
               }
    
               console.log(`📊 Monat abgeschlossen für "${m.name}": ${inserted} Datensätze in SQL übertragen.`);
    
               // Nächster Monat rückwärts
               current.setMonth(current.getMonth() - 1);
           }
    
           // 4️⃣ Nach erfolgreicher Migration: Attribut "migrate" = true setzen
           try {
               const state = await getStateAsync(measurementState);
               if (state && state.val) {
                   const json = JSON.parse(state.val);
                   const item = json.find(x => x.name === m.name);
                   if (item) {
                       item.migrate = true;
                       item.migratedCount = m.migratedCount;
                   }
                   json.totalMigratedAll = totalMigratedAll; // Gesamtanzahl aller Datensätze
                   await setStateAsync(measurementState, JSON.stringify(json), true);
                   console.log(`✅ Migration für "${m.name}" abgeschlossen – Attribut "migrate" gesetzt, migrierte Datensätze: ${m.migratedCount}`);
               }
           } catch (e) {
               console.error(`❌ Fehler beim Setzen von "migrate" für "${m.name}":`, e);
           }
    
           // 🔹 Umschalten der Adapter für den DP
           await switchDpAdapter(m.name);
       }
    
       console.log("🏁 Alle Measurements migriert.");
    }
    
    // ======================================================
    // Aufruf
    // ======================================================
    migrateFromJsonState();
    
    


    Es sind relativ viele Konsolenausgaben enthalten, die bei sehr vielen Daten das Log stark aufblasen, ggf. bei Bedarf auskommentieren. Auch ist die Flux Abfrage mit dem pivot ziemlich zeitintensiv.
    Beim Test ist das script in einer eigenen Instanz gelaufen, Ziel DB ist Postgres (ggf. muss die Syntax für den Insert und für den Select bei der id angepasst werden).
    Geschrieben wurden ca. 30T Werte /s. Stichprobenartige Prüfung einiger Datensätze passte für Anzahl und Wert, Gesamtanzahl passte auch exakt (ca. 297Mio)
    Das Json sieht am Ende etwa so aus

    {
        "name": "0_userdata.0.4000_EnergieErzeugung.PV.DCPowerOst",
        "id": 35,
        "sqlId": 3,
        "migratedCount": 8742354,
        "migrate": true
      },
    

    gut finde ich die "inline" Migration, die scheinbar sehr schnelle und simple sendTo Methode und das es jederzeit wiederholbar ist.
    Jedem der es nutzen will sei ein vorheriges Backup nahe gelegt, auch wenn das script lediglich eine Kopie der Daten erzeugt.
    Hoffe der eine andere kann es brauchen

    Proxmox auf iNuc, lxc für IoB, InfluxDB2, Grafana, u.a. *** Homematic & Homematic IP, Shellies, Zigbee etc

    1 Antwort Letzte Antwort
    0
    Antworten
    • In einem neuen Thema antworten
    Anmelden zum Antworten
    • Älteste zuerst
    • Neuste zuerst
    • Meiste Stimmen


    Support us

    ioBroker
    Community Adapters
    Donate
    FAQ Cloud / IOT
    HowTo: Node.js-Update
    HowTo: Backup/Restore
    Downloads
    BLOG

    722

    Online

    32.7k

    Benutzer

    82.4k

    Themen

    1.3m

    Beiträge
    Community
    Impressum | Datenschutz-Bestimmungen | Nutzungsbedingungen | Einwilligungseinstellungen
    ioBroker Community 2014-2025
    logo
    • Anmelden

    • Du hast noch kein Konto? Registrieren

    • Anmelden oder registrieren, um zu suchen
    • Erster Beitrag
      Letzter Beitrag
    0
    • Home
    • Aktuell
    • Tags
    • Ungelesen 0
    • Kategorien
    • Unreplied
    • Beliebt
    • GitHub
    • Docu
    • Hilfe