
[Bug]: fluvio produce is not batching when using interactive mode and `--delivery-semantic at-least-once`

Open morenol opened this issue 2 years ago • 10 comments

morenol avatar Apr 14 '23 18:04 morenol

Hi @morenol, I'm looking into this with the fluvio CLI. I can see that a batch is created with at-least-once, followed by multiple backoffs:

TRACE send{topic=greetings}: fluvio::producer::accumulator: Batch is full. Creating a new batch for partition partition_id=0
2023-05-16T05:09:09.716210Z DEBUG run: fluvio::producer::partition_producer: new batch event
2023-05-16T05:09:09.732963Z TRACE run:send_receive_with_retry{retries=Take { iter: ExponentialBackoff(ExponentialBackoff { base_millis: 20, current_millis: 20, max_delay: Some(200s) }), n: 4 }}:send_and_receive{self=MultiplexerSocket 10}:send_request{self=fd(10) req=RequestMessage { header: RequestHeader { api_key: 0, api_version: 8, correlation_id: 1, client_id: "FLUVIO_CLI" }, request: ProduceRequest { transactional_id: None, isolation: ReadUncommitted, timeout: 1.5s, topics: [TopicProduceData { name: "greetings", partitions: [PartitionProduceData { partition_index: 0, records: RecordSet { batches: [Batch { base_offset: 0, batch_len: 14, header: BatchHeader { partition_leader_epoch: -1, magic: 2, crc: 0, attributes: 0, last_offset_delta: 0, first_timestamp: 1684213749715, max_time_stamp: 1684213749716, producer_id: -1, producer_epoch: -1, first_sequence: -1 }, records: RawRecords(b"\0\0\0\x01\x12\0\x02\0\0\x06Tan\0") }] } }], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> }], smartmodules: [], data: PhantomData<fluvio_protocol::record::data::RecordSet<fluvio_protocol::record::batch::RawRecords>> } }}: fluvio_protocol::codec: size="encoding data with write size"

I'm not sure I understood the issue correctly. Could you give a more detailed description? Thanks

TanNgocDo avatar May 16 '23 05:05 TanNgocDo

@TanNgocDo from what I remember, when we use AtLeastOnce, the CLI waits for the record's send output after every producer push. So we wait for each record to be successfully produced before adding more records to the batch.

https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio-cli/src/client/produce/mod.rs#L380

So I think this is mostly an issue in the CLI implementation.

morenol avatar May 16 '23 13:05 morenol

Thanks @morenol, this seems to be related to https://github.com/infinyon/fluvio/issues/2808? The throughput of at-least-once is poor compared with at-most-once. Of course, I will need to test more.

TanNgocDo avatar May 16 '23 13:05 TanNgocDo

For sure, it should be related to that

morenol avatar May 16 '23 13:05 morenol

Looking into that 👍

TanNgocDo avatar May 16 '23 14:05 TanNgocDo

@morenol, from my debugging, the wait() method ends up at https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/record.rs#L135, which blocks until it is notified by https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L247

That means wait() can only be notified by flush(): https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L181

And to trigger flush(), linger_sleep must elapse: https://github.com/infinyon/fluvio/blob/0377bae0ae3e1b32bb3e4b823bdfbb1f2e88a5a5/crates/fluvio/src/producer/partition_producer.rs#L143

So whatever value of linger_time we set, we have to wait until the interval ends, which means linger_time is not useful here. To sum up, with the current implementation: wait() -> block -> linger_time ends -> flush the batch with only one record -> get response from socket -> end of wait().

I'm not sure this is the right behavior. With at-least-once, should we only send one record per batch (that is, no batching)? At the very least, linger_time is not useful with the current implementation of at-least-once: the larger the linger, the longer we block before sending out a batch with one record.

So do we need to update the documentation here: https://www.fluvio.io/docs/concepts/delivery-semantics/ ? If we fix it by batching multiple records in at-least-once, should we change "at-least-once delivery means that for each record handed to the producer potentially multiple attempts are made at delivering it, such that at least one succeeds" to "at-least-once delivery means that for each batch ..."? Or should we keep the current implementation and disable linger_time for at-least-once?
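To make the cost of that sequence concrete, here is a rough back-of-envelope model in Rust. It is not fluvio code: `total_blocked_ms` and its parameters are hypothetical names, and it assumes every flush is triggered by the linger timer and costs one network round trip.

```rust
/// Back-of-envelope model (not fluvio code) of the total time spent
/// blocked while producing `records` records.
///
/// Assumptions: every flush is triggered by the linger timer expiring,
/// each flush costs one `round_trip_ms`, and in the deferred case all
/// records are queued before the timer fires and fit in a single batch.
fn total_blocked_ms(
    records: u64,
    linger_ms: u64,
    round_trip_ms: u64,
    wait_per_record: bool,
) -> u64 {
    if records == 0 {
        return 0;
    }
    if wait_per_record {
        // wait() after every send: one linger interval and one round trip
        // per record, because each batch holds exactly one record.
        records * (linger_ms + round_trip_ms)
    } else {
        // wait() only after all sends: one linger interval and one round
        // trip for the whole batch.
        linger_ms + round_trip_ms
    }
}
```

Under these assumptions, 10 records with a 100 ms linger and a 5 ms round trip block for 1050 ms when waiting per record, versus 105 ms when the wait is deferred. A larger linger only makes the per-record case worse.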

TanNgocDo avatar May 19 '23 10:05 TanNgocDo

@TanNgocDo I think the ideal solution is to fix it by batching. Regarding the documentation update, the proposed wording seems accurate to me, though the fact that every record is retried may already imply it. Whether it is done per batch or per record seems to me an implementation detail that the general documentation does not need to cover.

morenol avatar May 19 '23 15:05 morenol

Thanks @morenol. wait() is the blocking point that prevents other records from being added to a batch. Should we do one of these options?

  1. Remove wait(): this would prevent us from getting the future metadata... or
  2. Store a list of outputs and wait() on all of them (similar to reading from a file with -f), or
  3. Some other solution...?

TanNgocDo avatar May 21 '23 13:05 TanNgocDo

I think that option 2 should be ok
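As a minimal sketch of why option 2 restores batching, the toy model below compares waiting after each send with collecting the outputs and waiting at the end. `ToyProducer`, `Handle`, `wait_after_each`, and `wait_after_all` are hypothetical names for illustration, not fluvio APIs, and the linger timer is modelled simply as "waiting on an unsent record flushes the open batch".

```rust
/// Toy stand-in for a batching producer; none of these names are fluvio APIs.
struct ToyProducer {
    open_batch: Vec<String>,        // records accumulated but not yet sent
    sent_batches: Vec<Vec<String>>, // batches already flushed
}

/// Hypothetical handle returned by `send`, playing the role of the
/// per-record output that the CLI waits on. It stores the index of the
/// batch the record will be sent in.
#[derive(Clone, Copy)]
struct Handle(usize);

impl ToyProducer {
    fn new() -> Self {
        ToyProducer { open_batch: Vec::new(), sent_batches: Vec::new() }
    }

    /// Queue a record in the open batch and hand back its handle.
    fn send(&mut self, record: &str) -> Handle {
        self.open_batch.push(record.to_string());
        Handle(self.sent_batches.len())
    }

    /// Block until the record's batch has been sent. In this model, waiting
    /// on a record that is still in the open batch stands for the linger
    /// timer expiring and flushing whatever has accumulated so far.
    fn wait(&mut self, handle: Handle) -> usize {
        if handle.0 == self.sent_batches.len() && !self.open_batch.is_empty() {
            let batch = std::mem::take(&mut self.open_batch);
            self.sent_batches.push(batch);
        }
        handle.0
    }
}

/// Current interactive behavior as described in this thread: wait per record.
fn wait_after_each(records: &[&str]) -> Vec<Vec<String>> {
    let mut producer = ToyProducer::new();
    for record in records {
        let handle = producer.send(record);
        producer.wait(handle);
    }
    producer.sent_batches
}

/// Option 2: keep the outputs in a list and wait on all of them afterwards.
fn wait_after_all(records: &[&str]) -> Vec<Vec<String>> {
    let mut producer = ToyProducer::new();
    let handles: Vec<Handle> = records.iter().map(|r| producer.send(r)).collect();
    for handle in handles {
        producer.wait(handle);
    }
    producer.sent_batches
}
```

For the records `["a", "b", "c"]`, `wait_after_each` produces three single-record batches, while `wait_after_all` produces one batch containing all three. The model ignores batch size limits and real timers, so it only shows the ordering effect.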

morenol avatar May 22 '23 19:05 morenol

I have just added a small draft change for stdin (to make sure I'm heading in the right direction; it still needs refactoring to reduce redundant code). In interactive mode, though, it seems we don't know when the input ends. Maybe use a special key? Or keep a count of the records in the list (adding a flag to specify the number of records)? Or use the batch size to bound the list of records for a batch?
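One possible shape for the interactive case, sketched under assumptions: treat end-of-file (Ctrl-D on a Unix terminal) as the end of input, and additionally wait on the outstanding outputs whenever a cap is reached. `drain_points` and `max_pending` are hypothetical names, not existing fluvio CLI code or flags, and the function only counts records instead of sending them.

```rust
use std::io::BufRead;

/// Read records line by line until EOF and report how many records were
/// pending at each point where the caller would wait on outstanding outputs.
/// `max_pending` is a hypothetical cap, not an existing fluvio CLI flag.
fn drain_points<R: BufRead>(input: R, max_pending: usize) -> std::io::Result<Vec<usize>> {
    let mut pending = 0usize;
    let mut drains = Vec::new();
    for line in input.lines() {
        let _record = line?; // a real loop would send the record here
        pending += 1;
        if pending >= max_pending {
            // Cap reached: wait on everything sent so far, then continue.
            drains.push(pending);
            pending = 0;
        }
    }
    if pending > 0 {
        // EOF: wait on whatever is still outstanding.
        drains.push(pending);
    }
    Ok(drains)
}
```

With five input lines and a cap of 2, the wait points cover 2, 2, and 1 records. A count-based cap alone would delay confirmation for a slow typist, so a real implementation would probably also need a time bound.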

TanNgocDo avatar May 23 '23 11:05 TanNgocDo