@JackGruber said in Frage : Migrate MySQL nach Influxdb:
@MezzoDO ok, das ack-Feld ist bereits als Boolean angelegt, kommt nun aber als int.
Ich könnte es anpassen, sodass dies beim Import konvertiert wird.
Das wäre grandios ![:) 🙂](https://forum.iobroker.net/plugins/nodebb-plugin-emoji/emoji/android/1f642.png?v=23itn9135ab)
Mein Skript, das ich einsetze:
### MySQL DB info ###
#import MySQLdb
import pymysql
# Connection to the source MySQL database; adjust the values to your setup.
# `password` / `database` are the current PyMySQL keyword names; the
# `passwd` / `db` aliases used before are deprecated compatibility spellings.
conn = pymysql.connect(
    host="localhost",     # your host, usually localhost
    user="xxx",           # your username
    password="xxx",       # your password
    database="iobroker",  # name of the database
)
### PostgreSQL DB info ###
import psycopg2
import psycopg2.extras
#####
# connection data for PostgreSQL
#conn = psycopg2.connect("dbname=xxx user=xxx password=xxx host=xxx.xxx.xxx.xxx port =5432")
#####
# InfluxDB info #
from influxdb import InfluxDBClient
#
#####connection data for InfluxDB#####
# Client for the target InfluxDB instance; adjust host, port, credentials and
# database name to your setup.
influxClient = InfluxDBClient(
    host='localhost',
    port=8086,
    username='xxx',
    password='xxx',
    database='iobroker',
)
#####
#influxClient.delete_database(influx_db_name)
#influxClient.create_database(influx_db_name)
# Dictates how SQL columns are mapped to keys/fields in InfluxDB.
schema = {
    # column that is used as the time stamp in InfluxDB
    "time_column": "time",
    # columns that are written as fields
    "columns_to_fields": ["ack", "q", "from", "value"],
    # field columns whose values are converted to bool before writing;
    # `ack` is delivered by MySQL as an integer, while an `ack` field that
    # already exists as boolean in InfluxDB does not accept integers
    "columns_to_bool": ["ack"],
    # optional: columns that are written as tags
    # "columns_to_tags": ["", ...],
    # column whose value becomes the measurement name of the point
    "table_name_to_measurement": "name",
}


def _to_bool(value):
    """Convert a SQL truth value to ``bool``; ``None`` (NULL) is kept as is.

    ``bytes``/``bytearray`` values (as some drivers return for BIT columns)
    are true when any byte is non-zero; everything else uses ``bool()``.
    """
    if value is None or isinstance(value, bool):
        return value
    if isinstance(value, (bytes, bytearray)):
        # bool(b"\x00") would be True because the bytes object is non-empty
        return any(value)
    return bool(value)


def generate_influx_points(records):
    """Generate a list of InfluxDB points from the given SQL records.

    Args:
        records: iterable of mappings (one per SQL row) that contain the
            columns named in the module-level ``schema``.

    Returns:
        A list of dicts with the keys ``measurement``, ``time`` and
        ``fields``; a ``tags`` key is added only when ``schema`` defines a
        non-empty ``columns_to_tags`` list.

    Raises:
        KeyError: if a record lacks one of the columns named in ``schema``.
    """
    time_column = schema["time_column"]
    measurement_column = schema["table_name_to_measurement"]
    field_columns = schema["columns_to_fields"]
    # both keys are optional so that an older schema dict keeps working
    bool_columns = set(schema.get("columns_to_bool", ()))
    tag_columns = schema.get("columns_to_tags", ())

    influx_points = []
    for record in records:
        fields = {}
        for label in field_columns:
            value = record[label]
            fields[label] = _to_bool(value) if label in bool_columns else value
        point = {
            "measurement": record[measurement_column],
            "time": record[time_column],
            "fields": fields,
        }
        if tag_columns:
            point["tags"] = {label: record[label] for label in tag_columns}
        influx_points.append(point)
    return influx_points
# Query the relational DB for all records and copy them to InfluxDB in
# batches. The desired columns are expected to be provided as a view on the
# SQL server; adjust the view name in the query below.
BATCH_SIZE = 1000  # number of records fetched and written per round trip

# Cursor that returns each row as a dict keyed by column name.
# PostgreSQL alternative:
#curr = conn.cursor('cursor', cursor_factory=psycopg2.extras.RealDictCursor)
# MySQLdb / mysql-connector alternative:
# curr = conn.cursor(dictionary=True)
curr = conn.cursor(cursor=pymysql.cursors.DictCursor)
try:
    # request data from SQL, adjust ...from <view name>
    curr.execute("Select * from InfluxData;")

    row_count = 0
    while True:
        selected_rows = curr.fetchmany(BATCH_SIZE)
        if not selected_rows:
            # nothing left (empty view or row count is a multiple of the
            # batch size): do not send an empty write to InfluxDB
            break
        print("Processing row #" + str(row_count + 1))
        influxClient.write_points(generate_influx_points(selected_rows))
        # count what was actually fetched, the last batch may be short
        row_count += len(selected_rows)
        if len(selected_rows) < BATCH_SIZE:
            break
finally:
    # release the cursor and the connection even if a query or write fails
    curr.close()
    conn.close()