
logthrdest: support autoscaling partitions

Open · MrAnno opened this pull request 1 month ago · 3 comments

When worker-partition-key() is used to group messages into separate batches, the messages are, by default, hashed into workers. This prevents them from being distributed efficiently across workers based on load.
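
To make the default behaviour concrete, here is a minimal Python sketch of hash-based partitioning, assuming a plain `hash(key) % workers` scheme. The function name `hash_worker` and the choice of CRC32 are illustrative assumptions, not axosyslog's implementation. The sketch shows that a single high-traffic key always lands on one worker, no matter how many other workers sit idle.

```python
import zlib


def hash_worker(partition_key: str, num_workers: int) -> int:
    """Map a partition key to a fixed worker index (hypothetical hash choice)."""
    # CRC32 is only an illustration; the hash axosyslog actually uses
    # is not specified in this thread.
    return zlib.crc32(partition_key.encode("utf-8")) % num_workers


# Hypothetical per-$PROGRAM message counts: one dominant partition.
loads = {"nginx": 1_000_000, "sshd": 50, "cron": 10}

# Every message of a given key goes to the same worker, regardless of load.
per_worker = [0] * 10
for program, count in loads.items():
    per_worker[hash_worker(program, 10)] += count
```

CRC32 is used instead of Python's built-in `hash()` because string hashing in Python is salted per process, so the key-to-worker mapping would change between runs.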

The new worker-partition-autoscaling(yes) option uses a 1-minute traffic statistic to spread high-traffic partitions across multiple workers, allowing each worker to maximize its batch size.

When using this option, it is recommended to oversize the worker count: set workers() higher than the expected number of partitions.
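
Why oversizing matters can be illustrated with a hypothetical allocator. Assume each partition first receives one worker, and the spare workers are then handed out one at a time to the partition with the highest traffic per assigned worker, using message counts from the 1-minute window. `allocate_workers` and this greedy strategy are assumptions made for illustration; the pull request's actual algorithm may differ.

```python
import heapq


def allocate_workers(counts: dict[str, int], num_workers: int) -> dict[str, int]:
    """Hypothetical: decide how many workers each partition may use.

    counts: messages seen per partition during the last 1-minute window (assumed).
    """
    if not counts:
        return {}
    if num_workers < len(counts):
        raise ValueError("oversize workers(): need at least one worker per partition")

    # Every partition starts with one worker.
    alloc = {key: 1 for key in counts}

    # Max-heap (negated for heapq) keyed on load per already-assigned worker.
    heap = [(-count, key) for key, count in counts.items()]
    heapq.heapify(heap)

    # Give each spare worker to the partition that is currently busiest per worker.
    for _ in range(num_workers - len(counts)):
        _, key = heapq.heappop(heap)
        alloc[key] += 1
        heapq.heappush(heap, (-counts[key] / alloc[key], key))
    return alloc
```

With 10 workers and one dominant partition, the spare workers all go to that partition: `allocate_workers({"nginx": 900, "sshd": 60, "cron": 40}, 10)` returns `{'nginx': 8, 'sshd': 1, 'cron': 1}`. With no more workers than partitions there is nothing left to redistribute, which is why workers() should exceed the expected number of partitions.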


Open question: should the partitions table have an upper limit, with a fallback to hashing when the option is misconfigured?
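
One possible shape for that fallback, sketched in Python under stated assumptions: cap the number of tracked partitions and, once the cap is reached (for example, because the partition key has unexpectedly high cardinality), route untracked keys by plain hashing. `PartitionTable`, `max_partitions`, and the round-robin placement of new keys are all hypothetical.

```python
import zlib


class PartitionTable:
    """Hypothetical bounded table of tracked partitions with a hashing fallback."""

    def __init__(self, num_workers: int, max_partitions: int):
        self.num_workers = num_workers
        self.max_partitions = max_partitions
        self.assignment: dict[str, int] = {}
        self.overflowed = False  # set once any key had to fall back to hashing

    def worker_for(self, key: str) -> int:
        # Already-tracked partitions keep their assigned worker.
        if key in self.assignment:
            return self.assignment[key]
        if len(self.assignment) >= self.max_partitions:
            # Table full: stop tracking new keys and hash them instead.
            self.overflowed = True
            return zlib.crc32(key.encode("utf-8")) % self.num_workers
        # New key while there is room: place it round-robin (placeholder policy).
        worker = len(self.assignment) % self.num_workers
        self.assignment[key] = worker
        return worker
```

A real implementation would presumably also expire idle partitions so the table does not stay full forever; that is left out here.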

MrAnno, Nov 24 '25 14:11

This Pull Request introduces config grammar changes

axoflow/65c732858e01325a29dec2576cba7c5a408421ee -> MrAnno/partition-stats

--- a/destination
+++ b/destination

 axosyslog-otlp(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 bigquery(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 clickhouse(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 google-pubsub-grpc(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 http(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 kafka-c(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 loki(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 mongodb(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 opentelemetry(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 redis(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

 syslog-ng-otlp(
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
 )

github-actions[bot], Nov 24 '25 14:11

http(
      url("http://localhost:8080")
      method("POST")
      batch-lines(10000)
      batch-timeout(2000)
      workers(10)
      worker-partition-key("$PROGRAM")
      flush-on-worker-key-change(yes)
      worker-partition-autoscaling(yes)
);

syslogng_output_batch_size_events_bucket{le="2"} 32
syslogng_output_batch_size_events_bucket{le="8"} 3
syslogng_output_batch_size_events_bucket{le="16"} 3
syslogng_output_batch_size_events_bucket{le="32"} 1
syslogng_output_batch_size_events_bucket{le="256"} 3
syslogng_output_batch_size_events_bucket{le="512"} 2
syslogng_output_batch_size_events_bucket{le="2048"} 1
syslogng_output_batch_size_events_bucket{le="4096"} 35
syslogng_output_batch_size_events_bucket{le="8192"} 494
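
A quick way to summarize the batch-size histogram above. The values are not cumulative (they do not grow with `le`, unlike a standard Prometheus histogram), so this sketch assumes each line is the count for that bucket alone; the metric lines are copied verbatim from the output above.

```python
import re

HISTOGRAM = """\
syslogng_output_batch_size_events_bucket{le="2"} 32
syslogng_output_batch_size_events_bucket{le="8"} 3
syslogng_output_batch_size_events_bucket{le="16"} 3
syslogng_output_batch_size_events_bucket{le="32"} 1
syslogng_output_batch_size_events_bucket{le="256"} 3
syslogng_output_batch_size_events_bucket{le="512"} 2
syslogng_output_batch_size_events_bucket{le="2048"} 1
syslogng_output_batch_size_events_bucket{le="4096"} 35
syslogng_output_batch_size_events_bucket{le="8192"} 494
"""

# Capture the bucket upper bound (le) and the value from each line.
LINE_RE = re.compile(r'le="(\d+)"\}\s+(\d+)')

# Assumption: each value is a per-bucket count, not a cumulative one.
buckets = {int(le): int(n) for le, n in LINE_RE.findall(HISTOGRAM)}
total = sum(buckets.values())
large = buckets[4096] + buckets[8192]  # batches with more than 2048 events
print(f"{large}/{total} batches in the two largest buckets ({large / total:.1%})")
```

On these numbers it prints `529/574 batches in the two largest buckets (92.2%)`, so under the per-bucket assumption the large majority of batches carried more than 2048 events.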

syslogng_memory_queue_processed_events_total{worker="7"} 52375
syslogng_memory_queue_processed_events_total{worker="8"} 52384
syslogng_memory_queue_processed_events_total{worker="9"} 52385
syslogng_memory_queue_processed_events_total{worker="0"} 52390
syslogng_memory_queue_processed_events_total{worker="1"} 52378
syslogng_memory_queue_processed_events_total{worker="2"} 52507
syslogng_memory_queue_processed_events_total{worker="3"} 1197297
syslogng_memory_queue_processed_events_total{worker="4"} 1254968
syslogng_memory_queue_processed_events_total{worker="5"} 1207284
syslogng_memory_queue_processed_events_total{worker="6"} 1159332
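
To quantify the imbalance in the counters above, this sketch computes each worker's share of processed events; the input is copied from the metrics above and nothing else is assumed. In this run, workers 3 to 6 each processed over 1.1 million events while the other six processed about 52,000 each, so four workers handled roughly 94% of the traffic (presumably the high-traffic partitions that autoscaling spread across several workers).

```python
import re

METRICS = """\
syslogng_memory_queue_processed_events_total{worker="7"} 52375
syslogng_memory_queue_processed_events_total{worker="8"} 52384
syslogng_memory_queue_processed_events_total{worker="9"} 52385
syslogng_memory_queue_processed_events_total{worker="0"} 52390
syslogng_memory_queue_processed_events_total{worker="1"} 52378
syslogng_memory_queue_processed_events_total{worker="2"} 52507
syslogng_memory_queue_processed_events_total{worker="3"} 1197297
syslogng_memory_queue_processed_events_total{worker="4"} 1254968
syslogng_memory_queue_processed_events_total{worker="5"} 1207284
syslogng_memory_queue_processed_events_total{worker="6"} 1159332
"""

# Capture the worker label and the counter value from each line.
LINE_RE = re.compile(r'worker="(\d+)"\}\s+(\d+)')

per_worker = {int(w): int(n) for w, n in LINE_RE.findall(METRICS)}
total = sum(per_worker.values())
for worker, count in sorted(per_worker.items()):
    print(f"worker {worker}: {count:>9,} events ({count / total:5.1%})")

# Combined share of the four busiest workers.
busy = sum(per_worker[w] for w in (3, 4, 5, 6))
print(f"workers 3-6 share: {busy / total:.1%}")
```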

MrAnno, Nov 26 '25 23:11

My valgrind setup is broken, so I'll ask someone to help me check for memory issues.

MrAnno, Nov 27 '25 09:11