questdb.io icon indicating copy to clipboard operation
questdb.io copied to clipboard

Update - script for migrating from QuestDB open source to QuestDB cloud

Open amyshwang opened this issue 2 years ago • 1 comments

Describe the requested changes:

Quick Python script shared by Cloud user:

import psycopg2
from questdb.ingress import Sender, TimestampNanos
from datetime import datetime, timedelta, timezone
import os
from dotenv import load_dotenv
load_dotenv()

# Name of the table being migrated (same name on source and destination).
TABLE_NAME = 'ohlc_exchanges_historical'
# Number of rows fetched from the source and uploaded per batch.
CHUNK_SIZE = 500000
# Upper bound for rows to migrate: a Unix timestamp in seconds; the queries
# below multiply it by 1,000,000 to compare against microsecond timestamps.
MAX_TS = 1659044005

def uploadToQuestDB(rows, host='invezo-db-001-5c31b0ca.ilp.c7at.questdb.com', port=30796):
    """Send a batch of pre-formatted rows to QuestDB Cloud over ILP (TLS).

    Each element of ``rows`` is a dict with keys ``"symbols"`` (dict or None),
    ``"columns"`` (dict or None) and ``"ts"`` (a naive UTC datetime, tagged
    with ``timezone.utc`` before conversion to nanoseconds).

    ``host``/``port`` default to the original hard-coded Cloud endpoint so
    existing positional callers are unaffected.
    """
    # Note: the original parameter was named ``list``, shadowing the builtin.
    auth = (
        os.environ.get("KID"),
        os.environ.get("D_KEY"),
        os.environ.get("X_KEY"),
        os.environ.get("Y_KEY"),
    )
    with Sender(host, port, auth=auth, tls=True) as sender:
        for item in rows:
            sender.row(
                TABLE_NAME,
                symbols=item["symbols"],
                columns=item["columns"],
                at=TimestampNanos.from_datetime(item['ts'].replace(tzinfo=timezone.utc)),
            )
        # One flush per batch (not per row) before the context manager closes.
        sender.flush()

# Connect to the source (open-source) QuestDB instance via the Postgres wire
# protocol and count how many rows fall inside the migration window.
connection = psycopg2.connect(dsn=os.environ.get("OLD_DSN"))
connection.autocommit = True
pg_cursor = connection.cursor()
count_query = (
    "select count() from '{}' where ts < CAST({} * 1000000L AS TIMESTAMP) "
    .format(TABLE_NAME, MAX_TS)
)
pg_cursor.execute(count_query)
table_count = pg_cursor.fetchall()[0][0]
lowerBound = 0

# Discover the source table's schema so each fetched row can be split into
# the designated timestamp, SYMBOL columns, and regular value columns for
# ILP ingestion.
sql = "show columns from '{}'".format(TABLE_NAME)
pg_cursor.execute(sql)
cols = {
    "ts": None,       # designated timestamp descriptor: {"idx": ..., "name": ...}
    "symbols": None,  # list of SYMBOL column descriptors, or None if none seen
    "columns": None,  # list of all remaining column descriptors, or None
}
for idx, column in enumerate(pg_cursor.fetchall()):
    # column[0] is the column name, column[1] its QuestDB type.
    column_details = {"idx": idx, "name": column[0]}
    if column[1] == "TIMESTAMP":
        cols["ts"] = column_details
    elif column[1] == "SYMBOL":
        # Lazily create the list so "no symbol columns" stays None,
        # matching the sentinel checks in getSymbols/getColumns.
        if cols["symbols"] is None:
            cols["symbols"] = []
        cols["symbols"].append(column_details)
    else:
        if cols["columns"] is None:
            cols["columns"] = []
        cols["columns"].append(column_details)

def getColumns(item, column_defs=None):
    """Map a fetched row tuple to {column_name: value} for non-symbol columns.

    ``column_defs`` is a list of {"idx", "name"} descriptors; when omitted it
    falls back to the module-level ``cols["columns"]`` (original behavior).
    Returns None when there are no such columns.
    """
    defs = cols["columns"] if column_defs is None else column_defs
    if defs is None:
        return None
    return {c["name"]: item[c["idx"]] for c in defs}

def getSymbols(item, symbol_defs=None):
    """Map a fetched row tuple to {column_name: value} for SYMBOL columns.

    ``symbol_defs`` is a list of {"idx", "name"} descriptors; when omitted it
    falls back to the module-level ``cols["symbols"]`` (original behavior).
    Returns None when the table has no SYMBOL columns.
    """
    defs = cols["symbols"] if symbol_defs is None else symbol_defs
    if defs is None:
        return None
    return {s["name"]: item[s["idx"]] for s in defs}

def getTs(item, ts_col=None):
    """Extract the designated-timestamp value from a fetched row tuple.

    ``ts_col`` is an {"idx", "name"} descriptor; when omitted it falls back
    to the module-level ``cols["ts"]`` (original behavior). Returns None when
    no timestamp column was found.
    """
    col = cols["ts"] if ts_col is None else ts_col
    if col is None:
        return None
    return item[col["idx"]]

# Page through the source table in CHUNK_SIZE batches, reformat each row,
# and upload the batch to QuestDB Cloud.
while lowerBound < table_count:
    upper = min(lowerBound + CHUNK_SIZE, table_count)
    print(lowerBound, upper)
    # QuestDB LIMIT lo, hi — select rows [lowerBound, upper) of the window.
    sql = """select * from '{}' where ts < CAST({} * 1000000L AS TIMESTAMP) limit {},{}""".format(TABLE_NAME, MAX_TS, lowerBound, upper)

    lowerBound += CHUNK_SIZE
    pg_cursor.execute(sql)

    # Split each fetched row into the shape uploadToQuestDB expects.
    # (Removed unused `count` and `last_block_height` variables from the
    # original, and the intermediate `data` list.)
    formatted = [
        {
            "ts": getTs(row),
            "symbols": getSymbols(row),
            "columns": getColumns(row),
        }
        for row in pg_cursor
    ]
    uploadToQuestDB(formatted)



1. Check whether any refactoring is required for the code.
2. Are any instructions needed for using the code?
3. 
**Location:**

- A new page in guide?

amyshwang avatar Aug 12 '22 09:08 amyshwang

Users tend to call flush after every row, and I'm not sure that's a pattern we want to encourage. Ideally they wouldn't flush manually at all — flushing would happen on a timer, or they would count rows and flush every few thousand.

marregui avatar Aug 12 '22 10:08 marregui