[FEAT] Include metadata and additional columns for ETL destination table & migration of single column data from 'string' to 'JSON'
What
- Added a `metadata` column and additional columns to our ETL destination table.
- Added migration of the single column data from `string` to `JSON`.
Why
- Previously, users could access only a minimal amount of metadata, especially cost, so a metadata column was added.
- Migrated the column to the JSON data type because it lets users access the data faster and more easily.
How
Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled.)
- No, this PR cannot break any existing features because the migration code only adds columns. Even if users run a previous version of the code after migrating, it simply stores data in the columns that are available.
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
- https://zipstack.atlassian.net/browse/UN-1648
Dependencies Versions
Notes on Testing
- Tested the table migration and storing data as JSON in the available databases.
Screenshots
Checklist
I have read and understood the Contribution Guidelines.
Summary by CodeRabbit
- New Features
- Destination tables now include metadata, status, error message, and customizable user fields, with automatic schema upgrades across supported databases.
- Errors are captured alongside outputs, marking each record as SUCCESS or ERROR for clearer tracking.
- Aggregated usage metrics are saved in metadata for better insight into processing activity.
- Improved handling of JSON and complex data types across databases for more reliable storage, with safe fallbacks when serialization fails (a sketch of such a fallback follows this list).
- Dual-column storage (_v2) added for richer preservation of original data where supported.
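The "safe fallback" behavior is not spelled out in this summary, so the following is a minimal, hypothetical sketch of the idea; the PR's actual helper is `DatabaseUtils._create_safe_error_json`, and the name `safe_json_dumps` below is illustrative only.

```python
import json
from typing import Any


def safe_json_dumps(value: Any) -> str:
    """Serialize a value to JSON, falling back to a string wrapper.

    Hypothetical sketch only; the real fallback in this PR lives in
    DatabaseUtils._create_safe_error_json and may differ in shape.
    """
    try:
        return json.dumps(value)
    except (TypeError, ValueError):
        # Values json.dumps cannot handle (sets, custom objects, etc.) are
        # wrapped in a small JSON object so the destination column still
        # receives valid JSON instead of failing the insert.
        return json.dumps({"value": repr(value), "serialization_error": True})
```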
Walkthrough
Added table-level metadata, status, error, and three user fields; introduced FileProcessingStatus enum; extended DB connectors with type mappings and migration helpers; enhanced DatabaseUtils for JSON/Enum and dual-column writes; Destination now forwards usage metadata and errors; workflow always calls handle_output with error.
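For reference, the `FileProcessingStatus` enum introduced here is small; a minimal sketch consistent with this summary (the actual definition in `backend/workflow_manager/endpoint_v2/enums.py` may differ in member values or carry extra behavior) could look like:

```python
from enum import Enum


class FileProcessingStatus(Enum):
    """Outcome recorded in the destination table's status column."""

    SUCCESS = "SUCCESS"
    ERROR = "ERROR"


def resolve_status(error: str | None) -> FileProcessingStatus:
    # A record is marked ERROR whenever an error message accompanies the output.
    return FileProcessingStatus.ERROR if error else FileProcessingStatus.SUCCESS
```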
Changes
| Cohort / File(s) | Change Summary |
|---|---|
| Constants: `backend/workflow_manager/endpoint_v2/constants.py` | Added `METADATA`, `ERROR_MESSAGE`, `STATUS`, `USER_FIELD_1`, `USER_FIELD_2`, `USER_FIELD_3`; expanded `PERMANENT_COLUMNS`. |
| Enums: `backend/workflow_manager/endpoint_v2/enums.py` | Added `FileProcessingStatus` enum (SUCCESS, ERROR). |
| Database utilities: `backend/workflow_manager/endpoint_v2/database_utils.py` | Added `_create_safe_error_json`; JSON and Enum handling (Snowflake VARIANT and generic paths); dual-column (`_v2`) write logic; extended `get_columns_and_values` signature to accept `table_info`, `metadata`, `error` and populate `METADATA`, `ERROR_MESSAGE`, `STATUS`. |
| Destination flow: `backend/workflow_manager/endpoint_v2/destination.py` | Integrated `UsageHelper`; added `get_combined_metadata()`; `handle_output` and `insert_into_db` accept `error`; inspect/migrate schema, pass `table_info` and combined metadata to DB insertion; ensure engine close and propagate failures. |
| Workflow finalization: `backend/workflow_manager/workflow_v2/file_execution_tasks.py` | Always invoke `DestinationConnector.handle_output(..., error=processing_error)`. |
| DB base & migration utilities: `unstract/connectors/src/unstract/connectors/databases/unstract_db.py` | Added abstract `get_string_type` and `prepare_multi_column_migration`; `get_create_table_base_query` uses `TableColumns.PERMANENT_COLUMNS`; added `is_string_column` and `migrate_table_to_v2` with logging and exception translation. |
| DB connectors (mapping & migration): `unstract/connectors/src/unstract/connectors/databases/*` | For BigQuery, MSSQL, MySQL, Oracle, PostgreSQL, Redshift, Snowflake: added `get_string_type` and `prepare_multi_column_migration`; extended type mappings to include dict/list → JSON/VARIANT/JSONB/CLOB/etc.; updated `get_create_table_base_query` to include `metadata`, `user_field_1/2/3`, `status`, `error_message`; Redshift returns multiple ALTER statements. |
| MySQL handler mapping: `unstract/connectors/src/unstract/connectors/databases/mysql_handler.py` | Extended `sql_to_db_mapping` to map Python dict and list to MySQL JSON. |
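The migration helpers listed in the table above (`prepare_multi_column_migration`, `migrate_table_to_v2`) are only named, not shown, so here is a rough, PostgreSQL-flavored sketch of the general idea. The new column names come from the summary; the function signature, the `IF NOT EXISTS` clause, and the specific SQL types are assumptions, and each real connector emits its own dialect-specific statements.

```python
# Illustrative sketch only; real implementations live per connector under
# unstract/connectors/src/unstract/connectors/databases/.
NEW_COLUMNS = {
    "metadata": "JSONB",
    "user_field_1": "TEXT",
    "user_field_2": "TEXT",
    "user_field_3": "TEXT",
    "status": "TEXT",
    "error_message": "TEXT",
}


def prepare_multi_column_migration(table_name: str, column_name: str) -> list[str]:
    """Build ALTER statements that add the new columns plus a JSON-typed
    <column>_v2 twin for the existing string column (hypothetical shape)."""
    statements = [
        f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {name} {ddl_type}"
        for name, ddl_type in NEW_COLUMNS.items()
    ]
    statements.append(
        f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {column_name}_v2 JSONB"
    )
    return statements
```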
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Workflow
    participant Dest as DestinationConnector
    participant Usage as UsageHelper
    participant DBUtils as DatabaseUtils
    participant DBC as DBConnector
    Workflow->>Dest: handle_output(..., input_file_path, file_execution_id, error?)
    Dest->>Usage: get_aggregated_token_count(file_execution_id)
    Usage-->>Dest: usage_metadata
    Dest->>Dest: get_combined_metadata() => workflow metadata + usage_metadata
    Dest->>DBC: inspect_table_schema(table)
    DBC-->>Dest: table_info
    Dest->>DBC: migrate_table_to_v2(table, column) [if needed]
    Dest->>DBUtils: get_columns_and_values(mode, data, file_path, execution_id, ..., table_info, metadata, error)
    DBUtils-->>Dest: columns_and_values (includes metadata, status, error, _v2 handling)
    Dest->>DBC: insert(columns_and_values)
    DBC-->>Dest: insert result / error
```
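For readers who prefer code to diagrams, the record-building step at the centre of this flow can be sketched as below. This is a self-contained, hypothetical stand-in for `DatabaseUtils.get_columns_and_values`; the real method has a richer signature, dialect-specific JSON handling, and the dual-column `_v2` logic, and the column names here are guesses based on the summary.

```python
import json
from typing import Any


def build_row(
    data: dict[str, Any],
    file_path: str,
    execution_id: str,
    metadata: dict[str, Any] | None = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Hypothetical stand-in for DatabaseUtils.get_columns_and_values."""
    return {
        "file_path": file_path,
        "execution_id": execution_id,
        "data": json.dumps(data),
        "metadata": json.dumps(metadata or {}),
        "status": "ERROR" if error else "SUCCESS",
        "error_message": error,
    }


row = build_row(
    data={"invoice_total": 1234.5},
    file_path="invoices/acme.pdf",
    execution_id="run-001",
    metadata={"total_tokens": 4521},
)
print(row["status"])  # SUCCESS
```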
Estimated code review effort
4 (Complex) | ~45 minutes
Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
Commits
Reviewing files that changed from the base of the PR and between 72d58dc51db981f6424535fbe23da978c1812b51 and 4c2468c80b71e8aabd8a4fcd28f95e0ab99d5416.
Files selected for processing (1)
backend/workflow_manager/workflow_v2/file_execution_tasks.py (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- backend/workflow_manager/workflow_v2/file_execution_tasks.py
Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
| filepath | function | passed | SUBTOTAL |
|---|---|---|---|
| runner/src/unstract/runner/clients/test_docker.py | test_logs | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup_skip | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_client_init | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_exists | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config_without_mount | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_run_container | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_for_sidecar | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_sidecar_container | 1 | 1 |
| TOTAL | | 11 | 11 |
@pk-zipstack It is relatively OK to allow a bit of duplication in this case, as each connector handles data types like JSONB etc. differently. It is probably less cognitive load in the long run to have all of a connector's code in the same file.
Let's see how it evolves; at that point we can move common logic upstream to the base UnstractDB class.
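To illustrate the kind of divergence being referred to, Python dict/list values map to a different JSON-capable column type in each database. The mapping below is a simplified sketch drawn from the change summary, not the exact connector code, and the pairings are best-effort guesses.

```python
# Simplified per-connector mapping of Python dict/list to a JSON-capable
# column type (sketch only; the real mapping lives inside each connector).
JSON_COLUMN_TYPE = {
    "postgresql": "JSONB",
    "mysql": "JSON",
    "snowflake": "VARIANT",
    "oracle": "CLOB",
    # BigQuery, Redshift and MSSQL likewise use their own types; see the
    # connector files touched in this PR.
}


def get_json_type(connector: str) -> str:
    """Return the JSON-capable column type for a connector (sketch only)."""
    return JSON_COLUMN_TYPE[connector.lower()]
```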
