NEWS
Frage : Migrate MySQL nach Influxdb
-
@UlliJ danke für die Idee mit dem Script.
Hab das script etwas angepasst, damit alle Daten übernommen werden und diese auch auf einem leistungsschwachen Rasperry Pi ausgelesen werden können.
Hab so ca. 22 Millionen Datensätze in die InfluxDB gepackt.GitHub iobroker_mysql_2_influxdb
-
@JackGruber danke für die coole Ergänzung, habe ich gerade erst gesehen. Das hätte meine Möglichkeiten schon wieder weit überfordert
-
Ich habe nun versucht mit dem Git-Projekt von @JackGruber meine Daten zu Influx zu migrieren.
Nach anfänglichen Schwierigkeiten scheint es augenscheinlich durch zu laufen.
Tatsächlich werden die Daten jedoch nicht migriert und ich kenne mich mit Python nicht aus, um mir z. B. Debug-Points o.ä. zu setzen...
Was kann ich tun, um zu sehen, wo es denn nun hakt?
-
@MezzoDO was ist genau das problem?
Gibt es ausgaben oder fehlermeldungen? -
@JackGruber
ich habe es mittlerweile geschafft, die Daten zu importieren.
Allerdings besteht nun, wie oben schon beschrieben das Problem mit den Bool-Werten.
Wenn ich einerseits eine leere Datenbank mit meinem Import befülle ist "ack" ein Integer.
Lasse ich die InfluxDB erst durch ioBroker anlegen ist es korrekterweise ein bool, aber dann bekomme ich die Fehlermeldung:influxdb.exceptions.InfluxDBClientError: 400: {"error":"partial write: field type conflict: input field \"ack\" on measurement \"info.0.sysinfo.cpu.currentLoad.currentload\" is type integer, already exists as type boolean dropped=1000"}
Ich glaube also dass ich im besten Falle den Datentypen beim Import angeben sollte, oder?
Nur: wie? -
@MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
Ihc könnte es anpassen, dass dies beim import convertiert wird. -
@JackGruber said in Frage : Migrate MySQL nach Influxdb:
@MezzoDO ok, das ack feld ist bereits als boolen angelegt, kommt nun aber als int.
Ihc könnte es anpassen, dass dies beim import convertiert wird.Das wäre grandios
Mein Script, was ich einsetze:### MySQL DB info ### #import MySQLdb import pymysql conn = pymysql.connect(host="localhost", # your host, usually localhost user="xxx", # your username passwd="xxx", # your password db="iobroker") # name of the data base ### PostgreSQL DB info ### import psycopg2 import psycopg2.extras ##### # connection data for PostgreSQL #conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432") ##### # InfluxDB info # from influxdb import InfluxDBClient # #####connection data for InfluxDB##### influxClient = InfluxDBClient(host='localhost', port=8086, username='xxx', password='xxx', database='iobroker') ##### #influxClient.delete_database(influx_db_name) #influxClient.create_database(influx_db_name) # dictates how columns will be mapped to key/fields in InfluxDB schema = { "time_column": "time", # the column that will be used as the time stamp in influx "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields # "columns_to_tags" : ["",...], # columns that will map to tags "table_name_to_measurement" : "name", # table name that will be mapped to measurement } ''' Generates an collection of influxdb points from the given SQL records ''' def generate_influx_points(records): influx_points = [] for record in records: #tags = {}, fields = {} #for tag_label in schema['columns_to_tags']: # tags[tag_label] = record[tag_label] for field_label in schema['columns_to_fields']: fields[field_label] = record[field_label] influx_points.append({ "measurement": record[schema['table_name_to_measurement']], #"tags": tags, "time": record[schema['time_column']], "fields": fields }) return influx_points # query relational DB for all records #curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor) curr = conn.cursor(cursor=pymysql.cursors.DictCursor) # curr = conn.cursor(dictionary=True) ##### # SQL query for PostgreSQL, syntax for MySQL differs # query provide desired columns as a view on the sql server # request data from SQL, adjust ...from <view name> curr.execute("Select * from InfluxData;") ##### row_count = 0 # process 1000 records at a time while True: print("Processing row #" + str(row_count + 1)) selected_rows = curr.fetchmany(1000) influxClient.write_points(generate_influx_points(selected_rows)) row_count += 1000 if len(selected_rows) < 1000: break conn.close()
-
Ersetze in deinem Script Zeile 47 mit folgenden zwei zeilen. Auf das einrücken achten!:
if field_label == "ack": record[field_label] = bool(record[field_label])
-
@JackGruber
Ich komme leider auch nicht weiter ...
Habe deinen Script von Github installiert und bekomme folgende Ausgabe:pi@raspberrypi:~/iobroker_mysql_2_influxdb $ python3 migrate.py all Migrate 'all' datapoint(s) ... Total metrics in ts_number: 0 Total metrics in ts_bool: 0 Total metrics in ts_string: 0 Migrated: 0
Das ganze passiert innerhalb einer Sekunde .
Die Influxdb habe ich über den iobroker Adapter angelegt.
-
Hi @simatec, hab einen mini Fehler im Script gefunden. Durch einen BUG musste das
ALL
großgeschrieben werden.
Hab das Script angepasst, einfach neu herunterladen oder dasALL
großschreiben. -
@JackGruber
Danke für den Tipp ... Ich hatte es auch einzeln versucht und dann auch jedes Objekt einzeln importiert.
War zwar etwas Aufwand aber ging super -
Wie und wo kann ich das Script benutzen? In iobroker javascript oder Konsole? iobroker läuft bei mir im Docker Container....
-
@base python auf deinem PC installieren, das script wie auf der git seite beschrieben installieren und benutzen.
-
ich bekomme in diese Zeile einen Syntax Error:
print(f"Processing row {processed_rows + 1:,} to {processed_rows + len(selected_rows):,} from LIMIT {start_row:,} / {start_row + query_max_rows:,} " + table + " - " + metric['name'] + " (" + str(metric_nr) + "/" + str(metric_count) + ")")
er meckert die Anführungsstriche vor table an
-
@base
So war es bei mir auch.
Hast du mal versucht, die Daten einzeln pro Objekt zu importieren? Das half bei mir. -
@simatec habe dein Script von oben ausprobiert, dass holt aber nicht alle Objekte...
was muss ich da ändern?ok, habs gefunden. Muss natürlich die SQL Abfrage anpassen. Jetzt klappts
-
@base was für eine python version nutzt du?
Bei mir läuft das script ohne probleme durch.
-
@JackGruber hab jetzt Version 3.7.9, hatte vorher 3.9, da ging gar nichts.
Den Befehl pip install -r requirements.txt habe ich in der Eingabeaufforderung im entsprechenden Ordner durchgeführt, war das so richtig?
-
@base Ja, genau, dann sollte er alle nötigen module installieren.
Hm, muss ich mir mal mit python 3.9 anschauen ... -
@JackGruber wie gesagt, 3.9 hat gar nicht funktioniert bei mir und bei der 3.7.9 bekomme ich die Fehlermeldung