questdb.io
questdb.io copied to clipboard
Update - script for migrating from QuestDB open source to QuestDB cloud
Describe the requested changes:
Quick Python script shared by Cloud user:
"""One-shot migration: read rows from a QuestDB open-source instance over
the Postgres wire (psycopg2) and forward them to QuestDB Cloud via ILP."""

# Standard library
import os
from datetime import datetime, timedelta, timezone

# Third party
import psycopg2
from dotenv import load_dotenv
from questdb.ingress import Sender, TimestampNanos

# Pull secrets (KID, D_KEY, X_KEY, Y_KEY, OLD_DSN) from a local .env file.
load_dotenv()

# Table to migrate and paging configuration.
TABLE_NAME = 'ohlc_exchanges_historical'
CHUNK_SIZE = 500000
# Only rows with ts strictly before this Unix epoch (seconds) are migrated.
MAX_TS = 1659044005
def uploadToQuestDB(rows):
    """Send one chunk of rows to QuestDB Cloud over ILP.

    Each element of *rows* is a dict with keys "ts" (naive datetime assumed
    to be UTC — confirmed by the .replace(tzinfo=...) below), "symbols"
    (dict or None) and "columns" (dict or None), as built by
    getTs/getSymbols/getColumns.
    """
    # NOTE: renamed the parameter from `list` — it shadowed the builtin.
    auth = (
        os.environ.get("KID"),
        os.environ.get("D_KEY"),
        os.environ.get("X_KEY"),
        os.environ.get("Y_KEY"),
    )
    with Sender('invezo-db-001-5c31b0ca.ilp.c7at.questdb.com', 30796,
                auth=auth, tls=True) as sender:
        for item in rows:
            sender.row(
                TABLE_NAME,
                symbols=item["symbols"],
                columns=item["columns"],
                # Source timestamps are naive; tag them as UTC explicitly.
                at=TimestampNanos.from_datetime(
                    item['ts'].replace(tzinfo=timezone.utc)),
            )
        # Flush once per chunk, not once per row: a per-row flush defeats
        # ILP batching. The `with` block flushes any remainder on exit.
        sender.flush()
# Connect to the old (open-source) QuestDB instance over the Postgres wire.
connection = psycopg2.connect(dsn=os.environ.get("OLD_DSN"))
connection.autocommit = True
pg_cursor = connection.cursor()

# Count the rows below the migration cut-off so the copy loop can page.
sql = "select count() from '{}' where ts < CAST({} * 1000000L AS TIMESTAMP) ".format(TABLE_NAME, MAX_TS)
pg_cursor.execute(sql)
table_count = pg_cursor.fetchall()[0][0]
lowerBound = 0

# Discover the table schema and bucket each column by its ILP role:
#   "ts"      -> the designated TIMESTAMP column (single entry)
#   "symbols" -> SYMBOL columns (list)
#   "columns" -> everything else (list)
sql = "show columns from '{}'".format(TABLE_NAME)
pg_cursor.execute(sql)
cols = {
    "ts": None,
    "symbols": None,
    "columns": None,
}
for idx, column in enumerate(pg_cursor.fetchall()):
    # column[0] is the column name, column[1] its QuestDB type.
    column_details = {"idx": idx, "name": column[0]}
    if column[1] == "TIMESTAMP":
        cols["ts"] = column_details
    elif column[1] == "SYMBOL":
        if cols["symbols"] is None:
            cols["symbols"] = []
        cols["symbols"].append(column_details)
    else:
        if cols["columns"] is None:
            cols["columns"] = []
        cols["columns"].append(column_details)
def getColumns(item):
    """Map each non-symbol, non-timestamp column name to this row's value.

    Returns None when the table has no such columns (matches the shape
    expected by Sender.row's `columns=` argument).
    """
    spec = cols["columns"]
    if spec is None:
        return None
    return {c["name"]: item[c["idx"]] for c in spec}
def getSymbols(item):
    """Map each SYMBOL column name to this row's value.

    Returns None when the table has no SYMBOL columns (matches the shape
    expected by Sender.row's `symbols=` argument).
    """
    spec = cols["symbols"]
    if spec is None:
        return None
    return {c["name"]: item[c["idx"]] for c in spec}
def getTs(item):
    """Return this row's designated-timestamp value, or None if the table
    has no TIMESTAMP column."""
    ts_spec = cols["ts"]
    return None if ts_spec is None else item[ts_spec["idx"]]
# Page through the source table in CHUNK_SIZE slices and forward each
# slice to QuestDB Cloud. (Removed unused locals `count` and
# `last_block_height`, and the intermediate `data` list — the cursor is
# streamed directly.)
while lowerBound < table_count:
    upper = min(lowerBound + CHUNK_SIZE, table_count)
    print(lowerBound, upper)
    # QuestDB's `limit lo,hi` selects the rows between those positions.
    sql = "select * from '{}' where ts < CAST({} * 1000000L AS TIMESTAMP) limit {},{}".format(
        TABLE_NAME, MAX_TS, lowerBound, upper)
    lowerBound += CHUNK_SIZE
    pg_cursor.execute(sql)
    formatted = [
        {
            "ts": getTs(row),
            "symbols": getSymbols(row),
            "columns": getColumns(row),
        }
        for row in pg_cursor
    ]
    uploadToQuestDB(formatted)
1. Check whether any refactoring of the code is required.
2. Are there any instructions for using the code?
3.
**Location:**
- A new page in guide?
Users tend to call flush
after every row, and I'm not sure that is a pattern we want to encourage. Ideally they would not flush explicitly at all — flushing would happen on a timer, or they would count rows and flush every N rows.