kq: 支持 kafka.Writer BatchTimeout/BatchBytes 配置
kq: 支持 kafka.Writer BatchTimeout/BatchBytes 配置
- 新增 WithBatchTimeout 选项,支持配置批量发送超时时间
- 新增 WithBatchBytes 选项,支持配置批量发送字节大小
- 补充完整的单元测试,覆盖单个配置和组合配置场景
涉及文件:
- kq/pusher.go: 添加两个配置选项和对应的 PushOption 函数
- kq/pusher_test.go: 添加对应的单元测试用例
Greptile Overview
Updated On: 2025-11-10 06:45:25 UTC
Greptile Summary
This PR enhances the Kafka queue (kq) package by adding support for two critical Kafka writer batch configuration options: BatchTimeout and BatchBytes. The changes introduce WithBatchTimeout() and WithBatchBytes() configuration functions that allow users to fine-tune Kafka message batching behavior for performance optimization.
The implementation follows the existing functional options pattern used throughout the codebase. Two new fields are added to the pushOptions struct (batchTimeout and batchBytes), and the NewPusher() function applies these configurations to the underlying kafka.Writer with proper zero-value validation. This allows users to control both the maximum time to wait before sending a batch (BatchTimeout) and the maximum batch size in bytes (BatchBytes), which are essential parameters for balancing throughput and latency in Kafka message production.
The changes are well-integrated with the existing architecture and maintain backward compatibility by using zero-value checks before applying the new configurations.
Important Files Changed
| Filename | Score | Overview |
|---|---|---|
| kq/pusher.go | 4/5 | Added BatchTimeout and BatchBytes configuration support with proper validation |
| kq/pusher_test.go | 5/5 | Added comprehensive unit tests for new batch configuration options |
Confidence score: 4/5
- This PR is safe to merge with low risk of production issues
- Score reflects solid implementation following established patterns, but minor concerns about validation logic and potential edge cases with zero/negative values
- Pay close attention to the validation logic in pusher.go to ensure proper handling of edge cases
Sequence Diagram
sequenceDiagram
participant User
participant Pusher
participant KafkaWriter
participant ChunkExecutor
User->>+Pusher: NewPusher(addrs, topic, WithBatchTimeout(), WithBatchBytes())
Pusher->>Pusher: "Create kafka.Writer with addresses and topic"
Pusher->>Pusher: "Apply batch timeout and batch bytes options"
Pusher->>+ChunkExecutor: "Create ChunkExecutor (if not sync mode)"
ChunkExecutor-->>-Pusher: "Return executor"
Pusher-->>-User: "Return Pusher instance"
User->>+Pusher: Push(ctx, value)
Pusher->>Pusher: "Generate timestamp key"
Pusher->>+Pusher: PushWithKey(ctx, key, value)
Pusher->>Pusher: "Create kafka.Message with key and value"
Pusher->>Pusher: "Inject trace context into message"
alt executor exists (async mode)
Pusher->>+ChunkExecutor: Add(message, messageSize)
ChunkExecutor-->>-Pusher: "Return error or nil"
else sync mode
Pusher->>+KafkaWriter: WriteMessages(ctx, message)
KafkaWriter-->>-Pusher: "Return error or nil"
end
Pusher-->>-Pusher: "Return result"
Pusher-->>-User: "Return error or nil"
Note over ChunkExecutor, KafkaWriter: "ChunkExecutor batches messages and flushes to KafkaWriter"
ChunkExecutor->>+KafkaWriter: WriteMessages(ctx, batchedMessages)
KafkaWriter-->>-ChunkExecutor: "Return error or nil"
User->>+Pusher: Close()
alt executor exists
Pusher->>+ChunkExecutor: Flush()
ChunkExecutor-->>-Pusher: "Complete pending writes"
end
Pusher->>+KafkaWriter: Close()
KafkaWriter-->>-Pusher: "Return error or nil"
Pusher-->>-User: "Return error or nil"