
Support pluggable parser for querylog source & MySQL & Postgres implementations

Open geoHeil opened this issue 3 years ago • 1 comments

The current query log parser expects a CSV like https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/examples/sample_data/glue/query_log.csv. This works well for MySQL, but not for Postgres.

https://openmetadata.slack.com/archives/C02B6955S4S/p1659545895833039?thread_ts=1659538699.033179&cid=C02B6955S4S

The Postgres query log has different columns and a different structure.

Set the following parameters in Postgres to enable the CSV-based query log (in your docker-compose file):

command: ["postgres",
    "-c", "log_statement=all",
    "-c", "logging_collector=on",
    "-c", "log_directory=pg_log",
    "-c", "log_destination=stderr,csvlog",
    "-c", "log_truncate_on_rotation=on",
    "-c", "log_rotation_age=1d",
    "-c", "log_filename=postgresql-fixed.log",
    "-c", "log_rotation_size=0",
    "-c", "log_duration=on"
    ]

geoHeil, Aug 03 '22 21:08

This could be a first step toward understanding the PG query log schema:

import pandas as pd

df = pd.read_csv('path/to/pg_log/postgresql-fixed.csv', header=None)

https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG

CREATE TABLE postgres_log
(
  log_time timestamp(3) with time zone,
  user_name text,
  database_name text,
  process_id integer,
  connection_from text,
  session_id text,
  session_line_num bigint,
  command_tag text,
  session_start_time timestamp with time zone,
  virtual_transaction_id text,
  transaction_id bigint,
  error_severity text,
  sql_state_code text,
  message text,
  detail text,
  hint text,
  internal_query text,
  internal_query_pos integer,
  context text,
  query text,
  query_pos integer,
  location text,
  application_name text,
  backend_type text,
  leader_pid integer,
  query_id bigint,
  PRIMARY KEY (session_id, session_line_num)
);
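
The column list above can also be applied directly when loading the file. This is a minimal sketch, assuming that older PostgreSQL versions emit only a leading subset of these columns (newer columns being appended at the end). The helper name `read_pg_csvlog` is hypothetical and not part of OpenMetadata.

```python
import pandas as pd

# Column names copied from the postgres_log table definition above.
CSVLOG_COLUMNS = [
    "log_time", "user_name", "database_name", "process_id",
    "connection_from", "session_id", "session_line_num", "command_tag",
    "session_start_time", "virtual_transaction_id", "transaction_id",
    "error_severity", "sql_state_code", "message", "detail", "hint",
    "internal_query", "internal_query_pos", "context", "query",
    "query_pos", "location", "application_name", "backend_type",
    "leader_pid", "query_id",
]


def read_pg_csvlog(source):
    """Read a csvlog file (path or file-like object) and name its columns.

    Assumption: a file with fewer columns contains the first N columns
    of CSVLOG_COLUMNS, in the same order.
    """
    df = pd.read_csv(source, header=None)
    if df.shape[1] > len(CSVLOG_COLUMNS):
        raise ValueError(
            f"expected at most {len(CSVLOG_COLUMNS)} columns, got {df.shape[1]}"
        )
    df.columns = CSVLOG_COLUMNS[: df.shape[1]]
    return df
```

Compared with assigning a fixed list of 26 names, this does not fail when the server writes fewer columns, but it should be checked against the csvlog documentation for the Postgres version in use.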

ref = pd.read_csv('https://raw.githubusercontent.com/open-metadata/OpenMetadata/main/ingestion/examples/sample_data/glue/query_log.csv')

d = df.copy()
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
d.columns = ["log_time", "user_name", "database_name", "process_id", "connection_from", "session_id", "session_line_num", "command_tag", "session_start_time", "virtual_transaction_id", "transaction_id", "error_severity", "sql_state_code", "message", "detail", "hint", "internal_query", "internal_query_pos", "context", "query", "query_pos", "location", "application_name", "backend_type", "leader_pid", "query_id"]
d.sql_state_code = (d.sql_state_code != 0).astype(bool)
d = d[(~d.user_name.isnull()) & (d.command_tag == 'SELECT')].drop(['query'], axis=1).rename(columns={'message':'query', 'log_time':'end_time', 'sql_state_code':'aborted'})
d['query'] = d['query'].str.replace('execute <unnamed>: ', '')


dcl = d[['query',
    'database_name',  # schema_name is missing and must be engineered later
    'user_name', 'end_time', 'aborted',
    ]].copy()
dcl['duration'] = dcl['query'].shift(-1)
dcl = dcl[~dcl['query'].str.contains('duration')]

dcl['end_time'] = pd.to_datetime(dcl['end_time'])

# Duration messages look like 'duration: 0.123 ms'; strip the 'duration: ' prefix.
dcl['duration'] = pd.to_timedelta(dcl['duration'].str[10:])
dcl['start_time'] = dcl['end_time'] - dcl['duration']
dcl['schema_name'] = 'dummy'

fmt = '%Y-%m-%d %H:%M:%S.%f'
dcl['start_time'] = dcl['start_time'].dt.strftime(fmt)
dcl['end_time'] = dcl['end_time'].dt.strftime(fmt)

dcl = dcl[['query', 'database_name', 'schema_name',
           'user_name', 'start_time',
           'end_time', 'aborted']].rename(columns={'query':'query_text'}).copy()
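
One thing to double-check in the snippet above: `shift(-1)` pairs each statement with whatever line comes next in the file. If several sessions write to the log at the same time, their lines may interleave, and the next line is then not necessarily the matching duration. A possible alternative, sketched here under the assumption that the duration line is the next line within the same session, is to shift per `session_id`. The function name `pair_statements_with_durations` is hypothetical.

```python
import pandas as pd


def pair_statements_with_durations(log: pd.DataFrame) -> pd.DataFrame:
    """Attach to each statement row the duration logged next in its session.

    Expects the csvlog columns session_id, session_line_num and message.
    Rows whose next line in the session is not a duration get NaT.
    """
    log = (
        log.sort_values(["session_id", "session_line_num"])
        .reset_index(drop=True)
    )
    # Next message within the same session (NaN at the end of a session).
    next_message = log.groupby("session_id")["message"].shift(-1)
    # Millisecond value, only where that next message is a duration line.
    millis = next_message.str.extract(
        r"^duration: (\d+(?:\.\d+)?) ms", expand=False
    )
    is_duration = log["message"].str.startswith("duration:", na=False)
    out = log.loc[~is_duration].copy()
    out["duration"] = pd.to_timedelta(
        millis.loc[out.index].astype(float), unit="ms"
    )
    return out
```

The result keeps only the non-duration rows, so `end_time - duration` can be computed as before. Whether `log_time` of the statement row or of the duration row is the better `end_time` is a separate question that this sketch does not settle.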

Interestingly:

.rename(columns={'query':'query_text'})

is undocumented and does not match the reference CSV, but it is necessary for this to work.

However, so far some columns are still missing (and it is not yet clear to me where to derive these from):

  • schema_name is still missing.
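
The csvlog itself has no schema column, so one crude option is to guess the schema from the query text. The following is only a heuristic sketch, not how OpenMetadata derives it: it looks for the first schema-qualified name after `FROM` or `JOIN` and otherwise falls back to `public`, which assumes the default `search_path`. The function name `guess_schema_name` is hypothetical.

```python
import re

# Matches schema-qualified names after FROM or JOIN, e.g. "FROM sales.orders"
# or 'FROM "Sales"."Orders"'. Three-part names (db.schema.table), CTEs and
# subqueries are not handled.
_QUALIFIED_NAME = re.compile(
    r'\b(?:FROM|JOIN)\s+"?(\w+)"?\."?\w+"?', re.IGNORECASE
)


def guess_schema_name(query_text, default="public"):
    """Return the schema of the first qualified table name, else `default`."""
    match = _QUALIFIED_NAME.search(query_text)
    return match.group(1) if match else default
```

It could replace the `'dummy'` placeholder via something like `dcl['query'].map(guess_schema_name)`, with the caveat that unqualified table names depend on the session's `search_path`, which the log does not record.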

I have tried to parse:

  • start_time
  • end_time
  • aborted

but would kindly ask you to validate the correctness.
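
As a starting point for that validation, a few basic sanity checks can be automated. This sketch only checks internal consistency (parseable timestamps, `start_time` not after `end_time`, boolean `aborted`). It cannot confirm that the values match what the server actually did. The function name `check_query_log` is hypothetical.

```python
import pandas as pd


def check_query_log(frame, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Return a list of problems found in the converted query log frame.

    Expects the columns start_time and end_time (strings in `fmt`)
    and aborted.
    """
    problems = []
    start = pd.to_datetime(frame["start_time"], format=fmt, errors="coerce")
    end = pd.to_datetime(frame["end_time"], format=fmt, errors="coerce")
    if start.isna().any() or end.isna().any():
        problems.append("unparseable start_time or end_time")
    # Comparisons involving NaT are False, so unparseable rows are skipped here.
    if (start > end).any():
        problems.append("start_time later than end_time")
    if frame["aborted"].dtype != bool:
        problems.append("aborted is not boolean")
    return problems
```

An empty list means none of these checks failed. Whether `aborted` is actually correct would still need a comparison against queries known to have failed.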

geoHeil, Aug 04 '22 08:08