Skip to content
  • Aktuell
  • Tags
  • 0 Ungelesen 0
  • Kategorien
  • Unreplied
  • Beliebt
  • GitHub
  • Docu
  • Hilfe
Skins
  • Light
  • Brite
  • Cerulean
  • Cosmo
  • Flatly
  • Journal
  • Litera
  • Lumen
  • Lux
  • Materia
  • Minty
  • Morph
  • Pulse
  • Sandstone
  • Simplex
  • Sketchy
  • Spacelab
  • United
  • Yeti
  • Zephyr
  • Dark
  • Cyborg
  • Darkly
  • Quartz
  • Slate
  • Solar
  • Superhero
  • Vapor

  • Standard: (Kein Skin)
  • Kein Skin
Einklappen
ioBroker Logo
  1. ioBroker Community Home
  2. Deutsch
  3. ioBroker Allgemein
  4. Frage : Migrate MySQL nach Influxdb

NEWS

  • UPDATE 31.10.: Amazon Alexa - ioBroker Skill läuft aus ?
    apollon77A
    apollon77
    48
    3
    8.1k

  • Monatsrückblick – September 2025
    BluefoxB
    Bluefox
    13
    1
    1.8k

  • Neues Video "KI im Smart Home" - ioBroker plus n8n
    BluefoxB
    Bluefox
    15
    1
    2.1k

Frage : Migrate MySQL nach Influxdb

Geplant Angeheftet Gesperrt Verschoben ioBroker Allgemein
151 Beiträge 31 Kommentatoren 25.7k Aufrufe 25 Watching
  • Älteste zuerst
  • Neuste zuerst
  • Meiste Stimmen
Antworten
  • In einem neuen Thema antworten
Anmelden zum Antworten
Dieses Thema wurde gelöscht. Nur Nutzer mit entsprechenden Rechten können es sehen.
  • G Offline
    G Offline
    gender
    schrieb am zuletzt editiert von
    #19

    Also ich habe wie beschrieben ca. 2 Millionen Datensätze in einem Rutsch importieren können. Da gab es keine Fehlermeldung... und alle Hunderttausend Datensätze immer eine Konsolenausgabe. Hat ca. 2 1/2 Minuten gedauert.
    Ob es an Umlauten liegt, kann ich dir nicht sagen. Ich hatte auf jeden Fall keine in meinen Datensätzen drin.

    Kriegst du in der Konsole keine Fehlermeldung für einzelne Datensätze? Das war bei mir jedenfalls so, sodass ich genauer gucken konnte, wenn er etwas nicht importiert hat.

    S 1 Antwort Letzte Antwort
    0
    • G gender

      Also ich habe wie beschrieben ca. 2 Millionen Datensätze in einem Rutsch importieren können. Da gab es keine Fehlermeldung... und alle Hunderttausend Datensätze immer eine Konsolenausgabe. Hat ca. 2 1/2 Minuten gedauert.
      Ob es an Umlauten liegt, kann ich dir nicht sagen. Ich hatte auf jeden Fall keine in meinen Datensätzen drin.

      Kriegst du in der Konsole keine Fehlermeldung für einzelne Datensätze? Das war bei mir jedenfalls so, sodass ich genauer gucken konnte, wenn er etwas nicht importiert hat.

      S Offline
      S Offline
      schittl
      schrieb am zuletzt editiert von schittl
      #20

      @gender In der ini von influx bzw. von der Doku dazu hatte ich nur was gelesen das man die Files splitten sollte bei 10000. Vielleicht liegt es auch an der neuen Version 1.7

      siehe hier

      „If your data file has more than 5,000 points, it may be necessary to split that file into several files in order to write your data in batches to InfluxDB. We recommend writing points in batches of 5,000 to 10,000 points. Smaller batches, and more HTTP requests, will result in sub-optimal performance. By default, the HTTP request times out after five seconds. InfluxDB will still attempt to write the points after that time out but there will be no confirmation that they were successfully written.“

      Fehlermeldung ist folgende:

      2020/01/01 17:00:44 error writing batch:  {"error":"partial write: points beyond retention policy dropped=5000"}
      

      HW: Lenovo M920q (Proxmox, ioBroker, RaspMatic & Z2M), QNAP (Docker, Influx), Arduino Mega 2560 R3 (I2C DS18B20 + LED)

      SW: CT IoBroker, VM RaspMatic(v3.79.6.20241122)

      1 Antwort Letzte Antwort
      0
      • G Offline
        G Offline
        gender
        schrieb am zuletzt editiert von
        #21

        Ja, das hatte ich in der Doku auch mal gelesen.... und mich dann einfach gefreut, dass es auch so ging (zumal das Hochladen von Millionen Daten in 10000er-Schritten dann doch ne Zumutung wäre.

        Deine Fehlermeldung sagt mir leider nichts und ist mir auch unbekannt (mal abgesehen davon, dass ich auch noch nicht verstanden habe, was es mit dieser retention policy bei influx auf sich hat, die man ja irgendwie optional angeben/einstellen kann, soweit ich das verstanden habe.

        Ich habe die Datenbank auf jeden Fall einfach von Iobroker beim erstem Sammeln von Daten erstellen lassen.

        S 1 Antwort Letzte Antwort
        0
        • G gender

          Ja, das hatte ich in der Doku auch mal gelesen.... und mich dann einfach gefreut, dass es auch so ging (zumal das Hochladen von Millionen Daten in 10000er-Schritten dann doch ne Zumutung wäre.

          Deine Fehlermeldung sagt mir leider nichts und ist mir auch unbekannt (mal abgesehen davon, dass ich auch noch nicht verstanden habe, was es mit dieser retention policy bei influx auf sich hat, die man ja irgendwie optional angeben/einstellen kann, soweit ich das verstanden habe.

          Ich habe die Datenbank auf jeden Fall einfach von Iobroker beim erstem Sammeln von Daten erstellen lassen.

          S Offline
          S Offline
          schittl
          schrieb am zuletzt editiert von
          #22

          @gender Influx DB wurde bei mir auch beim ersten Sammeln automatisch erstellt. Kann mir auch nicht vorstellen, das man es einzeln machen muss. Komisch ist ja das er immer bei 360000 aufhört. Ist bestimmt irgendeine neue Grenze. Welche Version nutzt Du bzw. in welcher Version klappte dein Import? Vielleicht kennt jemand noch einen anderen Ansatz z.B Telegraf? Wäre schon Schade, wenn ich die Daten von ca. 2 Jahren verlieren würde (bittl mehr als 8 GB)

          HW: Lenovo M920q (Proxmox, ioBroker, RaspMatic & Z2M), QNAP (Docker, Influx), Arduino Mega 2560 R3 (I2C DS18B20 + LED)

          SW: CT IoBroker, VM RaspMatic(v3.79.6.20241122)

          1 Antwort Letzte Antwort
          0
          • G Offline
            G Offline
            gender
            schrieb am zuletzt editiert von
            #23

            Ich habe Version 1.7.9, installiert im Docker.
            Habe das auch erst vor ein paar Tagen alles installiert bzw. umgestellt auf Influxdb, daher sollte die Version aktuell sein.

            Ich hatte auf jeden Fall Probleme mit den "ack"-Werten, da die in meinen MariaDB-Tabellen aus 0en oder 1en bestehen und Influx daraus dann einen Zahlenwert und keinen Boolean gemacht hat.
            Daher wird oben im Python-Skript einfach alles auf true gesetzt (und die Zeilen mit 0er-Werten hatte ich ausgeschlossen).

            S 1 Antwort Letzte Antwort
            0
            • G gender

              Ich habe Version 1.7.9, installiert im Docker.
              Habe das auch erst vor ein paar Tagen alles installiert bzw. umgestellt auf Influxdb, daher sollte die Version aktuell sein.

              Ich hatte auf jeden Fall Probleme mit den "ack"-Werten, da die in meinen MariaDB-Tabellen aus 0en oder 1en bestehen und Influx daraus dann einen Zahlenwert und keinen Boolean gemacht hat.
              Daher wird oben im Python-Skript einfach alles auf true gesetzt (und die Zeilen mit 0er-Werten hatte ich ausgeschlossen).

              S Offline
              S Offline
              schittl
              schrieb am zuletzt editiert von
              #24

              @gender Bei mir auch die neueste nur in einem Container unter Proxmox... Dann verstehe ich das ja gar nicht mehr... Trotzdem danke und vielleicht hat jemand ja noch eine andere Idee bzw. kann mal kurz aufzeigen wie er die Migration großer DB's vorgenommen hat...

              Könnte mir ja auch vorstellen ne Batchverarbeitung zu schreiben nur fehlt mir dazu erst der Ansatz/Knowhow... Idee wäre jeden Datenpunkt einzeln in eine Datei zu schreiben (geht sowas mit dem Javascript Adapter getHistory etc.?) und dann einzeln per Bash etc. in Influx einzulesen...

              HW: Lenovo M920q (Proxmox, ioBroker, RaspMatic & Z2M), QNAP (Docker, Influx), Arduino Mega 2560 R3 (I2C DS18B20 + LED)

              SW: CT IoBroker, VM RaspMatic(v3.79.6.20241122)

              1 Antwort Letzte Antwort
              0
              • S Offline
                S Offline
                schittl
                schrieb am zuletzt editiert von
                #25

                Wollte hier nur mal hinterlassen, dass ich die Daten (über 90% ca. 4 GB) migrieren konnte mit den Beispiel, was weiter oben von @gender genannt wurde. Der einzige Fehler den man nicht machen darf ist, dass Du erst die Daten migrieren musst ehe du in ioBroker den Adapter aktivierst / einrichtest. Ansonsten ging das sehr gut:

                1. Alle Daten in mehreren csv-Dateien aus mysql exportiert (so ca. 2000000 pro Datei)
                2. Per python csv_to_line.py alle Dateien konvertiert in txt (wichtig dabei # DML und # CONTEXT-DATABASE: NameDatabase ergänzen in den ersten 2 Zeilen)
                3. Per influx-Befehl die konvertierten Dateien eingelesen
                4. ioBroker-Adapter aktivieren

                HW: Lenovo M920q (Proxmox, ioBroker, RaspMatic & Z2M), QNAP (Docker, Influx), Arduino Mega 2560 R3 (I2C DS18B20 + LED)

                SW: CT IoBroker, VM RaspMatic(v3.79.6.20241122)

                1 Antwort Letzte Antwort
                0
                • U Offline
                  U Offline
                  UlliJ
                  schrieb am zuletzt editiert von
                  #26

                  Moin zusammen,

                  mein erster post in diesem Forum...
                  Zu aller erst vielen Dank für diese tolle Plattform und den vielen die mit enormem Aufwand das smarte home nach vorne treiben, tolle Ideen weiter geben, absolut geile Visualisierungen bauen usw. Ich habe bislang selten ein Forum gesehen, in dem so konstruktiv und freundlich miteinander umgegangen wird...weiter so 👍

                  Seit ca 2 Jahren betreibe ich Iobroker und habe bislang die Daten zuerst in MySQL und später dann in PostgreSQL abgelegt. Über die Zeit wurden die ganzen Abfragen und Diagramme mit Grafana nach und nach deutlich träger. Nach kurzer Testphase mit InfluxDB war die Frage geklärt😎
                  Mit dem Wechsel auf InfluxDB stellt sich die Frage für den nicht Programmierer wie die Daten von alt nach neu...

                  Die genannten Lösungen haben bei mir nicht zuverlässig funktioniert also die nächste Klappe

                  1. In einem SQL GUI (phpmyadmin, HeidiSQL oder DBeaver, etc) oder auf der SQL-Konsole die zu exportierenden Daten in einem View zur Verfügung stellen
                    Der View enthält alles was Influx benötigt
                  create view xxx as
                  SELECT datapoints.name,
                  	ts_number.ack    as "ack",
                      (ts_number.q*1.0)       as "q",
                  	sources.name as "from",
                      (ts_number.val*1.0)     as "value",
                      (ts_number.ts*1000000) as "time"
                  from ts_number 
                  left join datapoints
                  on ts_number.id=datapoints.id
                  left join sources
                  on ts_number._from=sources.id
                  where datapoints.id>=70 and datapoints.id<=73 and q=0
                  order by ts_number.ts desc;
                  

                  "xxx" Name des View anpassen
                  in der WHERE Klausel die Datenpunkte entsprechend anpassen, einen Bereich wie hier oder auch eine einzelne id
                  Für kleine Datenbestände kann die WHERE Klausel auch auf "WHERE q=0" gesetzt werden und über den View in einem Rutsch in Influx übertragen werden.
                  Beispiel:
                  59cd61b2-aab9-4cf5-8c14-f16d1e595125-grafik.png

                  1. Python installieren und ggf. die Zusätze installieren
                  • psycopg2 für PostgreSQL bzw. MySQLdb für MySQL
                  • influxdb-client und influxdb.
                  1. das Skript von Muntazir Fadhel "Easily Migrate Postgres/MySQL Records to InfluxDB" nehmen (modifizierte Variante hier)
                  ### MySQL DB info ###
                  #import MySQLdb
                  #conn = MySQLdb.connect(host="localhost",  # your host, usually localhost
                  #                     user="john",         # your username
                  #                     passwd="megajonhy",  # your password
                  #                     db="jonhydb")        # name of the data base
                  
                  
                  ### PostgreSQL DB info ###
                  import psycopg2
                  import psycopg2.extras
                  
                  #####
                  # connection data for PostgreSQL
                  conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
                  #####
                  
                  # InfluxDB info #
                  from influxdb import InfluxDBClient
                  #
                  #####connection data for InfluxDB#####
                  influxClient = InfluxDBClient(host='xxx.xxx.xxx.xxx', port=8086, username='xxx', password='xxx', database='xxx')
                  #####
                  #influxClient.delete_database(influx_db_name)
                  #influxClient.create_database(influx_db_name)
                  
                  # dictates how columns will be mapped to key/fields in InfluxDB
                  schema = {
                      "time_column": "time", # the column that will be used as the time stamp in influx
                      "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields 
                      # "columns_to_tags" : ["",...], # columns that will map to tags
                      "table_name_to_measurement" : "name", # table name that will be mapped to measurement
                      }
                  
                  '''
                  Generates an collection of influxdb points from the given SQL records
                  '''
                  def generate_influx_points(records):
                      influx_points = []
                      for record in records:
                          #tags = {}, 
                          fields = {}
                          #for tag_label in schema['columns_to_tags']:
                          #   tags[tag_label] = record[tag_label]
                          for field_label in schema['columns_to_fields']:
                              fields[field_label] = record[field_label]
                          influx_points.append({
                              "measurement": record[schema['table_name_to_measurement']],
                              #"tags": tags,
                              "time": record[schema['time_column']],
                              "fields": fields
                          })
                      return influx_points
                  
                  
                  
                  # query relational DB for all records
                  curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
                  # curr = conn.cursor(dictionary=True)
                  #####
                  # SQL query for PostgreSQL, syntax for MySQL differs
                  # query provide desired columns as a view on the sql server
                  
                  # request data from SQL, adjust ...from <view name>
                  curr.execute("Select * from xxx;")
                  #####
                  row_count = 0
                  # process 1000 records at a time
                  while True:
                      print("Processing row #" + str(row_count + 1))
                      selected_rows = curr.fetchmany(1000)
                      influxClient.write_points(generate_influx_points(selected_rows))
                      row_count += 1000
                      if len(selected_rows) < 1000:
                          break
                  conn.close()
                  
                  

                  Im Skript sind

                  • in Zeile 15 die Datenbankverbindung für Postgre anzupassen,
                  • in Zeile 22 die Datenbankverbindung für Influx
                  • weiter unten Zeile 65 statt xxx ist der Name des Views einzutragen der auf der Datenbank mit der Abfrage aus 1 erzeugt wurde
                    curr.execute("Select * from xxx;")
                  • der Abschnitt "schema" gibt das mapping der Spalten vor und muss nur angepasst werden wenn die Spaltenbezeichner in der Abfrage aus 1 geändert werden
                  • für die MySQL user sind die Verbindungsdaten in den Zeilen 2-6 auskommentiert und an eure Umgebung anzupassen.
                  1. In der Python Umgebung das Skript starten...alle 1000 Zeilen/Datensätze kommt eine Fortschrittsmeldung. Während die Abfrage auf der SQL Seite läuft steht da "Processing row # 1"

                  Bei umfangreichen Abfragen und je nach Umgebung kann das Ganze ein wenig dauern. Teilweise dauerten bei mir die SQL Abfragen bis zu 20min bei 20Mio Datensätzen (Nuc i3 mit 12GB für die SQL VM unter proxmox). Das Skript läuft zuverlässig und mehrere 10 Mio Datensätze von A nach B wurden migriert. Vielleicht kann es jemand brauchen und nochmals Danke für das tolle Projekt und die hilfsbereiten Menschen hier
                  VG
                  Ulli

                  Proxmox auf iNuc, VM's IOB, Raspberrymatic, lxc für InfluxDB2, Grafana, u.a. *** Homematic & Homematic IP, Shellies etc

                  arteckA JackGruberJ Thomas HerrmannT 3 Antworten Letzte Antwort
                  2
                  • U UlliJ

                    Moin zusammen,

                    mein erster post in diesem Forum...
                    Zu aller erst vielen Dank für diese tolle Plattform und den vielen die mit enormem Aufwand das smarte home nach vorne treiben, tolle Ideen weiter geben, absolut geile Visualisierungen bauen usw. Ich habe bislang selten ein Forum gesehen, in dem so konstruktiv und freundlich miteinander umgegangen wird...weiter so 👍

                    Seit ca 2 Jahren betreibe ich Iobroker und habe bislang die Daten zuerst in MySQL und später dann in PostgreSQL abgelegt. Über die Zeit wurden die ganzen Abfragen und Diagramme mit Grafana nach und nach deutlich träger. Nach kurzer Testphase mit InfluxDB war die Frage geklärt😎
                    Mit dem Wechsel auf InfluxDB stellt sich die Frage für den nicht Programmierer wie die Daten von alt nach neu...

                    Die genannten Lösungen haben bei mir nicht zuverlässig funktioniert also die nächste Klappe

                    1. In einem SQL GUI (phpmyadmin, HeidiSQL oder DBeaver, etc) oder auf der SQL-Konsole die zu exportierenden Daten in einem View zur Verfügung stellen
                      Der View enthält alles was Influx benötigt
                    create view xxx as
                    SELECT datapoints.name,
                    	ts_number.ack    as "ack",
                        (ts_number.q*1.0)       as "q",
                    	sources.name as "from",
                        (ts_number.val*1.0)     as "value",
                        (ts_number.ts*1000000) as "time"
                    from ts_number 
                    left join datapoints
                    on ts_number.id=datapoints.id
                    left join sources
                    on ts_number._from=sources.id
                    where datapoints.id>=70 and datapoints.id<=73 and q=0
                    order by ts_number.ts desc;
                    

                    "xxx" Name des View anpassen
                    in der WHERE Klausel die Datenpunkte entsprechend anpassen, einen Bereich wie hier oder auch eine einzelne id
                    Für kleine Datenbestände kann die WHERE Klausel auch auf "WHERE q=0" gesetzt werden und über den View in einem Rutsch in Influx übertragen werden.
                    Beispiel:
                    59cd61b2-aab9-4cf5-8c14-f16d1e595125-grafik.png

                    1. Python installieren und ggf. die Zusätze installieren
                    • psycopg2 für PostgreSQL bzw. MySQLdb für MySQL
                    • influxdb-client und influxdb.
                    1. das Skript von Muntazir Fadhel "Easily Migrate Postgres/MySQL Records to InfluxDB" nehmen (modifizierte Variante hier)
                    ### MySQL DB info ###
                    #import MySQLdb
                    #conn = MySQLdb.connect(host="localhost",  # your host, usually localhost
                    #                     user="john",         # your username
                    #                     passwd="megajonhy",  # your password
                    #                     db="jonhydb")        # name of the data base
                    
                    
                    ### PostgreSQL DB info ###
                    import psycopg2
                    import psycopg2.extras
                    
                    #####
                    # connection data for PostgreSQL
                    conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
                    #####
                    
                    # InfluxDB info #
                    from influxdb import InfluxDBClient
                    #
                    #####connection data for InfluxDB#####
                    influxClient = InfluxDBClient(host='xxx.xxx.xxx.xxx', port=8086, username='xxx', password='xxx', database='xxx')
                    #####
                    #influxClient.delete_database(influx_db_name)
                    #influxClient.create_database(influx_db_name)
                    
                    # dictates how columns will be mapped to key/fields in InfluxDB
                    schema = {
                        "time_column": "time", # the column that will be used as the time stamp in influx
                        "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields 
                        # "columns_to_tags" : ["",...], # columns that will map to tags
                        "table_name_to_measurement" : "name", # table name that will be mapped to measurement
                        }
                    
                    '''
                    Generates an collection of influxdb points from the given SQL records
                    '''
                    def generate_influx_points(records):
                        influx_points = []
                        for record in records:
                            #tags = {}, 
                            fields = {}
                            #for tag_label in schema['columns_to_tags']:
                            #   tags[tag_label] = record[tag_label]
                            for field_label in schema['columns_to_fields']:
                                fields[field_label] = record[field_label]
                            influx_points.append({
                                "measurement": record[schema['table_name_to_measurement']],
                                #"tags": tags,
                                "time": record[schema['time_column']],
                                "fields": fields
                            })
                        return influx_points
                    
                    
                    
                    # query relational DB for all records
                    curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
                    # curr = conn.cursor(dictionary=True)
                    #####
                    # SQL query for PostgreSQL, syntax for MySQL differs
                    # query provide desired columns as a view on the sql server
                    
                    # request data from SQL, adjust ...from <view name>
                    curr.execute("Select * from xxx;")
                    #####
                    row_count = 0
                    # process 1000 records at a time
                    while True:
                        print("Processing row #" + str(row_count + 1))
                        selected_rows = curr.fetchmany(1000)
                        influxClient.write_points(generate_influx_points(selected_rows))
                        row_count += 1000
                        if len(selected_rows) < 1000:
                            break
                    conn.close()
                    
                    

                    Im Skript sind

                    • in Zeile 15 die Datenbankverbindung für Postgre anzupassen,
                    • in Zeile 22 die Datenbankverbindung für Influx
                    • weiter unten Zeile 65 statt xxx ist der Name des Views einzutragen der auf der Datenbank mit der Abfrage aus 1 erzeugt wurde
                      curr.execute("Select * from xxx;")
                    • der Abschnitt "schema" gibt das mapping der Spalten vor und muss nur angepasst werden wenn die Spaltenbezeichner in der Abfrage aus 1 geändert werden
                    • für die MySQL user sind die Verbindungsdaten in den Zeilen 2-6 auskommentiert und an eure Umgebung anzupassen.
                    1. In der Python Umgebung das Skript starten...alle 1000 Zeilen/Datensätze kommt eine Fortschrittsmeldung. Während die Abfrage auf der SQL Seite läuft steht da "Processing row # 1"

                    Bei umfangreichen Abfragen und je nach Umgebung kann das Ganze ein wenig dauern. Teilweise dauerten bei mir die SQL Abfragen bis zu 20min bei 20Mio Datensätzen (Nuc i3 mit 12GB für die SQL VM unter proxmox). Das Skript läuft zuverlässig und mehrere 10 Mio Datensätze von A nach B wurden migriert. Vielleicht kann es jemand brauchen und nochmals Danke für das tolle Projekt und die hilfsbereiten Menschen hier
                    VG
                    Ulli

                    arteckA Offline
                    arteckA Offline
                    arteck
                    Developer Most Active
                    schrieb am zuletzt editiert von
                    #27

                    @UlliJ DANKÖÖÖÖÖÖÖÖÖÖÖÖÖÖÖÖ ich war zu faul zu

                    zigbee hab ich, zwave auch, nuc's genauso und HA auch

                    U 1 Antwort Letzte Antwort
                    1
                    • arteckA arteck

                      @UlliJ DANKÖÖÖÖÖÖÖÖÖÖÖÖÖÖÖÖ ich war zu faul zu

                      U Offline
                      U Offline
                      UlliJ
                      schrieb am zuletzt editiert von
                      #28

                      @arteck Gerne doch😉 habe auch mehr Probleme auf der Skriptseite, die Datenbank geht eigentlich ganz gut...bis man mit Influx anfängt🤔

                      Proxmox auf iNuc, VM's IOB, Raspberrymatic, lxc für InfluxDB2, Grafana, u.a. *** Homematic & Homematic IP, Shellies etc

                      1 Antwort Letzte Antwort
                      0
                      • U UlliJ

                        Moin zusammen,

                        mein erster post in diesem Forum...
                        Zu aller erst vielen Dank für diese tolle Plattform und den vielen die mit enormem Aufwand das smarte home nach vorne treiben, tolle Ideen weiter geben, absolut geile Visualisierungen bauen usw. Ich habe bislang selten ein Forum gesehen, in dem so konstruktiv und freundlich miteinander umgegangen wird...weiter so 👍

                        Seit ca 2 Jahren betreibe ich Iobroker und habe bislang die Daten zuerst in MySQL und später dann in PostgreSQL abgelegt. Über die Zeit wurden die ganzen Abfragen und Diagramme mit Grafana nach und nach deutlich träger. Nach kurzer Testphase mit InfluxDB war die Frage geklärt😎
                        Mit dem Wechsel auf InfluxDB stellt sich die Frage für den nicht Programmierer wie die Daten von alt nach neu...

                        Die genannten Lösungen haben bei mir nicht zuverlässig funktioniert also die nächste Klappe

                        1. In einem SQL GUI (phpmyadmin, HeidiSQL oder DBeaver, etc) oder auf der SQL-Konsole die zu exportierenden Daten in einem View zur Verfügung stellen
                          Der View enthält alles was Influx benötigt
                        create view xxx as
                        SELECT datapoints.name,
                        	ts_number.ack    as "ack",
                            (ts_number.q*1.0)       as "q",
                        	sources.name as "from",
                            (ts_number.val*1.0)     as "value",
                            (ts_number.ts*1000000) as "time"
                        from ts_number 
                        left join datapoints
                        on ts_number.id=datapoints.id
                        left join sources
                        on ts_number._from=sources.id
                        where datapoints.id>=70 and datapoints.id<=73 and q=0
                        order by ts_number.ts desc;
                        

                        "xxx" Name des View anpassen
                        in der WHERE Klausel die Datenpunkte entsprechend anpassen, einen Bereich wie hier oder auch eine einzelne id
                        Für kleine Datenbestände kann die WHERE Klausel auch auf "WHERE q=0" gesetzt werden und über den View in einem Rutsch in Influx übertragen werden.
                        Beispiel:
                        59cd61b2-aab9-4cf5-8c14-f16d1e595125-grafik.png

                        1. Python installieren und ggf. die Zusätze installieren
                        • psycopg2 für PostgreSQL bzw. MySQLdb für MySQL
                        • influxdb-client und influxdb.
                        1. das Skript von Muntazir Fadhel "Easily Migrate Postgres/MySQL Records to InfluxDB" nehmen (modifizierte Variante hier)
                        ### MySQL DB info ###
                        #import MySQLdb
                        #conn = MySQLdb.connect(host="localhost",  # your host, usually localhost
                        #                     user="john",         # your username
                        #                     passwd="megajonhy",  # your password
                        #                     db="jonhydb")        # name of the data base
                        
                        
                        ### PostgreSQL DB info ###
                        import psycopg2
                        import psycopg2.extras
                        
                        #####
                        # connection data for PostgreSQL
                        conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
                        #####
                        
                        # InfluxDB info #
                        from influxdb import InfluxDBClient
                        #
                        #####connection data for InfluxDB#####
                        influxClient = InfluxDBClient(host='xxx.xxx.xxx.xxx', port=8086, username='xxx', password='xxx', database='xxx')
                        #####
                        #influxClient.delete_database(influx_db_name)
                        #influxClient.create_database(influx_db_name)
                        
                        # dictates how columns will be mapped to key/fields in InfluxDB
                        schema = {
                            "time_column": "time", # the column that will be used as the time stamp in influx
                            "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields 
                            # "columns_to_tags" : ["",...], # columns that will map to tags
                            "table_name_to_measurement" : "name", # table name that will be mapped to measurement
                            }
                        
                        '''
                        Generates an collection of influxdb points from the given SQL records
                        '''
                        def generate_influx_points(records):
                            influx_points = []
                            for record in records:
                                #tags = {}, 
                                fields = {}
                                #for tag_label in schema['columns_to_tags']:
                                #   tags[tag_label] = record[tag_label]
                                for field_label in schema['columns_to_fields']:
                                    fields[field_label] = record[field_label]
                                influx_points.append({
                                    "measurement": record[schema['table_name_to_measurement']],
                                    #"tags": tags,
                                    "time": record[schema['time_column']],
                                    "fields": fields
                                })
                            return influx_points
                        
                        
                        
                        # query relational DB for all records
                        curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
                        # curr = conn.cursor(dictionary=True)
                        #####
                        # SQL query for PostgreSQL, syntax for MySQL differs
                        # query provide desired columns as a view on the sql server
                        
                        # request data from SQL, adjust ...from <view name>
                        curr.execute("Select * from xxx;")
                        #####
                        row_count = 0
                        # process 1000 records at a time
                        while True:
                            print("Processing row #" + str(row_count + 1))
                            selected_rows = curr.fetchmany(1000)
                            influxClient.write_points(generate_influx_points(selected_rows))
                            row_count += 1000
                            if len(selected_rows) < 1000:
                                break
                        conn.close()
                        
                        

                        Im Skript sind

                        • in Zeile 15 die Datenbankverbindung für Postgre anzupassen,
                        • in Zeile 22 die Datenbankverbindung für Influx
                        • weiter unten Zeile 65 statt xxx ist der Name des Views einzutragen der auf der Datenbank mit der Abfrage aus 1 erzeugt wurde
                          curr.execute("Select * from xxx;")
                        • der Abschnitt "schema" gibt das mapping der Spalten vor und muss nur angepasst werden wenn die Spaltenbezeichner in der Abfrage aus 1 geändert werden
                        • für die MySQL user sind die Verbindungsdaten in den Zeilen 2-6 auskommentiert und an eure Umgebung anzupassen.
                        1. In der Python Umgebung das Skript starten...alle 1000 Zeilen/Datensätze kommt eine Fortschrittsmeldung. Während die Abfrage auf der SQL Seite läuft steht da "Processing row # 1"

                        Bei umfangreichen Abfragen und je nach Umgebung kann das Ganze ein wenig dauern. Teilweise dauerten bei mir die SQL Abfragen bis zu 20min bei 20Mio Datensätzen (Nuc i3 mit 12GB für die SQL VM unter proxmox). Das Skript läuft zuverlässig und mehrere 10 Mio Datensätze von A nach B wurden migriert. Vielleicht kann es jemand brauchen und nochmals Danke für das tolle Projekt und die hilfsbereiten Menschen hier
                        VG
                        Ulli

                        JackGruberJ Offline
                        JackGruberJ Offline
                        JackGruber
                        schrieb am zuletzt editiert von
                        #29

                        @UlliJ danke für die Idee mit dem Script.
                        Hab das script etwas angepasst, damit alle Daten übernommen werden und diese auch auf einem leistungsschwachen Rasperry Pi ausgelesen werden können.
                        Hab so ca. 22 Millionen Datensätze in die InfluxDB gepackt.

                        GitHub iobroker_mysql_2_influxdb

                        U M umbmU 3 Antworten Letzte Antwort
                        0
                        • JackGruberJ JackGruber

                          @UlliJ danke für die Idee mit dem Script.
                          Hab das script etwas angepasst, damit alle Daten übernommen werden und diese auch auf einem leistungsschwachen Rasperry Pi ausgelesen werden können.
                          Hab so ca. 22 Millionen Datensätze in die InfluxDB gepackt.

                          GitHub iobroker_mysql_2_influxdb

                          U Offline
                          U Offline
                          UlliJ
                          schrieb am zuletzt editiert von
                          #30

                          @JackGruber danke für die coole Ergänzung, habe ich gerade erst gesehen. Das hätte meine Möglichkeiten schon wieder weit überfordert😬

                          Proxmox auf iNuc, VM's IOB, Raspberrymatic, lxc für InfluxDB2, Grafana, u.a. *** Homematic & Homematic IP, Shellies etc

                          1 Antwort Letzte Antwort
                          0
                          • JackGruberJ JackGruber

                            @UlliJ danke für die Idee mit dem Script.
                            Hab das script etwas angepasst, damit alle Daten übernommen werden und diese auch auf einem leistungsschwachen Rasperry Pi ausgelesen werden können.
                            Hab so ca. 22 Millionen Datensätze in die InfluxDB gepackt.

                            GitHub iobroker_mysql_2_influxdb

                            M Offline
                            M Offline
                            MezzoDO
                            schrieb am zuletzt editiert von
                            #31

                            Ich habe nun versucht mit dem Git-Projekt von @JackGruber meine Daten zu Influx zu migrieren.

                            Nach anfänglichen Schwierigkeiten scheint es augenscheinlich durch zu laufen.

                            Tatsächlich werden die Daten jedoch nicht migriert und ich kenne mich mit Python nicht aus, um mir z. B. Debug-Points o.ä. zu setzen...

                            Was kann ich tun, um zu sehen, wo es denn nun hakt?

                            JackGruberJ 1 Antwort Letzte Antwort
                            0
                            • M MezzoDO

                              Ich habe nun versucht mit dem Git-Projekt von @JackGruber meine Daten zu Influx zu migrieren.

                              Nach anfänglichen Schwierigkeiten scheint es augenscheinlich durch zu laufen.

                              Tatsächlich werden die Daten jedoch nicht migriert und ich kenne mich mit Python nicht aus, um mir z. B. Debug-Points o.ä. zu setzen...

                              Was kann ich tun, um zu sehen, wo es denn nun hakt?

                              JackGruberJ Offline
                              JackGruberJ Offline
                              JackGruber
                              schrieb am zuletzt editiert von
                              #32

                              @MezzoDO was ist genau das problem?
                              Gibt es ausgaben oder fehlermeldungen?

                              M 1 Antwort Letzte Antwort
                              0
                              • JackGruberJ JackGruber

                                @MezzoDO was ist genau das problem?
                                Gibt es ausgaben oder fehlermeldungen?

                                M Offline
                                M Offline
                                MezzoDO
                                schrieb am zuletzt editiert von
                                #33

                                @JackGruber
                                ich habe es mittlerweile geschafft, die Daten zu importieren.
                                Allerdings besteht nun, wie oben schon beschrieben das Problem mit den Bool-Werten.
                                Wenn ich einerseits eine leere Datenbank mit meinem Import befülle ist "ack" ein Integer.
                                Lasse ich die InfluxDB erst durch ioBroker anlegen ist es korrekterweise ein bool, aber dann bekomme ich die Fehlermeldung:

                                influxdb.exceptions.InfluxDBClientError: 400: {"error":"partial write: field type conflict: input field \"ack\" on measurement \"info.0.sysinfo.cpu.currentLoad.currentload\" is type integer, already exists as type boolean dropped=1000"}
                                

                                Ich glaube also dass ich im besten Falle den Datentypen beim Import angeben sollte, oder?
                                Nur: wie?

                                JackGruberJ 1 Antwort Letzte Antwort
                                0
                                • M MezzoDO

                                  @JackGruber
                                  ich habe es mittlerweile geschafft, die Daten zu importieren.
                                  Allerdings besteht nun, wie oben schon beschrieben das Problem mit den Bool-Werten.
                                  Wenn ich einerseits eine leere Datenbank mit meinem Import befülle ist "ack" ein Integer.
                                  Lasse ich die InfluxDB erst durch ioBroker anlegen ist es korrekterweise ein bool, aber dann bekomme ich die Fehlermeldung:

                                  influxdb.exceptions.InfluxDBClientError: 400: {"error":"partial write: field type conflict: input field \"ack\" on measurement \"info.0.sysinfo.cpu.currentLoad.currentload\" is type integer, already exists as type boolean dropped=1000"}
                                  

                                  Ich glaube also dass ich im besten Falle den Datentypen beim Import angeben sollte, oder?
                                  Nur: wie?

                                  JackGruberJ Offline
                                  JackGruberJ Offline
                                  JackGruber
                                  schrieb am zuletzt editiert von
                                  #34

                                  @MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
                                  Ihc könnte es anpassen, dass dies beim import convertiert wird.

                                  M 1 Antwort Letzte Antwort
                                  0
                                  • JackGruberJ JackGruber

                                    @MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
                                    Ihc könnte es anpassen, dass dies beim import convertiert wird.

                                    M Offline
                                    M Offline
                                    MezzoDO
                                    schrieb am zuletzt editiert von MezzoDO
                                    #35

                                    @JackGruber said in Frage : Migrate MySQL nach Influxdb:

                                    @MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
                                    Ihc könnte es anpassen, dass dies beim import convertiert wird.

                                    Das wäre grandios 🙂
                                    Mein Script, was ich einsetze:

                                    ### MySQL DB info ###
                                    #import MySQLdb
                                    import pymysql
                                    conn = pymysql.connect(host="localhost",  # your host, usually localhost
                                                         user="xxx",         # your username
                                                         passwd="xxx",  # your password
                                                         db="iobroker")        # name of the data base
                                     
                                     
                                    ### PostgreSQL DB info ###
                                    import psycopg2
                                    import psycopg2.extras
                                     
                                    #####
                                    # connection data for PostgreSQL
                                    #conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
                                    #####
                                     
                                    # InfluxDB info #
                                    from influxdb import InfluxDBClient
                                    #
                                    #####connection data for InfluxDB#####
                                    influxClient = InfluxDBClient(host='localhost', port=8086, username='xxx', password='xxx', database='iobroker')
                                    #####
                                    #influxClient.delete_database(influx_db_name)
                                    #influxClient.create_database(influx_db_name)
                                     
                                    # dictates how columns will be mapped to key/fields in InfluxDB
                                    schema = {
                                        "time_column": "time", # the column that will be used as the time stamp in influx
                                        "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields 
                                        # "columns_to_tags" : ["",...], # columns that will map to tags
                                        "table_name_to_measurement" : "name", # table name that will be mapped to measurement
                                        }
                                     
                                    '''
                                    Generates an collection of influxdb points from the given SQL records
                                    '''
                                    def generate_influx_points(records):
                                        influx_points = []
                                        for record in records:
                                            #tags = {}, 
                                            fields = {}
                                            #for tag_label in schema['columns_to_tags']:
                                            #   tags[tag_label] = record[tag_label]
                                            for field_label in schema['columns_to_fields']:
                                                fields[field_label] = record[field_label]
                                            influx_points.append({
                                                "measurement": record[schema['table_name_to_measurement']],
                                                #"tags": tags,
                                                "time": record[schema['time_column']],
                                                "fields": fields
                                            })
                                        return influx_points
                                     
                                     
                                     
                                    # query relational DB for all records
                                    #curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
                                    curr = conn.cursor(cursor=pymysql.cursors.DictCursor) 
                                    # curr = conn.cursor(dictionary=True)
                                    #####
                                    # SQL query for PostgreSQL, syntax for MySQL differs
                                    # query provide desired columns as a view on the sql server
                                     
                                    # request data from SQL, adjust ...from <view name>
                                    curr.execute("Select * from InfluxData;")
                                    #####
                                    row_count = 0
                                    # process 1000 records at a time
                                    while True:
                                        print("Processing row #" + str(row_count + 1))
                                        selected_rows = curr.fetchmany(1000)
                                        influxClient.write_points(generate_influx_points(selected_rows))
                                        row_count += 1000
                                        if len(selected_rows) < 1000:
                                            break
                                    conn.close()
                                     
                                    
                                    JackGruberJ 1 Antwort Letzte Antwort
                                    0
                                    • M MezzoDO

                                      @JackGruber said in Frage : Migrate MySQL nach Influxdb:

                                      @MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
                                      Ihc könnte es anpassen, dass dies beim import convertiert wird.

                                      Das wäre grandios 🙂
                                      Mein Script, was ich einsetze:

                                      ### MySQL DB info ###
                                      #import MySQLdb
                                      import pymysql
                                      conn = pymysql.connect(host="localhost",  # your host, usually localhost
                                                           user="xxx",         # your username
                                                           passwd="xxx",  # your password
                                                           db="iobroker")        # name of the data base
                                       
                                       
                                      ### PostgreSQL DB info ###
                                      import psycopg2
                                      import psycopg2.extras
                                       
                                      #####
                                      # connection data for PostgreSQL
                                      #conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
                                      #####
                                       
                                      # InfluxDB info #
                                      from influxdb import InfluxDBClient
                                      #
                                      #####connection data for InfluxDB#####
                                      influxClient = InfluxDBClient(host='localhost', port=8086, username='xxx', password='xxx', database='iobroker')
                                      #####
                                      #influxClient.delete_database(influx_db_name)
                                      #influxClient.create_database(influx_db_name)
                                       
                                      # dictates how columns will be mapped to key/fields in InfluxDB
                                      schema = {
                                          "time_column": "time", # the column that will be used as the time stamp in influx
                                          "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields 
                                          # "columns_to_tags" : ["",...], # columns that will map to tags
                                          "table_name_to_measurement" : "name", # table name that will be mapped to measurement
                                          }
                                       
                                      '''
                                      Generates an collection of influxdb points from the given SQL records
                                      '''
                                      def generate_influx_points(records):
                                          influx_points = []
                                          for record in records:
                                              #tags = {}, 
                                              fields = {}
                                              #for tag_label in schema['columns_to_tags']:
                                              #   tags[tag_label] = record[tag_label]
                                              for field_label in schema['columns_to_fields']:
                                                  fields[field_label] = record[field_label]
                                              influx_points.append({
                                                  "measurement": record[schema['table_name_to_measurement']],
                                                  #"tags": tags,
                                                  "time": record[schema['time_column']],
                                                  "fields": fields
                                              })
                                          return influx_points
                                       
                                       
                                       
                                      # query relational DB for all records
                                      #curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
                                      curr = conn.cursor(cursor=pymysql.cursors.DictCursor) 
                                      # curr = conn.cursor(dictionary=True)
                                      #####
                                      # SQL query for PostgreSQL, syntax for MySQL differs
                                      # query provide desired columns as a view on the sql server
                                       
                                      # request data from SQL, adjust ...from <view name>
                                      curr.execute("Select * from InfluxData;")
                                      #####
                                      row_count = 0
                                      # process 1000 records at a time
                                      while True:
                                          print("Processing row #" + str(row_count + 1))
                                          selected_rows = curr.fetchmany(1000)
                                          influxClient.write_points(generate_influx_points(selected_rows))
                                          row_count += 1000
                                          if len(selected_rows) < 1000:
                                              break
                                      conn.close()
                                       
                                      
                                      JackGruberJ Offline
                                      JackGruberJ Offline
                                      JackGruber
                                      schrieb am zuletzt editiert von
                                      #36

                                      @MezzoDO

                                      Ersetze in deinem Script Zeile 47 mit folgenden zwei zeilen. Auf das einrücken achten!:

                                                  if field_label == "ack":
                                                      record[field_label] = bool(record[field_label])
                                      
                                      simatecS 1 Antwort Letzte Antwort
                                      0
                                      • JackGruberJ JackGruber

                                        @MezzoDO

                                        Ersetze in deinem Script Zeile 47 mit folgenden zwei zeilen. Auf das einrücken achten!:

                                                    if field_label == "ack":
                                                        record[field_label] = bool(record[field_label])
                                        
                                        simatecS Offline
                                        simatecS Offline
                                        simatec
                                        Developer Most Active
                                        schrieb am zuletzt editiert von
                                        #37

                                        @JackGruber
                                        Ich komme leider auch nicht weiter ...
                                        Habe deinen Script von Github installiert und bekomme folgende Ausgabe:

                                        pi@raspberrypi:~/iobroker_mysql_2_influxdb $ python3 migrate.py all
                                        Migrate 'all' datapoint(s) ...
                                        
                                        Total metrics in ts_number: 0
                                        Total metrics in ts_bool: 0
                                        Total metrics in ts_string: 0
                                        Migrated: 0
                                        
                                        

                                        Das ganze passiert innerhalb einer Sekunde .

                                        Die Influxdb habe ich über den iobroker Adapter angelegt.

                                        • Besuche meine Github Seite
                                        • Beitrag hat geholfen oder willst du mich unterstützen
                                        • HowTo Restore ioBroker
                                        JackGruberJ 1 Antwort Letzte Antwort
                                        0
                                        • simatecS simatec

                                          @JackGruber
                                          Ich komme leider auch nicht weiter ...
                                          Habe deinen Script von Github installiert und bekomme folgende Ausgabe:

                                          pi@raspberrypi:~/iobroker_mysql_2_influxdb $ python3 migrate.py all
                                          Migrate 'all' datapoint(s) ...
                                          
                                          Total metrics in ts_number: 0
                                          Total metrics in ts_bool: 0
                                          Total metrics in ts_string: 0
                                          Migrated: 0
                                          
                                          

                                          Das ganze passiert innerhalb einer Sekunde .

                                          Die Influxdb habe ich über den iobroker Adapter angelegt.

                                          JackGruberJ Offline
                                          JackGruberJ Offline
                                          JackGruber
                                          schrieb am zuletzt editiert von
                                          #38

                                          Hi @simatec, hab einen mini Fehler im Script gefunden. Durch einen BUG musste das ALL großgeschrieben werden.
                                          Hab das Script angepasst, einfach neu herunterladen oder das ALL großschreiben.

                                          simatecS 1 Antwort Letzte Antwort
                                          0
                                          Antworten
                                          • In einem neuen Thema antworten
                                          Anmelden zum Antworten
                                          • Älteste zuerst
                                          • Neuste zuerst
                                          • Meiste Stimmen


                                          Support us

                                          ioBroker
                                          Community Adapters
                                          Donate
                                          FAQ Cloud / IOT
                                          HowTo: Node.js-Update
                                          HowTo: Backup/Restore
                                          Downloads
                                          BLOG

                                          566

                                          Online

                                          32.4k

                                          Benutzer

                                          81.4k

                                          Themen

                                          1.3m

                                          Beiträge
                                          Community
                                          Impressum | Datenschutz-Bestimmungen | Nutzungsbedingungen
                                          ioBroker Community 2014-2025
                                          logo
                                          • Anmelden

                                          • Du hast noch kein Konto? Registrieren

                                          • Anmelden oder registrieren, um zu suchen
                                          • Erster Beitrag
                                            Letzter Beitrag
                                          0
                                          • Aktuell
                                          • Tags
                                          • Ungelesen 0
                                          • Kategorien
                                          • Unreplied
                                          • Beliebt
                                          • GitHub
                                          • Docu
                                          • Hilfe