
Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building


Description

Issue: https://github.com/apache/druid/issues/14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (https://github.com/apache/druid/pull/14985). Thereafter, we addressed the problem of publishing schema for realtime segments (https://github.com/apache/druid/pull/15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.

This is the final change: tasks publish segment schemas for finalized segments, and the Coordinator periodically polls them from the metadata store.

Design

Database

Schema Table

Table name: SegmentSchema
Purpose: store unique segment schemas.

Columns

| Column Name | Data Type | Description |
|-------------|-----------|-------------|
| id | autoincrement | primary key |
| created_date | varchar | creation time; allows filtering for schemas created after a given point |
| fingerprint | varchar | unique identifier for the schema |
| payload | blob | includes rowSignature and aggregatorFactories |
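
The fingerprint lets many segments with identical schemas deduplicate to a single row. A minimal, hypothetical sketch of such a fingerprint (the PR's actual hashing scheme may differ):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: hash the serialized schema payload so that segments
// with identical schemas map to one row in the SegmentSchema table.
public final class SchemaFingerprint
{
  public static String of(String serializedPayload) throws NoSuchAlgorithmException
  {
    byte[] hash = MessageDigest.getInstance("SHA-256")
                               .digest(serializedPayload.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder(2 * hash.length);
    for (byte b : hash) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
```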

Segments Table

New columns will be added to the existing Segments table.

Columns

| Column Name | Data Type | Description |
|-------------|-----------|-------------|
| num_rows | long | number of rows in the segment |
| schema_id | long | foreign key referencing id in the schema table |

Task

Changes are required in tasks to publish the segment schema along with segment metadata.

  • Introduce a new class SchemaPayload to encapsulate RowSignature and AggregatorFactories.
  • Introduce a new class SegmentSchemaMetadata to encapsulate SchemaPayload and numRows.
  • Introduce a new class MinimalSegmentSchemas to encapsulate schema and numRows information for multiple segments (see the sketch after this list).
  • Update SegmentInsertAction, SegmentTransactionalReplaceAction, SegmentTransactionalAppendAction, and SegmentTransactionalInsertAction to accept the segment schema.
  • Changes in AbstractBatchIndexTask#buildPublishAction to take segment schema.
  • Changes in SegmentAndCommitMetadata to take segment schema.
  • Changes in TransactionalSegmentPublisher to take segment schema for publishing to the DB.
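
A minimal sketch of how these encapsulation classes might fit together. Only the class names and the fields listed above come from this design; everything else (method names, collection types) is an illustrative assumption:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.RowSignature;

// Hypothetical sketches; each class would live in its own file in practice.
class SchemaPayload
{
  private final RowSignature rowSignature;                    // column names and types
  private final List<AggregatorFactory> aggregatorFactories;  // rollup aggregators, if any

  SchemaPayload(RowSignature rowSignature, List<AggregatorFactory> aggregatorFactories)
  {
    this.rowSignature = rowSignature;
    this.aggregatorFactories = aggregatorFactories;
  }
}

class SegmentSchemaMetadata
{
  private final SchemaPayload schemaPayload;
  private final long numRows;

  SegmentSchemaMetadata(SchemaPayload schemaPayload, long numRows)
  {
    this.schemaPayload = schemaPayload;
    this.numRows = numRows;
  }
}

// Carries schema and numRows information for multiple segments in one publish call.
class MinimalSegmentSchemas
{
  private final Map<String, SegmentSchemaMetadata> segmentIdToSchema = new HashMap<>();

  void addSchema(String segmentId, SegmentSchemaMetadata schemaMetadata)
  {
    segmentIdToSchema.put(segmentId, schemaMetadata);
  }
}
```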

Streaming

  • Changes in StreamAppenderator to get the RowSignature, AggregatorFactories and numRows for the segment.

Batch

  • AppenderatorImpl#push to build the segment schema and add it to SegmentsAndCommitMetadata.
  • BatchAppenderator#push to build the segment schema and add it to SegmentsAndCommitMetadata.

IndexTask

  • Changes in BatchAppenderatorDriver#publishAll to pass the segment schema for publishing.
  • Changes in IndexTask#generateAndPublishSegments to fetch the segment schema from pushed segments and publish it.

ParallelIndexSupervisorTask

  • Changes in ParallelIndexSupervisorTask#publishSegments to combine segment schemas from segments and publish them.
  • Related changes in SinglePhaseSubTask.
  • Related changes in PartialSegmentMergeTask.

MSQ

  • Changes in SegmentGeneratorFrameProcessor to return segment schema along with segment metadata.
  • Changes in SegmentGeneratorFrameProcessorFactory and ControllerImpl.

Overlord

Changes are required in the Overlord (IndexerSQLMetadataStorageCoordinator) to persist the schema along with segment metadata in the database.

Coordinator

Schema Poll

Changes in SqlSegmentsMetadataManager to poll schemas created since the last poll, and to additionally poll schema_id and num_rows from the Segments table, then update the schema cache.
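
Conceptually, the poll is a watermark query over created_date. A hypothetical sketch of the two queries involved (the actual SQL in SqlSegmentsMetadataManager may differ):

```java
// Hypothetical sketch of the watermark-based poll: only schemas created since
// the last poll are fetched, while schema_id and num_rows are read from the
// segments table on every poll. Table and column names follow the design above.
class SchemaPollQueries
{
  static final String NEW_SCHEMAS_SINCE_LAST_POLL =
      "SELECT id, fingerprint, payload FROM segmentSchema WHERE created_date > :lastPollTime";

  // The "used = true" filter is an assumption; it restricts stats to used segments.
  static final String SEGMENT_STATS =
      "SELECT id, schema_id, num_rows FROM segments WHERE used = true";
}
```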

Schema Caching

Maintain a cache of segment schemas; refer to SegmentSchemaCache.
It caches the following information:

| Information | Writer | Cleanup |
|-------------|--------|---------|
| Segment stats: SegmentId -> schemaId, numRows | Replaced on each DB poll | Not required |
| Schema for finalized segments: SchemaId -> SchemaPayload | Newer entries added on each DB poll | Cleanup job |
| Realtime segment schema: SegmentId -> SegmentSchemaMetadata | Whenever Peons push a schema update | When the segment is removed |
| SMQ (segment metadata query) results not yet published: SegmentId -> SegmentSchemaMetadata | Added after an SMQ is executed | Removed once the SegmentSchemaBackFill queue successfully writes the schema to the database |
| SMQ results that have been published: SegmentId -> SegmentSchemaMetadata | Added after the segment schema is published to the DB | Cleared after each DB poll |
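
A rough sketch of the map shapes implied by the table above (hypothetical; SchemaPayload and SegmentSchemaMetadata refer to the sketches in the Task section, and the real SegmentSchemaCache differs in detail):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shape of SegmentSchemaCache, mirroring the table above.
class SegmentSchemaCache
{
  // schemaId and numRows for a segment, as polled from the segments table.
  record SegmentStats(long schemaId, long numRows) {}

  // Segment stats; the whole map is replaced on each DB poll.
  private volatile Map<String, SegmentStats> segmentStats = new ConcurrentHashMap<>();

  // Schemas for finalized segments; grows on each poll, pruned by the cleanup job.
  private final Map<Long, SchemaPayload> finalizedSegmentSchemas = new ConcurrentHashMap<>();

  // Realtime segment schemas pushed by Peons; removed when the segment is removed.
  private final Map<String, SegmentSchemaMetadata> realtimeSegmentSchemas = new ConcurrentHashMap<>();

  // SMQ results awaiting backfill; removed once the backfill queue persists them.
  private final Map<String, SegmentSchemaMetadata> unpublishedSmqResults = new ConcurrentHashMap<>();

  // SMQ results already persisted but not yet seen by a poll; cleared on each poll.
  private final Map<String, SegmentSchemaMetadata> publishedSmqResults = new ConcurrentHashMap<>();
}
```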

SegmentMetadataCache changes

Changes in the AbstractSegmentMetadataCache class to add new methods which are overridden by child classes:

  • additionalInitializationCondition
  • removeSegmentAction
  • smqAction

Changes in CoordinatorSegmentMetadataCache to override methods from AbstractSegmentMetadataCache (a rough sketch of these hooks follows the list):

  • Implement additionalInitializationCondition to wait for the segmentSchemaCache to be initialized.
  • Implement removeSegmentAction to remove the schema from the schema cache.
  • Override smqAction to additionally publish and cache the schema.
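
Only the method names above come from this design; the signatures in this sketch are illustrative assumptions:

```java
// Hypothetical shape of the new hooks on the abstract cache; signatures and
// parameter types are assumptions, the method names are from the lists above.
abstract class AbstractSegmentMetadataCache
{
  // Extra condition a child class can impose before the cache reports itself
  // initialized, e.g. waiting for the segment schema cache's first poll.
  protected abstract void additionalInitializationCondition() throws InterruptedException;

  // Invoked when a segment is removed, letting a child evict its cached schema.
  protected abstract void removeSegmentAction(String segmentId);

  // Invoked with the result of a segment metadata query (SMQ); the Coordinator
  // implementation additionally publishes and caches the schema.
  protected abstract void smqAction(String segmentId, SegmentSchemaMetadata smqResult);
}
```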

Schema Backfill

Added a new class SegmentSchemaBackFillQueue which accepts segment schemas and publishes them in batches.
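
A hypothetical sketch of such a batching queue (the real SegmentSchemaBackFillQueue likely differs in batching policy, error handling, and threading):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: accumulate schemas and flush them to the metadata store
// in one batch per period, rather than issuing one DB write per segment.
class SegmentSchemaBackFillQueue
{
  private final Map<String, SegmentSchemaMetadata> pending = new HashMap<>();
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

  SegmentSchemaBackFillQueue(long periodMillis)
  {
    exec.scheduleWithFixedDelay(this::publishBatch, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  synchronized void add(String segmentId, SegmentSchemaMetadata schema)
  {
    pending.put(segmentId, schema);
  }

  private void publishBatch()
  {
    final Map<String, SegmentSchemaMetadata> batch;
    synchronized (this) {
      if (pending.isEmpty()) {
        return;
      }
      batch = new HashMap<>(pending);
      pending.clear();
    }
    // One batched write; on success these entries can also be dropped from the
    // "unpublished SMQ results" map in SegmentSchemaCache.
    persistToMetadataStore(batch);
  }

  private void persistToMetadataStore(Map<String, SegmentSchemaMetadata> batch)
  {
    // Placeholder for the actual batched DB insert.
  }
}
```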

Schema Cleanup

CoordinatorDuty to clean up schemas which are not referenced by any segment. If the cleanup job ends up clearing stale schemas, the next DB poll fetches all the segment schemas in the DB.
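
The staleness condition is essentially a referential check. A hypothetical sketch of the delete statement (the duty's actual SQL, batching, and retention window may differ):

```java
// Hypothetical sketch: a schema row is stale when no segment row references it.
class SegmentSchemaCleanupSql
{
  static final String DELETE_UNREFERENCED_SCHEMAS =
      "DELETE FROM segmentSchema WHERE id NOT IN "
      + "(SELECT DISTINCT schema_id FROM segments WHERE schema_id IS NOT NULL)";
}
```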

Testing

  • The changes have been tested locally with the wikipedia dataset.
  • Unit tests have been added.
  • All of the existing integration tests have been tested with feature enabled (https://github.com/apache/druid/pull/15817/commits/e8a6d9b603e64a6c5dbb32389eec50efe0808854).
  • Integration test with the group name centralized-table-schema runs successfully.
  • The changes are yet to be tested in a Druid cluster.

Upgrade considerations

The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Tasks running the new code can communicate with an older version of the Overlord.

Release Notes

This feature addresses multiple challenges outlined in the linked issue. To enable it, set druid.centralizedDatasourceSchema.enabled=true. If MiddleManagers are used, also set druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled=true so that forked Peon processes inherit the flag.
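
For example, in runtime.properties (the property names come from this PR; setting them to true is the enablement shown here):

```properties
druid.centralizedDatasourceSchema.enabled=true
# Needed only when MiddleManagers are used, so forked Peon processes inherit the flag:
druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled=true
```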

When the feature is enabled:

  • Realtime segment schema changes are periodically pushed to the Coordinator.
  • Finalized segment schemas are written to the metadata database.
  • The Coordinator polls schemas along with segment metadata.
  • The Coordinator builds the datasource schema, and Brokers fetch it from the Coordinator.

To roll back, simply turn off the feature flag.

Important metrics to track:

| Metric | Purpose |
|--------|---------|
| metadatacache/schemaPoll/count | Number of Coordinator polls to fetch datasource schema. |
| metadatacache/schemaPoll/failed | Number of failed Coordinator polls to fetch datasource schema. |
| metadatacache/schemaPoll/time | Time taken for Coordinator polls to fetch datasource schema. |
| metadatacache/init/time | Time taken to initialize the Coordinator segment metadata cache. Depends on the number of segments. |
| metadatacache/refresh/count | Number of segments to refresh in the Coordinator segment metadata cache. |
| metadatacache/refresh/time | Time taken to refresh segments in the Coordinator segment metadata cache. |

This PR has:

  • [X] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [X] added documentation for new or modified features or behaviors.
  • [X] a release note entry in the PR description.
  • [X] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [X] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [X] added integration tests.
  • [ ] been tested in a test Druid cluster.
