
[Enhancement]: Support Streaming Service in Milvus

Open chyezh opened this issue 1 year ago • 5 comments

Motivation

  • Enhance cloud-native pooling capabilities: decouple I/O and computation, and separate batch and stream data processing into different components.
  • Enhance write-path functionality: pre-write data checking and deduplication in the WAL, and even post-write secondary index synchronization. The flushing and syncing process will be simplified into a pipeline.
  • Optimize TimeTick allocation: only guarantee monotonic increase within the channel dimension.
  • Embedded pub-sub log service:
    • Channel data is merged and dispatched under unified management.
    • The streaming API only exposes vchannel-granularity operations; pchannel becomes an internal concept.
    • Reduce the dependency on third-party message queues; either KV storage or a distributed file system can serve as log storage, raising the possible upper limit on channel count.
    • Internal message optimizations: cache messages to reduce remote fetches, and merge empty-window TimeTick messages to reduce persisted data, improve catch-up read speed on log data, and shorten recovery time.
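The empty-window TimeTick merge mentioned above can be sketched as follows. This is a minimal illustrative Go sketch, not the actual Milvus implementation: consecutive TimeTick messages with no data message between them collapse into the last tick of the run, so fewer tick entries need to be persisted and replayed.

```go
package main

import "fmt"

type MsgType int

const (
	TimeTick MsgType = iota
	Insert
)

type Message struct {
	Type MsgType
	TT   uint64 // timetick carried by the message
}

// mergeEmptyTimeTicks collapses runs of consecutive TimeTick messages
// (ticks with no data message between them) into the last tick of each
// run, reducing persisted data and speeding up catch-up reads.
func mergeEmptyTimeTicks(in []Message) []Message {
	out := make([]Message, 0, len(in))
	for _, m := range in {
		if m.Type == TimeTick && len(out) > 0 && out[len(out)-1].Type == TimeTick {
			out[len(out)-1] = m // the newer tick subsumes the empty-window one
			continue
		}
		out = append(out, m)
	}
	return out
}

func main() {
	msgs := []Message{
		{TimeTick, 1}, {TimeTick, 2}, {Insert, 2}, {TimeTick, 3}, {TimeTick, 4},
	}
	merged := mergeEmptyTimeTicks(msgs)
	fmt.Println(len(merged)) // 3: ticks 1 and 3 are subsumed by 2 and 4
}
```

Only the latest tick of each empty window matters for consumers, because a TimeTick guarantees no earlier-timestamped data will arrive on that channel.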

Architecture

[architecture diagram]

The following changes will be made:

  • Write with the pub API of the stream node instead of MQ in the proxy
  • Do not process TimeTick in the proxy's communication with RootCoord
  • Allocate segment IDs in the stream node instead of requesting them from the segment ID allocator in the proxy via DataCoord
  • Subscribe to the stream node instead of MQ in QueryNode
  • The flush function will be moved to the stream node, and the IndexNode will be merged into the DataNode

Components Responsibility

  • StreamCoord :
    • Meta manager for segments and pchannels, with functionality partially akin to the datacoord segment manager.
    • Pchannel holder is responsible for assigning pchannels to stream nodes or balancing pchannels on stream nodes.
    • Manage Sessions for all stream nodes
  • StreamNode
    • Provide a pub-sub API for producing and consuming the Write-Ahead Log (WAL), including consuming from the latest position or from a specific position.
    • WAL persistence process flow with pre- and post-hooks interface.
    • Subscribe to WAL and flush data to object storage
    • Query stream data in the future
  • DataCoord
    • Coordinate and allocate tasks to computing nodes, such as indexing, importing, and compaction tasks
    • Task meta manager
    • Manage sessions for all computing nodes
  • DataNode
    • Execute Task
  • QueryCoord / QueryNode
    • Remain the same as in version 2.4
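As a rough illustration of the StreamNode pub-sub surface described above, here is a minimal in-memory Go sketch. All names (`WAL`, `Append`, `Latest`, `Scan`) are hypothetical, not the real Milvus API: a producer appends entries, and a catch-up reader scans either from the latest position or from a specific one.

```go
package main

import (
	"fmt"
	"sync"
)

// Position is an offset into a pchannel's log.
type Position int

// WAL is a minimal in-memory stand-in for the per-pchannel write-ahead log.
type WAL struct {
	mu      sync.Mutex
	entries [][]byte
}

// Append writes one entry and returns its position (the producer side).
func (w *WAL) Append(entry []byte) Position {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.entries = append(w.entries, entry)
	return Position(len(w.entries) - 1)
}

// Latest returns the position where a "consume from latest" scan would start.
func (w *WAL) Latest() Position {
	w.mu.Lock()
	defer w.mu.Unlock()
	return Position(len(w.entries))
}

// Scan returns all entries at or after pos (a catch-up reader).
func (w *WAL) Scan(pos Position) [][]byte {
	w.mu.Lock()
	defer w.mu.Unlock()
	out := make([][]byte, len(w.entries[pos:]))
	copy(out, w.entries[pos:])
	return out
}

func main() {
	wal := &WAL{}
	wal.Append([]byte("insert-1"))
	from := wal.Latest() // a fresh consumer starts here
	wal.Append([]byte("insert-2"))
	wal.Append([]byte("insert-3"))
	fmt.Println(len(wal.Scan(0)), len(wal.Scan(from))) // 3 2
}
```

The key design point is that consumption is stateless on the storage side (a reader scans from a position it tracks itself), which is what lets the WAL sit on top of an MQ, KV storage, or a distributed file system.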

Goals

  • Track 1: pub-sub WAL service; support rocksmq/pulsar/kafka as WAL data storage.
  • Track 2: support flushing in the stream node; merge indexnode into datanode.

RoadMap

Streaming Service Implementation

  • [x] new message interface that provides fast filtering via message properties, avoiding excessive deserialization work on the milvus streamingnode. #33286
    • [x] Properties for filtering and sorting: timetick, vchannel name, message type. #33286
  • [x] streaming service client
    • [x] streaming node manage service client #34653
    • [x] streaming node pub-sub service client #34653
    • [x] streaming coord discovery client #34654
    • [x] grpc utilities, such as resolver, service discoverer, error-handling interceptor, and balancer #34436
    • [x] streaming service client #34656
  • [x] streaming coord service
    • [x] channel management at streaming coord. #34435
    • [x] channel assignment balancer. #34435
      • [x] channel count fair balance strategy. #34435
    • [x] streaming coord grpc server. #34655
  • [x] streaming node service
    • [x] support Streaming producing and consuming operations at streamingnode. #34166
    • [x] support Management operations at streamingnode. #34166
    • [x] streaming node grpc server. #34655
  • [x] wal-based streaming node implementation
    • [x] wal interface definition. #33745
    • [x] wal adapt underlying implementation. #34122
    • [x] wal append support interceptor. #34238
    • [x] timetick is managed on streamingnode now. #34238
    • [x] scanner operation support message timetick sorting and filtering. #34122
    • [x] wal management on streaming node. #34153
    • [x] wal transaction implementation. #35289
  • [ ] Remove mqwrapper; the new architecture needs a WAL, not a message queue. We do not rely on the message queue's consuming API, so there is no stateful queue-side consumer in the new architecture (with Pulsar, for example, we use the reader API rather than the consumer API). Implement all underlying MQ or log storage backends behind the WAL.
    • [x] Pulsar #34046
    • [x] Rocksmq #34046
    • [ ] Kafka
    • [ ] RocketMQ #33962
  • [ ] timestamp assignment related optimizations.
    • [x] Continuous TimeTick messages (no message between two TimeTick messages) can be optimized into non-persistent messages. #35287
    • [ ] The timestamp sync interval can be much shorter (200ms by default now); we can optimize it to the 10ms level without much overhead.
    • [ ] A TimeTick can be merged into other messages when there is incoming traffic.
  • [ ] Write-ahead cache on the streamingnode server side. We no longer need to consume every message from the underlying message queue; many messages can be served from the streamingnode's local storage via cached write log entries.
  • [ ] Support message deduplication.
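The write-ahead cache item above can be sketched as follows, assuming a bounded in-memory window keyed by offset (all names illustrative): tail reads within the window are served locally, while older positions fall back to the remote log.

```go
package main

import "fmt"

// cachedWAL keeps the most recent entries in a bounded in-memory window so
// tailing consumers can be served locally instead of re-reading the remote
// queue. This is an illustrative sketch, not the Milvus implementation.
type cachedWAL struct {
	base   []string       // stands in for the remote log storage
	cache  map[int]string // write-ahead cache, keyed by offset
	window int            // max cached entries
	next   int            // next offset to assign
}

func newCachedWAL(window int) *cachedWAL {
	return &cachedWAL{cache: map[int]string{}, window: window}
}

// Append writes to the "remote" log and to the local cache, evicting the
// entry that just fell out of the window.
func (w *cachedWAL) Append(e string) int {
	off := w.next
	w.next++
	w.base = append(w.base, e) // remote write
	w.cache[off] = e           // local write-ahead cache
	if old := off - w.window; old >= 0 {
		delete(w.cache, old) // evict outside the window
	}
	return off
}

// Read serves from cache when possible, reporting where it read from.
func (w *cachedWAL) Read(off int) (string, string) {
	if e, ok := w.cache[off]; ok {
		return e, "cache"
	}
	return w.base[off], "remote"
}

func main() {
	w := newCachedWAL(2)
	for _, e := range []string{"a", "b", "c"} {
		w.Append(e)
	}
	_, src := w.Read(0) // evicted, falls back to the remote log
	fmt.Println(src)
	_, src = w.Read(2) // still in the window
	fmt.Println(src)
}
```

In the real system the benefit comes from tailing consumers (query and flush paths) almost always reading within the window, so the remote fetch path becomes the exception rather than the rule.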

Use Streaming Service To Produce

  • [x] Proxy DML(insert/delete/upsert/flush) use streaming service to produce. #35406
  • [x] RootCoord DDL(createcollection/dropcollection/createpartition/droppartition) use streaming service to produce. #35406

Use Streaming Service To Consume In Query

  • [ ] Move the general search utilities from querynode into a new module. #37531, #37722
  • [ ] Shard leader uses the streaming service to consume growing data.

Use Streaming Service To Consume In Flush

  • [x] Decouple the dependencies between flush, compaction, and import on datanode https://github.com/milvus-io/milvus/pull/33958 https://github.com/milvus-io/milvus/pull/33992
  • [x] refactor rocksmq and move it to the pkg module https://github.com/milvus-io/milvus/pull/33881
  • [x] add a msgpack and message interface adaptor on the consuming side, for compatibility with the old flusher code. #34874
  • [x] add segment allocation interceptor for streamingnode #34996
  • [x] https://github.com/milvus-io/milvus/pull/34942

Utility

  • [x] Support More Metrics: #36523

Rolling upgrade

Incoming

Limitation

  • Only one pchannel can be assigned to a single stream node, implying that pchannel supports only single point write.

chyezh avatar May 22 '24 11:05 chyezh

What about naming it the streaming service?

xiaofan-luan avatar Jun 18 '24 05:06 xiaofan-luan

Streaming Service Upgrading In Milvus 2.5

Dependency Specification

Version 2.4 relies on the pub/sub capability of MQ for both reading and writing paths to support data persistence and querying of streaming data respectively.

Write path: [diagram]

Read path: [diagram]

TimeTick lifetime: [diagram]

In version 2.5, the pub/sub API is provided by StreamNode, and MQ reading and writing are encapsulated within StreamNode.

Write path: [diagram]

Read path: [diagram]

TimeTick lifetime: [diagram]

The significant change in dependency order between versions 2.4 and 2.5 means that the existing upgrade plan cannot meet the requirements.

Upgrade plan

[Plan 1] Upgrade with downtime

  1. Stop writing on the client side.
  2. Execute flushAll to trigger flushing all data from MQ to disk.
  3. Stop the 2.4 version cluster.
  4. Start the 2.5 version cluster.

[Plan 2] Upgrade with no downtime

  1. Upgrade MixCoord, including RootCoord, QueryCoord, DataCoord, StreamCoord, at this time:

    • In the 2.5 version, RootCoord still needs to execute the TimeTick logic
    • After the upgrade, there are no changes in the read and write paths compared to the 2.4 version
  2. Stop DataNode; the flush process will be terminated, but the Proxy can still accept all requests.

  3. Start StreamNode; each pchannel will be allocated to a stream node and is ready for subscription by the stream node client at this point.

  4. Upgrade QueryNode; the new QueryNode will subscribe to vchannels with the stream node client, while the old QueryNode continues to consume the stream from the MQ client.

  5. Upgrade Proxy, once all proxies are upgraded:

    • Stop sending TT logic on the RootCoord
    • Enable the insertion of data by the stream node client on the Proxy
  6. Stop IndexNode

Pros and cons:

  • With Plan 1 we can remove all deprecated code at once, and even use a local WAL implementation directly instead of rocksmq.
  • Plan 2 will be smoother than Plan 1. However, if the QueryNode rolling upgrade takes a while and a significant number of write requests arrive during this period, the growing segments will consume more memory; consequently, additional memory might be required for the QueryNode.

Version 2.5 serves as a transitional version toward 3.0. We can take Plan 2 to ensure a smooth upgrade from 2.4, leaving the code cleanup for the upgrade to 3.0.

jaime0815 avatar Jul 01 '24 08:07 jaime0815

Making sure the upgrade is smooth is very important.

To simplify the work, maybe we can keep the delegator in QueryNode and not move it to the stream service.

One open question is how many stream nodes need to be deployed, and at what size.

To upgrade smoothly, the streaming node needs to assign timestamps and try to merge data from the TT stream and Proxy inserts.

In 2.5 we can keep the delegator in QueryNode, and move it to the streaming node in 3.0.

xiaofan-luan avatar Jul 01 '24 15:07 xiaofan-luan

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Sep 20 '24 07:09 stale[bot]

/reopen

chyezh avatar Sep 20 '24 08:09 chyezh


The Streaming Service is enabled in the latest Milvus on the master branch, except for the new search/query architecture based on the streaming service.

chyezh avatar Mar 07 '25 06:03 chyezh