Support pluggable parser for query log source & MySQL & Postgres implementations
The current query log parser expects a CSV like https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/examples/sample_data/glue/query_log.csv. This works well for MySQL, but not for Postgres.
See this Slack thread for context: https://openmetadata.slack.com/archives/C02B6955S4S/p1659545895833039?thread_ts=1659538699.033179&cid=C02B6955S4S
Postgres stores its query log with different columns and in a different structure.
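One way to make the parser pluggable would be a small engine-specific interface. A rough, purely hypothetical sketch (none of these class or method names exist in the OpenMetadata codebase):

from abc import ABC, abstractmethod
import pandas as pd

class QueryLogParser(ABC):
    """Turn an engine-specific query log into the flat table the
    CSV-based source expects (query_text, database_name, schema_name,
    user_name, start_time, end_time, aborted)."""

    @abstractmethod
    def parse(self, path: str) -> pd.DataFrame:
        ...

class MySQLQueryLogParser(QueryLogParser):
    def parse(self, path: str) -> pd.DataFrame:
        # MySQL logs already match the reference CSV, so a plain read suffices.
        return pd.read_csv(path)

class PostgresQueryLogParser(QueryLogParser):
    def parse(self, path: str) -> pd.DataFrame:
        # Would wrap the csvlog transformation worked out below.
        raise NotImplementedError

The rest of this issue works out what the Postgres implementation would have to do.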
To enable the CSV-based query log, set the following parameters in Postgres (here via the command entry of a docker-compose file):
command: ["postgres",
"-c", "log_statement=all",
"-c", "logging_collector=on",
"-c", "log_directory=pg_log",
"-c", "log_destination=stderr,csvlog",
"-c", "log_truncate_on_rotation=on",
"-c", "log_rotation_age=1d",
"-c", "log_filename=postgresql-fixed.log",
"-c", "log_rotation_size=0",
"-c", "log_duration=on"
]
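With these settings the logging collector writes a CSV sibling of the plain log: because log_filename ends in .log, that suffix is replaced by .csv, so the file to parse is pg_log/postgresql-fixed.csv inside the data directory. A quick existence check (the mount path below is an assumption; adjust it to your volume setup):

from pathlib import Path

pg_log = Path('path/to/data/pg_log')  # wherever the Postgres data directory is mounted
print(sorted(pg_log.glob('*.csv')))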
This could be a first step toward understanding the PG query log schema:
import pandas as pd
df = pd.read_csv('path/to/pg_log/postgresql-fixed.csv', header=None)
The CSV log columns are documented at https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG:
CREATE TABLE postgres_log
(
log_time timestamp(3) with time zone,
user_name text,
database_name text,
process_id integer,
connection_from text,
session_id text,
session_line_num bigint,
command_tag text,
session_start_time timestamp with time zone,
virtual_transaction_id text,
transaction_id bigint,
error_severity text,
sql_state_code text,
message text,
detail text,
hint text,
internal_query text,
internal_query_pos integer,
context text,
query text,
query_pos integer,
location text,
application_name text,
backend_type text,
leader_pid integer,
query_id bigint,
PRIMARY KEY (session_id, session_line_num)
);
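If you prefer naming the columns at read time rather than assigning d.columns afterwards (as the snippet below does), the DDL above maps directly onto read_csv's names parameter. A minimal sketch:

import pandas as pd

# Column order taken verbatim from the CREATE TABLE above (PostgreSQL 14+ csvlog).
CSVLOG_COLUMNS = [
    "log_time", "user_name", "database_name", "process_id", "connection_from",
    "session_id", "session_line_num", "command_tag", "session_start_time",
    "virtual_transaction_id", "transaction_id", "error_severity",
    "sql_state_code", "message", "detail", "hint", "internal_query",
    "internal_query_pos", "context", "query", "query_pos", "location",
    "application_name", "backend_type", "leader_pid", "query_id",
]

df = pd.read_csv('path/to/pg_log/postgresql-fixed.csv',
                 header=None, names=CSVLOG_COLUMNS)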
# The MySQL-style reference format the parser currently expects:
ref = pd.read_csv('https://raw.githubusercontent.com/open-metadata/OpenMetadata/main/ingestion/examples/sample_data/glue/query_log.csv')
d = df.copy()
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
d.columns = ["log_time", "user_name", "database_name", "process_id", "connection_from", "session_id", "session_line_num", "command_tag", "session_start_time", "virtual_transaction_id", "transaction_id", "error_severity", "sql_state_code", "message", "detail", "hint", "internal_query", "internal_query_pos", "context", "query", "query_pos", "location", "application_name", "backend_type", "leader_pid", "query_id"]
# Treat a non-zero SQLSTATE as an aborted statement ('00000' parses as 0 here).
d.sql_state_code = (d.sql_state_code != 0).astype(bool)
# Keep completed SELECT statements: the SQL text lives in 'message', the raw
# csvlog 'query' column is dropped, and log_time marks the end of the statement.
d = (d[(~d.user_name.isnull()) & (d.command_tag == 'SELECT')]
     .drop(['query'], axis=1)
     .rename(columns={'message': 'query', 'log_time': 'end_time',
                      'sql_state_code': 'aborted'}))
# Prepared-statement lines carry an 'execute <unnamed>: ' prefix; strip it.
d['query'] = d['query'].str.replace('execute <unnamed>: ', '', regex=False)
# Keep the columns the reference CSV needs; schema_name (and a real
# start_time) still have to be engineered later.
dcl = d[['query', 'database_name', 'user_name', 'end_time', 'aborted']].copy()
# With log_duration=on each statement line is followed by a separate
# 'duration: N ms' line; pull that next row up as the statement's duration...
dcl['duration'] = dcl['query'].shift(-1)
# ...then drop the duration-only rows themselves.
dcl = dcl[~dcl['query'].str.contains('duration')]
dcl['end_time'] = pd.to_datetime(dcl['end_time'])
# Strip the 'duration: ' prefix (10 chars) and let pandas parse 'N.NNN ms'.
dcl['duration'] = pd.to_timedelta(dcl['duration'].str[10:])
# Derive start_time from end_time and the parsed duration.
dcl['start_time'] = dcl['end_time'] - dcl['duration']
dcl['schema_name'] = 'dummy'  # placeholder; see the open question below
fmt = '%Y-%m-%d %H:%M:%S.%f'
dcl['start_time'] = dcl['start_time'].dt.strftime(fmt)
dcl['end_time'] = dcl['end_time'].dt.strftime(fmt)
dcl = dcl[['query', 'database_name', 'schema_name',
'user_name', 'start_time',
'end_time', 'aborted']].rename(columns={'query':'query_text'}).copy()
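If the frame above is correct, writing it back out should yield a file the existing CSV-based query log source can ingest directly. A sketch (the output path is arbitrary):

dcl.to_csv('pg_query_log.csv', index=False)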
Interestingly, the final
.rename(columns={'query':'query_text'})
is undocumented and does not match the reference CSV, yet it is necessary for the ingestion to work.
However, some columns are still missing, and it is not yet clear to me where to derive them from:
- schema_name
I have tried to parse the following, but would kindly ask you to validate their correctness (see the sanity-check sketch below):
- start_time
- end_time
- aborted
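A few cheap sanity checks that could support this validation, sketched against the dcl frame built above:

# end_time must never precede start_time.
assert (pd.to_datetime(dcl['end_time']) >= pd.to_datetime(dcl['start_time'])).all()
# 'aborted' must be strictly boolean.
assert dcl['aborted'].isin([True, False]).all()
# Spot-check a few rows by eye against the raw csvlog.
print(dcl.head())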