NEWS
Frage : Migrate MySQL nach Influxdb
-
Hmmmm....hab nur jetzt, wo ich in die InfluxDB schreiben will, echt bei jedem Objekt, das ich logge, Fehlermeldungen im LOG
influxdb.0 2021-06-08 22:11:28.416 warn Add alias.0.Spritpreise.JETDiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:11:28.413 warn Error on writePoint("{"value":"1,29","time":"2021-06-08T20:11:28.392Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.JETDiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.JETDiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:11:28.310 warn Add alias.0.Spritpreise.ESSODiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:11:28.308 warn Error on writePoint("{"value":"1,32","time":"2021-06-08T20:11:28.238Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.ESSODiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.ESSODiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:16.094 warn Add alias.0.Spritpreise.JETDiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:10:16.093 warn Error on writePoint("{"value":"1,29","time":"2021-06-08T20:01:28.348Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.JETDiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.JETDiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:16.070 warn Add alias.0.Spritpreise.JETDiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:10:16.069 warn Error on writePoint("{"value":"1,29","time":"2021-06-08T19:56:28.376Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.JETDiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.JETDiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:16.039 warn Add alias.0.Spritpreise.JETDiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:10:16.038 warn Error on writePoint("{"value":"1,29","time":"2021-06-08T19:51:28.283Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.JETDiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.JETDiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:16.007 warn Try to write 3 Points separate to find the conflicting one influxdb.0 2021-06-08 22:10:16.006 warn Error on writePoints for alias.0.Spritpreise.JETDiesel: Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.JETDiesel\" is type string, already exists as type float dropped=3"} influxdb.0 2021-06-08 22:10:15.982 warn Add alias.0.Spritpreise.ESSODiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:10:15.980 warn Error on writePoint("{"value":"1,32","time":"2021-06-08T20:01:28.220Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.ESSODiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.ESSODiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:15.952 warn Add alias.0.Spritpreise.ESSODiesel to conflicting Points (4 now) influxdb.0 2021-06-08 22:10:15.945 warn Error on writePoint("{"value":"1,32","time":"2021-06-08T19:56:28.229Z","from":"system.adapter.tankerkoenig.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Spritpreise.ESSODiesel\" is type string, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Spritpreise.ESSODiesel\\\" is type string, already exists as type float dropped=1\"}\n" influxdb.0 2021-06-08 22:10:15.476 warn Error on writePoint("{"value":true,"time":"2021-06-08T20:06:06.182Z","from":"system.adapter.sonoff.0","q":0,"ack":true}): Error: {"error":"partial write: field type conflict: input field \"value\" on measurement \"alias.0.Steckdosen.Serverschrank\" is type boolean, already exists as type float dropped=1"} / "{\"error\":\"partial write: field type conflict: input field \\\"value\\\" on measurement \\\"alias.0.Steckdosen.Serverschrank\\\" is type boolean, already exists as type float dropped=1\"}\n"
Hab nur einige wenige gepostet...wie bekomm ich das behoben?
-
@kueppert
Es schaut so aus, als ob system.adapter.tankerkoenig mal strings und mal float values enthält und dies natürlich nicht in ein schema passen.Du könntest mir einen export aus der MySQL zukommen lassen. Die ID (Angezeigt beim start des jeweiligen imports) kannst du für eine WHERE bedingung beim Datenexport nutzen.
-
@jackgruber hab vermutlich den Fehler gefunden. Ich hab im Alias den "." zu einem "," konvertiert und den Typ auf string stehen gehabt...hab die Konvertierung jetzt rausgenommen und auf number umgestellt. Sollte das reichen? Entspricht das einem float?
Wenn nein, lösche ich wieder die InfluxDB (das measurement) und importiere nochmal alles neu...
Danke dir und vG. ThorstenNACHTRAG: Tankerkönig schreibt wirklich nen String in seine Objekte beim Spritpreis, ich konvertiere zu number im Alias Kann ich dir "einfach" aus phpMyAdmin einen Export zukommen lassen? Ich weiß nur nicht, wie ich zB nur die Spritpreise da raus bekomme ^^ Hab aktuell 3Mio Datensätze, viele wollte ich in der Influx nach "Migration" löschen. Aber den string vom Tankerkönig bekomm ich ja nicht mit meinen ALias-Werten (number) gemerged so ohne weiteres...
-
Ersetz das XYZ durch die ID aus der python ausgabe und führ beide Querys unter
SQL
im PhPMyAdmin aus.SELECT * FROM ts_string WHERE id = XYZ LIMIT 0, 100
SELECT * FROM datapoints WHERE id = XYZ
Dann für beide abfragen ganz unten auf der Seite
Operationen für das Abfrageergebnis
>Exportieren
wählen.Dann kann ich es mir anschauen und evtl. ne Option reinbauen um Datensätze zu konvertieren.
-
@jackgruber danke dir für deine Rückmeldung. Ich habe herausgefunden, dass die Fehlermeldungen "nur" kamen bei neuen Eintragungen in die InfluxDB, die ich im Alias-DP falsch konvertiert hatte. Hab im ALias auf Zahl umgestellt und voila, InfluxDB schmeißt keine Fehler mehr
Bin gerade dabei in der Konsole alle Maisurements zusammenzuführen, die zusammen gehören (hab zu oft die Bezeichnungen gewechselt). Werde zukünftig nur noch die Alias-Objekte loggen und dahinterliegende Geräte, sofern nötig, austauschen. Dann bleibt es immer bei der gleichen Bezeichnung.
DANKE für dein tolles Skript für den Import in InfluxDB -
Hallo,
erstmal vielen Dank für das Skript.
ts_numeric lässt sich ohne Probleme überführen, aber mit bool gibt es Probleme. Scheinbar sind die Daten in der Quelle nicht als bool sondern als float abgelegt. Daher wird eine Konvertierung nach boolean benötigt. Was muss an dem Skript angepasst werden, damit ts_bool vor der Überführung entsprechen konvertiert wird?
Total metrics in ts_bool: 1 wolf.0.cwl.158(ID: 98) (1/1) Processing row 1 to 1,000 from LIMIT 0 / 100,000 ts_bool - wolf.0.cwl.158 (1/1) InfluxDB error 400: {"error":"partial write: field type conflict: input field \"value\" on measurement \"wolf.0.cwl.158\" is type float, already exists as type boolean dropped=200"}
Grüße
-
@schweigel was für float werte stehen in der ts_bool für den Datenpunkt?
SELECT * FROM ts_bool WHERE id = 98 AND val != 0 AND val != 1 LIMIT 0, 10;
-
@JackGruber Es sind nur 0 und 1 enthalten, deine Abfrage liefert also keine Zeilen zurück.
-
@schweigel
Du hast in der InfluxDB bereits daten vom selben messpunkt, die float (Zahlen mit nachkommastellen) sind. Diese in bool (Wahrheitswerte) zu konvertieren macht eigentlich kein Sin.Aber du kannst versuchen das Script für dich anzupassen.
Füge soetwas wiefields["value"] = float(record["value"])
,nach Zeile 108 ein, dann wird versucht alle Werte in float zu wandeln. -
@JackGruber
Mmh, das hat nicht ganz geklappt.
Habe es versucht, dann kam folgende Fehlermeldung:wolf.0.cwl.158(ID: 98) (1/1) Processing row 1 to 1,000 from LIMIT 0 / 100,000 ts_bool - wolf.0.cwl.158 (1/1) InfluxDB error 400: {"error":"partial write: field type conflict: input field \"value\" on measurement \"wolf.0.cwl.158\" is type float, already exists as type boolean dropped=219"}
Dachte auch das die Daten in MySQL in float sind und ich diese nach InfluxDB als boolean überführen muss, also habe ich die Zeile angepasst:
fields["value"] = bool(record["value"])
Zu meiner Überraschung bekam ich dann folgende Meldung:
wolf.0.cwl.158(ID: 98) (1/1) Processing row 1 to 1,000 from LIMIT 0 / 100,000 ts_bool - wolf.0.cwl.158 (1/1) InfluxDB error 400: {"error":"partial write: field type conflict: input field \"value\" on measurement \"wolf.0.cwl.158\" is type boolean, already exists as type float dropped=148"}
Nun war ich verwundert, ist value nun ein float oder ein boolean in InfluxDB? Also habe ich mir den Aufbau in Influx angesehen mit folgendem Ergebnis:
name: wolf.0.cwl.158 fieldKey fieldType -------- --------- ack boolean from string q float value boolean value float
value gibt es zweimal. Für mein Verständnis sollte es nur einmal da sein. Muss ich da die InfluxDB erst fixen?
-
Habe den Messwert nun aus der DB gelöscht und ihn über den Adapter in iobroker neu angelegt. Nun hat er nur noch einen value als boolean. Damit war der Datentransfer nun möglich.
-
Hatte nun noch weitere Probleme:
Daten waren in MySQL und in InfluxDB als unterschiedliche Formate gespeichert und teilweise hatten sich in MySQL die Datentypen über die Zeit geändert, sodass ein Messwert in mehreren Datentabellen (z.B. ts_number & ts_string) vorhanden war.
Da das Skript den Datentyp über die Quelltabelle ermittelt (ts_number => Zahl, ts_bool => Wahrheitswert, ts_string => Zeichenfolge) führt dieses zu Problemen.
In der Übersichtstabelle (datapoints) gibt es nur eine Angabe des Datentypes. Diese ist in meinen Auge der aktuell gewünschte Datentyp. Daher habe ich das Skript um eine Abfrage des Datentypes aus datapoints erweitert und eine Konvertierung in diesen Datentyp eingebaut.
Nun können Daten z.B. in den Tabellen ts_string und ts_number vorliegen. Bei der Migration werden die Daten dann in den Datentyp konvertiert, der in datapoints angegeben ist.
Damit reicht es das InfluxDB und MySQL für jeden Wert den gleichen Datentypen verwenden.Da ich früher meist als Datentyp "Automatisch" angegeben hatte, konnte dieses hier zu Problemen führen. Nach dem ich alle Datentypen händisch gerade gezogen habe, teilweise direkt in der MySQL-Datenbank, funktionierte die Migration mit der Skriptanpassung problemlos.
Meine Änderungen des Skriptes sind in einem Fork abgelegt und einen Pull request habe ich gestellt.
Aus meiner Sicht könnte man nun noch als Anpassung vornehmen, das er den gewünschten Datentypen nicht aus der MySQL-Datenbank bezieht, sondern aus der InfluxDB, wenn man vorher den Wert dort bereits über iobroker eingefügt hat.
In Summe nochmal danke für das Skript, es hat mir sehr weiter geholfen.
-
@jackgruber sagte in Frage : Migrate MySQL nach Influxdb:
Hab das script etwas angepasst, damit alle Daten übernommen werden und diese auch auf einem leistungsschwachen Rasperry Pi ausgelesen werden können.
Frage:
Läuft das Script auch mit influxdb v2.x?
Hintergrund:
Der externe Zugriff wird ab v2 ja über Token abgesichert.
In der database.json sind aber nur username/password für influxdb hinterlegt. -
Hallo @JackGruber
Erstmal Danke für das Script
Ich habe versucht, nach der Anleitung über das Script die Daten von MySQL nach InfluxDB zu migrieren.
Leider lässt sich das Script migrate.py bei mir nicht ohne Fehlermeldung ausführen.
Es erscheint:
File "migrate.py", line 177 print(f"Processing row {processed_rows + 1:,} to {processed_rows + len(selected_rows):,} from LIMIT {start_row:,} / {start_row + query_max_rows:,} " + ^ SyntaxError: invalid syntax
Wo da in Zeile 177 der Fehler steckt, verschließt sich mir ...
Hat jemand eine Idee, woran das liegen könnte?
-
@nureinbenutzer Update dein Python auf eine aktuelle Python 3 version!
Ich vermute du nutzt eine alte 2.x version. -
@jackgruber Python liegt in Version 3.7.3 vor ...
Das war auch der Grund.
Der Aufruf muss lauten
python3 migrate.py ALL
Danke für den Denkanstoss
-
Hallo.
Ich habe gestern das Script von @JackGruber probiert und es lief auch an.
Habe gedacht ich lasse es über Nacht laufen, aber heute Früh habe ich folgenden Fehler gesehen:Processing row 600,001 to 601,000 from LIMIT 600,000 / 700,000 ts_number - Wetter Luftdruck hPa (53/81) InfluxDB error HTTPConnectionPool(host='localhost', port=8086): Max retries exceeded with url: /write?db=iobroker&rp=autogen (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x74d52778>: Failed to establish a new connection: [Errno 111] Connection refused'))
Jemand eine Idee?
Danke!
-
@ullij Erstmal vielen Dank..
Ich musste das Skript leicht anpassen auf sqlite, hab jetzt aber einen Fehler, der mit der Anpassung selbst eigentlich nichts zu tun hat:
### MySQL DB info ### #import MySQLdb #conn = MySQLdb.connect(host="localhost", # your host, usually localhost # user="john", # your username # passwd="megajonhy", # your password # db="jonhydb") # name of the data base ### PostgreSQL DB info ### #import psycopg2 #import psycopg2.extras import sqlite3 ##### # connection data for PostgreSQL #conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432") ##### conn = sqlite3.connect('/opt/iobroker/iobroker-data/sqlite/sqlite.db') # InfluxDB info # from influxdb import InfluxDBClient # #####connection data for InfluxDB##### influxClient = InfluxDBClient(host='localhost', port=8086, username='xxxx', password='xxxx!', database='xxxx') ##### #influxClient.delete_database(influx_db_name) #influxClient.create_database(influx_db_name) # dictates how columns will be mapped to key/fields in InfluxDB schema = { "time_column": "time", # the column that will be used as the time stamp in influx "columns_to_fields" : ["ack","q", "from","value"], # columns that will map to fields # "columns_to_tags" : ["",...], # columns that will map to tags "table_name_to_measurement" : "name", # table name that will be mapped to measurement } ''' Generates an collection of influxdb points from the given SQL records ''' def generate_influx_points(records): influx_points = [] for record in records: #tags = {}, fields = {} #for tag_label in schema['columns_to_tags']: # tags[tag_label] = record[tag_label] for field_label in schema['columns_to_fields']: if field_label == "ack": record[field_label] = bool(record[field_label]) fields[field_label] = record[field_label] influx_points.append({ "measurement": record[schema['table_name_to_measurement']], #"tags": tags, "time": record[schema['time_column']], "fields": fields }) return influx_points # query relational DB for all records curr = conn.cursor() # curr = conn.cursor(dictionary=True) ##### # SQL query for PostgreSQL, syntax for MySQL differs # query provide desired columns as a view on the sql server # request data from SQL, adjust ...from <view name> curr.execute("Select * from xxx;") ##### row_count = 0 # process 1000 records at a time while True: print("Processing row #" + str(row_count + 1)) selected_rows = curr.fetchmany(1000) influxClient.write_points(generate_influx_points(selected_rows)) row_count += 1000 if len(selected_rows) < 1000: break conn.close()
Fehler:
user@UbuntuHomeAutomation2:~$ python3 sql2influx3.py Processing row #1 Traceback (most recent call last): File "sql2influx3.py", line 75, in <module> influxClient.write_points(generate_influx_points(selected_rows)) File "sql2influx3.py", line 50, in generate_influx_points record[field_label] = bool(record[field_label]) TypeError: tuple indices must be integers or slices, not str user@UbuntuHomeAutomation2:~$
Vielleicht hat ja jemand ne Lösung.
-
@thomas-herrmann
wünsche ein frohes neues Jahr.Du meintest sicherlich @JackGruber.
Ich kann Dir bei dem Skript (leider) nicht helfen.
Gruß -
@ullij
Wünsch dir ebenso ein frohes Neues.
Es bezieht sich auf das Skript in deinem Post vom 14. Apr. 2020, 20:42: [https://forum.iobroker.net/topic/12482/frage-migrate-mysql-nach-influxdb/26](Link Adresse)Aber vielleicht hat ja noch jemand eine Idee, woran es liegen kann.