Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building
Description
Issue: https://github.com/apache/druid/issues/14989
The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (https://github.com/apache/druid/pull/14985). Thereafter, we addressed the problem of publishing schema for realtime segments (https://github.com/apache/druid/pull/15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.
This is the final change which involves publishing segment schema for finalized segments from task and periodically polling them in the Coordinator.
Design
Database
Schema Table
Table Name: SegmentSchema
Purpose: Store unique schema for segment.
Columns
| Column Name | Data Type | Description |
|---|---|---|
| id | autoincrement | primary key |
| created_date | varchar | creation time; allows filtering schemas created after a point in time |
| fingerprint | varchar | unique identifier for the schema |
| payload | blob | includes rowSignature and aggregatorFactories |
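As an illustration of the `fingerprint` column: the table only requires a deterministic, collision-resistant identifier for a serialized schema payload, so a content hash is one way such an identifier could be derived. This is a sketch under that assumption; the actual Druid implementation may differ.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical sketch of deriving a schema fingerprint from the serialized
// payload. Identical payloads yield identical fingerprints, so a schema
// shared by many segments is stored only once in the schema table.
public class SchemaFingerprint
{
  public static String fingerprint(String serializedPayload) throws Exception
  {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    byte[] hash = digest.digest(serializedPayload.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder(hash.length * 2);
    for (byte b : hash) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  public static void main(String[] args) throws Exception
  {
    String payload = "{\"rowSignature\":[{\"name\":\"page\",\"type\":\"STRING\"}]}";
    System.out.println(fingerprint(payload).equals(fingerprint(payload))); // prints "true"
  }
}
```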
Segments Table
New columns will be added to the already existing Segments table.
Columns
| Column Name | Data Type | Description |
|---|---|---|
| num_rows | long | number of rows in the segment |
| schema_id | long | foreign key; references id in the schema table |
Task
Changes are required in the task to publish schema along with segment metadata.
- Introduce a new class `SchemaPayload` to encapsulate the RowSignature and AggregatorFactories.
- Introduce a new class `SegmentSchemaMetadata` to encapsulate `SchemaPayload` and `numRows`.
- Introduce a new class `MinimalSegmentSchemas` to encapsulate schema and numRows information for multiple segments.
- Update `SegmentInsertAction`, `SegmentTransactionalReplaceAction`, `SegmentTransactionalAppendAction`, and `SegmentTransactionalInsertAction` to take in the segment schema.
- Changes in `AbstractBatchIndexTask#buildPublishAction` to take the segment schema.
- Changes in `SegmentAndCommitMetadata` to take the segment schema.
- Changes in `TransactionalSegmentPublisher` to take the segment schema for publishing to the database.
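The deduplicating shape implied by `MinimalSegmentSchemas` can be sketched as follows: per-segment stats reference a shared fingerprint, and each unique payload is stored once. Field and method names here are assumptions based on the description above, not the exact Druid definitions.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of MinimalSegmentSchemas: schema and numRows
// information for multiple segments, with shared schemas deduplicated
// by fingerprint.
public class MinimalSegmentSchemas
{
  // segmentId -> fingerprint of its schema
  final Map<String, String> segmentToFingerprint = new HashMap<>();
  // segmentId -> number of rows in the segment
  final Map<String, Long> segmentToNumRows = new HashMap<>();
  // fingerprint -> serialized SchemaPayload (rowSignature + aggregatorFactories)
  final Map<String, String> fingerprintToPayload = new HashMap<>();

  public void addSchema(String segmentId, String fingerprint, long numRows, String payload)
  {
    segmentToFingerprint.put(segmentId, fingerprint);
    segmentToNumRows.put(segmentId, numRows);
    // A payload shared by many segments is recorded only once.
    fingerprintToPayload.putIfAbsent(fingerprint, payload);
  }

  public static void main(String[] args)
  {
    MinimalSegmentSchemas schemas = new MinimalSegmentSchemas();
    schemas.addSchema("seg1", "fp1", 100L, "{...}");
    schemas.addSchema("seg2", "fp1", 250L, "{...}");
    System.out.println(schemas.fingerprintToPayload.size()); // prints "1"
  }
}
```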
Streaming
- Changes in `StreamAppenderator` to get the RowSignature, AggregatorFactories, and numRows for the segment.
Batch
- `AppenderatorImpl#push` to build the segment schema and add it to `SegmentsAndCommitMetadata`.
- `BatchAppenderator#push` to build the segment schema and add it to `SegmentsAndCommitMetadata`.
IndexTask
- Changes in `BatchAppenderatorDriver#publishAll` to pass the segment schema for publishing.
- Changes in `IndexTask#generateAndPublishSegments` to fetch the segment schema from pushed segments and publish it.
ParallelIndexSupervisorTask
- Changes in `ParallelIndexSupervisorTask#publishSegments` to combine segment schemas from segments and publish them.
- `SinglePhaseSubTask`
- `PartialSegmentMergeTask`
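The combining step can be sketched as a fold of per-subtask schema reports into a single structure before one publish call. The map shape mirrors the `MinimalSegmentSchemas` description; names here are illustrative, not Druid's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: merging per-subtask (fingerprint -> payload) reports.
// Fingerprints shared across subtasks collapse to a single entry, so the
// supervisor publishes each unique schema once.
public class SubTaskSchemaMerge
{
  @SafeVarargs
  public static Map<String, String> mergePayloads(Map<String, String>... reports)
  {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> report : reports) {
      merged.putAll(report);
    }
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> a = new HashMap<>();
    a.put("fp1", "{...}");
    Map<String, String> b = new HashMap<>();
    b.put("fp1", "{...}");
    b.put("fp2", "{...}");
    System.out.println(mergePayloads(a, b).size()); // prints "2"
  }
}
```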
MSQ
- Changes in `SegmentGeneratorFrameProcessor` to return the segment schema along with segment metadata.
- Changes in `SegmentGeneratorFrameProcessorFactory` and `ControllerImpl`.
Overlord
Changes are required in the Overlord (`IndexerSQLMetadataStorageCoordinator`) to persist the schema along with segment metadata in the database.
Coordinator
Schema Poll
Changes in `SqlSegmentsMetadataManager`:
- Poll schemas created since the last poll.
- Additionally poll `schema_id` and `num_rows` from the segments table.
- Update the schema cache.
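The incremental poll can be modeled as a watermark on `created_date`: only rows created after the previous poll are fetched, which in SQL terms would resemble `SELECT fingerprint, payload FROM segmentSchema WHERE created_date > :lastPoll`. The class below is a simplified in-memory model of that behaviour, not the `SqlSegmentsMetadataManager` API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of incremental schema polling keyed on created_date.
public class IncrementalSchemaPoll
{
  static class SchemaRow
  {
    final String fingerprint;
    final long createdDate; // epoch millis, for simplicity

    SchemaRow(String fingerprint, long createdDate)
    {
      this.fingerprint = fingerprint;
      this.createdDate = createdDate;
    }
  }

  private long lastPollTime = Long.MIN_VALUE;

  // Returns only rows created after the previous poll and advances the watermark.
  public List<SchemaRow> poll(List<SchemaRow> allRows)
  {
    List<SchemaRow> fresh = new ArrayList<>();
    long maxSeen = lastPollTime;
    for (SchemaRow row : allRows) {
      if (row.createdDate > lastPollTime) {
        fresh.add(row);
        maxSeen = Math.max(maxSeen, row.createdDate);
      }
    }
    lastPollTime = maxSeen;
    return fresh;
  }
}
```

On the first poll everything is fetched; subsequent polls return only schemas written since, which keeps the recurring cost proportional to new schemas rather than the full table.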
Schema Caching
Maintain a cache of segment schemas; refer to `SegmentSchemaCache`. It caches the following information:
| Information | Writer | Cleanup |
|---|---|---|
| Segment stats: SegmentId -> schemaId, numRows | Replaced on each DB poll. | Not required. |
| Schema for finalized segments: SchemaId -> SchemaPayload | Newer entries added on each DB poll. | Cleanup job. |
| Realtime segment schema: SegmentId -> SegmentSchemaMetadata | Whenever Peons push a schema update. | When the segment is removed. |
| SMQ (segment metadata query) results which are not yet published: SegmentId -> SegmentSchemaMetadata | Added after an SMQ is executed. | Removed once the SegmentSchemaBackFill queue successfully writes the schema to the database. |
| SMQ results which have been published: SegmentId -> SegmentSchemaMetadata | Added after the segment schema is published to the DB. | Cleared after each DB poll. |
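The lookup order implied by the table above can be sketched as follows: finalized segments resolve through segment stats plus the shared schema map, while realtime segments are held in their own map. Names and the exact fallback order are assumptions for illustration; the real `SegmentSchemaCache` differs in detail.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the cache structure described above.
public class SegmentSchemaCacheSketch
{
  // segmentId -> schemaId (replaced wholesale on each DB poll)
  final Map<String, Long> segmentStats = new ConcurrentHashMap<>();
  // schemaId -> payload (appended on each DB poll, pruned by the cleanup job)
  final Map<Long, String> finalizedSchema = new ConcurrentHashMap<>();
  // segmentId -> payload for realtime segments (pushed by Peons)
  final Map<String, String> realtimeSchema = new ConcurrentHashMap<>();

  public Optional<String> getSchema(String segmentId)
  {
    // Finalized segments resolve via the shared, deduplicated schema map.
    Long schemaId = segmentStats.get(segmentId);
    if (schemaId != null && finalizedSchema.containsKey(schemaId)) {
      return Optional.of(finalizedSchema.get(schemaId));
    }
    // Otherwise fall back to the realtime map.
    return Optional.ofNullable(realtimeSchema.get(segmentId));
  }
}
```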
SegmentMetadataCache changes
Changes in the `AbstractSegmentMetadataCache` class to add new methods which will be overridden by child classes:
- `additionalInitializationCondition`
- `removeSegmentAction`
- `smqAction`

Changes in `CoordinatorSegmentMetadataCache` to override methods from `AbstractSegmentMetadataCache`:
- Implement `additionalInitializationCondition` to wait for the SegmentSchemaCache to be initialized.
- Implement `removeSegmentAction` to remove the schema from the schema cache.
- Override `smqAction` to additionally publish and cache the schema.
Schema Backfill
Added a new class `SegmentSchemaBackFillQueue` which accepts segment schemas and publishes them in batches.
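The batching behaviour can be sketched as follows: schemas accumulate and are published together once the batch is full (a real implementation would typically also flush on a timer). The names and the batch-size-only flush policy are assumptions for illustration, not the actual Druid implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of a backfill queue that publishes schemas in batches,
// turning many per-segment writes into a few metadata-store writes.
public class BackFillQueueSketch
{
  private final List<String> pending = new ArrayList<>();
  private final int batchSize;
  private final List<List<String>> published = new ArrayList<>();

  public BackFillQueueSketch(int batchSize)
  {
    this.batchSize = batchSize;
  }

  public void add(String segmentSchema)
  {
    pending.add(segmentSchema);
    if (pending.size() >= batchSize) {
      publishBatch();
    }
  }

  private void publishBatch()
  {
    // Stand-in for a single metadata-store write covering the whole batch.
    published.add(new ArrayList<>(pending));
    pending.clear();
  }

  public List<List<String>> getPublished()
  {
    return published;
  }
}
```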
Schema Cleanup
A CoordinatorDuty cleans up schemas which are not referenced by any segment. If the cleanup job ends up clearing stale schemas, the next DB poll fetches all the segment schemas in the DB.
Testing
- The changes have been tested locally with the wikipedia dataset.
- Unit test has been added.
- All of the existing integration tests have been tested with feature enabled (https://github.com/apache/druid/pull/15817/commits/e8a6d9b603e64a6c5dbb32389eec50efe0808854).
- Integration test with the group name `centralized-table-schema` runs successfully.
- The changes are yet to be tested in a Druid cluster.
Upgrade considerations
The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Tasks with the new changes can communicate with an older version of the Overlord.
Release Notes
This feature addresses multiple challenges outlined in the linked issue. To enable it, set `druid.centralizedDatasourceSchema.enabled`.
If MiddleManagers are used, also set `druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`.
When the feature is enabled:
- Realtime segment schema changes are periodically pushed to the Coordinator.
- Finalized segment schemas are written to the metadata database.
- The Coordinator polls the schema along with segment metadata.
- The Coordinator builds the datasource schema, and the Broker fetches it from the Coordinator.
To roll back, simply turn off the feature flag.
Important metrics to track:

| Metric | Purpose |
|---|---|
| `metadatacache/schemaPoll/count` | Number of Coordinator polls to fetch datasource schema. |
| `metadatacache/schemaPoll/failed` | Number of failed Coordinator polls to fetch datasource schema. |
| `metadatacache/schemaPoll/time` | Time taken for Coordinator polls to fetch datasource schema. |
| `metadatacache/init/time` | Time taken to initialize the Coordinator segment metadata cache; depends on the number of segments. |
| `metadatacache/refresh/count` | Number of segments to refresh in the Coordinator segment metadata cache. |
| `metadatacache/refresh/time` | Time taken to refresh segments in the Coordinator segment metadata cache. |
This PR has:
- [X] been self-reviewed.
- [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [X] added documentation for new or modified features or behaviors.
- [X] a release note entry in the PR description.
- [X] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [X] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [X] added integration tests.
- [ ] been tested in a test Druid cluster.