Indexer error consuming from Kafka
Describe the bug
I started 10 indexer nodes to consume Kafka data and got the following errors:
2022-05-27T06:28:14.854Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=418922}:{index=clickhouse gen=319}:{actor=Packager}: quickwit_actors::sync_actor: actor-exit actor_id=Packager-nameless-H4KW exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="KafkaSource-long-9JuJ" exit_status=DownstreamClosed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Indexer-red-PtaJ" exit_status=Failure(Failed to add document.
Caused by:
An error occurred in a thread: 'An index writer was killed.. A worker thread encounterred an error (io::Error most likely) or panicked.')
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Packager-nameless-H4KW" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Uploader-blue-HXWt" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Publisher-icy-tc56" exit_status=Failure(Failed to publish splits.
Caused by:
0: Publish checkpoint delta overlaps with the current checkpoint: IncompatibleCheckpointDelta { partition_id: PartitionId("0000000000"), current_position: Offset("00000000025977904298"), delta_position_from: Offset("00000000025977865375") }.
1: IncompatibleChkptDelta at partition: PartitionId("0000000000") cur_pos:Offset("00000000025977904298") delta_pos:Offset("00000000025977865375"))
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="GarbageCollector-snowy-TxZ8" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="MergeSplitDownloader-weathered-FyOq" exit_status=Killed
Configuration:
- quickwit 0.2.1
- quickwit.yaml
version: 0
node_id: $POD_NAME
listen_address: 0.0.0.0
rest_listen_port: 7280
#peer_seeds:
# -
# -
#data_dir: /data/quickwit
metastore_uri: postgres://quickwit:[email protected]:5432/quickwit
default_index_root_uri: s3://quickwit/indexes/
- index.json
version: 0
index_id: clickhouse
index_uri: s3://quickwit/indexes/clickhouse
doc_mapping:
  field_mappings:
    - name: id
      type: u64
      fast: true
    - name: created_at
      type: i64
      fast: true
    - name: _log_
      type: text
      tokenizer: default
      record: position
indexing_settings:
  timestamp_field: created_at
search_settings:
  default_search_fields: [_log_]
sources:
  - source_id: quickwit
    source_type: kafka
    params:
      topic: production
      client_params:
        group.id: quickwit
        bootstrap.servers: 192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9082
Hi @gnufree,
I started 10 indexer nodes
Unfortunately, as of today, Quickwit does not support multiple indexers working in parallel. You have to limit yourself to only one indexer. How much data are you trying to ingest?
We will support multiple indexers via manual static partition assignment in Quickwit 0.4. Fully automated distributed indexing will come after Quickwit 0.5.
@guilload OK. Can an indexer consume records with the same ID?
@guilload I received the following errors:
2022-05-28T01:02:44.067Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24262}: quickwit_actors::actor_handle: actor-timeout actor="GarbageCollector-dawn-0yJO"
2022-05-28T01:02:44.067Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24262}: quickwit_indexing::actors::indexing_pipeline: indexing pipeline error. index=clickhouse gen=4 healthy=["KafkaSource-purple-VDk3", "Indexer-lively-P1yT", "Packager-old-OvHo", "Uploader-floral-sw22", "Publisher-red-zlYV", "MergePlanner-purple-Lcpd", "MergeSplitDownloader-misty-tgpF", "MergeExecutor-nameless-VDnv", "MergePackager-misty-Q4HC", "MergeUploader-aged-Xx8m", "MergePublisher-lively-puGE"] failure_or_unhealthy_actors=["GarbageCollector-dawn-0yJO"] success=[]
2022-05-28T01:05:04.553Z WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24268}:{index=clickhouse gen=4}:{actor=Indexer}:{msg_id=525}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
@gnufree Concerning the error: currently, a field marked as "fast" must be present in the document. If it is not, Quickwit will not index the document and will log the error RequiredFastField("id").
Can an indexer consume records with the same ID?
Yes, no problem with that.
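As an illustration of that rule, here is a minimal sketch based on the id mapping from this thread's index config (the inline example documents are hypothetical):

field_mappings:
  - name: id
    type: u64
    fast: true           # fast fields must be present in every document
  - name: _log_
    type: text
    tokenizer: default

# A document such as {"id": 42, "_log_": "..."} is indexed normally.
# A document such as {"_log_": "..."} with no id key is skipped, and the
# indexer logs err=RequiredFastField("id"), as in the warnings above.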
My id field is already set as a fast field, but it feels like the indexer cannot keep up with my Kafka data. What could be the reason?
My workload generates 30 million records per minute.
@guilload @fmassot
@guilload It now looks like the indexer cannot consume that much data, which results in data loss. What parameters should I adjust to keep up with the ingestion rate?
Also, I can't see the indexer's consumption progress in Kafka. Why is that?
I didn't benchmark Kafka ingestion, only ingestion from the local file system, so take the following with a grain of salt in case the main bottleneck comes from the Kafka integration.
30 million documents per minute is a lot of data. It depends on the actual size of the documents and which fields are indexed, but for log-like data I get closer to 20 million per minute on my machine (default settings, ingest command).
There are settings to assign more resources for faster indexing (resources.num_threads):
https://quickwit.io/docs/main-branch/configuration/index-config#indexing-settings
I would recommend multiplying resources.heap_size by resources.num_threads. The speed increase will not scale 1:1: with the ingest command I get 120 Mb/s instead of 80 Mb/s.
indexing_settings:
  resources:
    num_threads: 8
    heap_size: 16G
With a high enough number of threads, indexing itself is no longer the bottleneck; some other part of the indexing pipeline is, e.g. the JSON parser.
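For clarity, a minimal sketch of where these settings sit in a full index config, assuming the 0.2/0.3 layout used elsewhere in this thread and a per-thread heap of roughly 2G:

indexing_settings:
  timestamp_field: created_at
  resources:
    num_threads: 8     # number of indexing threads
    heap_size: 16G     # ~2G per thread x 8 threads, per the advice above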
@PSeitz I added these two parameters, but the effect is not obvious; the consumption rate still cannot keep up.
Also, I can't see the indexer's consumption progress in Kafka. Why is that?
In the logs, you should see the indexer publish splits regularly with the associated checkpoint deltas. What do you observe?
Are splits produced at all? What is the output of the commands quickwit index describe --index clickhouse and aws s3 ls s3://quickwit/indexes/clickhouse?
Are you on our Discord server by any chance? Ping us directly so we can troubleshoot your issue directly.
This is what I observed in the logs:
2022-06-01T01:16:35.376Z WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:35.376Z WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:35.376Z WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:42.880Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Packager}:{msg_id=375293 num_splits=1}: quickwit_indexing::actors::packager: create-packaged-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.885Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: start-stage-and-store-splits split_ids=["01G4ED9BBKT9VDCV6J73FYBS83"]
2022-06-01T01:16:42.885Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: staging-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.886Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: storing-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.886Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-split-remote-start
2022-06-01T01:16:47.470Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-split-remote-success split_size_in_megabytes=289 elapsed_secs=4.5834837 throughput_mb_s=63.05248 is_mature=false
2022-06-01T01:16:47.470Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-in-cache
2022-06-01T01:16:47.470Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: success-stage-and-store-splits
2022-06-01T01:16:47.470Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Publisher}:{msg_id=744185}: quickwit_indexing::actors::publisher: new-split-start
2022-06-01T01:16:47.472Z INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Publisher}:{msg_id=744185}: quickwit_metastore::checkpoint: delta=∆(0000000000:(00000000031106902869..00000000031107008375] 0000000001:(00000000031111582419..00000000031129299110] 0000000002:(00000000031120824832..00000000031120858956] 0000000003:(00000000031115034958..00000000031115097287] 0000000004:(00000000031119733961..00000000031119738598] 0000000005:(00000000031104566660..00000000031104658488] 0000000006:(00000000031103048644..00000000031103082569] 0000000007:(00000000031111247186..00000000031111278847] 0000000008:(00000000031106344715..00000000031106411265] 0000000010:(00000000031085478391..00000000031085563334] 0000000011:(00000000031103821028..00000000031103846833] 0000000012:(00000000031103857864..00000000031103894198] 0000000013:(00000000031123071896..00000000031123146196] 0000000015:(00000000031090058417..00000000031090156395] 0000000016:(00000000031101406666..00000000031101441846] 0000000017:(00000000031122896164..00000000031122926365] 0000000018:(00000000031100790813..00000000031100867681]) checkpoint=Ckpt(0000000000:00000000031106902869 0000000001:00000000031111582419 0000000002:00000000031120824832 0000000003:00000000031115034958 0000000004:00000000031119733961 0000000005:00000000031104566660 0000000006:00000000031103048644 0000000007:00000000031111247186 0000000008:00000000031106344715 0000000009:00000000031113992074 0000000010:00000000031085478391 0000000011:00000000031103821028 0000000012:00000000031103857864 0000000013:00000000031123071896 0000000014:00000000031111503844 0000000015:00000000031090058417 0000000016:00000000031101406666 0000000017:00000000031122896164 0000000018:00000000031100790813 0000000019:00000000031092082267)
This is my command output.
2022-06-01T01:20:22.648Z INFO quickwit: version="0.2.1" commit="a857636"
2022-06-01T01:20:22.650Z INFO quickwit_config::config: Setting data dir path from CLI args or environment variable data_dir_path=/quickwit/qwdata
2022-06-01T01:20:22.650Z WARN quickwit_config::config: Seed list is empty.
2022-06-01T01:20:22.650Z INFO quickwit_cli: Loaded Quickwit config. config_uri=file:///quickwit/config/quickwit.yaml config=QuickwitConfig { version: 0, node_id: "indexex-1", listen_address: "0.0.0.0", rest_listen_port: 7280, peer_seeds: [], data_dir_path: "/quickwit/qwdata", metastore_uri: "postgres://quickwit:[email protected]:5432/quickwit", default_index_root_uri: "s3://quickwit/indexes/", indexer_config: IndexerConfig { split_store_max_num_bytes: Byte(100000000000), split_store_max_num_splits: 1000 }, searcher_config: SearcherConfig { fast_field_cache_capacity: Byte(1000000000), split_footer_cache_capacity: Byte(500000000), max_num_concurrent_split_streams: 100 }, storage_config: None }
2022-06-01T01:20:22.693Z INFO quickwit_metastore::metastore::postgresql_metastore: The connection pool works fine
2022-06-01T01:20:22.695Z INFO quickwit_metastore::metastore::postgresql_metastore: Database migrations succeeded migrations_log=""
1. General infos
===============================================================================
Index id: clickhouse
Index uri: s3://quickwit/indexes/clickhouse
Number of published splits: 134
Number of published documents: 2184678636
Size of published splits: 641491 MB
Timestamp field: created_at
Timestamp range: Some(1653885754000) -> Some(1654044706000)
2. Statistics on splits
===============================================================================
Document count stats:
Mean ± σ in [min … max]: 16303572 ± 2660112.8 in [665168 … 19522433]
Quantiles [1%, 25%, 50%, 75%, 99%]: [755297.2, 16461736, 16461736, 17323376, 17323376]
Size in MB stats:
Mean ± σ in [min … max]: 4787.246 ± 769.2989 in [205 … 5698]
Quantiles [1%, 25%, 50%, 75%, 99%]: [233.98, 4888.5, 4888.5, 5008.25, 5008.25]
Everything looks good except for the RequiredFastField("id") errors. I suspect some docs are missing the id field. As for indexing throughput, it may be that the volume of incoming data is too high for a single indexer. Can you try Quickwit 0.3? The JSON parser is more performant in that version. I doubt it'll be sufficient, but it's worth trying.
@gnufree it seems like your documents are pretty small, can you share an extract of the documents you are indexing?
Also, can you share your index config too?
@fmassot This is my index configuration:
version: 0
index_id: clickhouse
index_uri: s3://quickwit/indexes/clickhouse
doc_mapping:
  field_mappings:
    - name: id
      type: u64
      fast: true
    - name: created_at
      type: i64
      fast: true
    - name: _cluster_
      type: text
      tokenizer: raw
    - name: _namespace_
      type: text
      tokenizer: raw
    - name: _container_name_
      type: text
      tokenizer: raw
    - name: _pod_name_
      type: text
      tokenizer: raw
    - name: _log_
      type: text
      tokenizer: default
      record: position
indexing_settings:
  timestamp_field: created_at
  resources:
    num_threads: 8
    heap_size: 16G
search_settings:
  default_search_fields: [_log_]
sources:
  - source_id: quickwit-indexer
    source_type: kafka
    params:
      topic: production
      client_params:
        group.id: quickwit-indexer
        bootstrap.servers: 192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9082
@guilload Does 0.3 not allow creating field names that start with _?
Command failed: Failed to parse YAML index config file.
Caused by:
doc_mapping.field_mappings: Field name `_cluster_` may not start by _ at line 9 column 5
@gnufree Yes, field names starting with _ are forbidden: https://quickwit.io/docs/configuration/index-config#field-name-validation-rules
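For example, the config above could be adapted by dropping the underscores; the replacement names below are only suggestions, and the JSON keys in the documents (and default_search_fields) would have to be renamed to match:

doc_mapping:
  field_mappings:
    - name: cluster          # was _cluster_
      type: text
      tokenizer: raw
    - name: namespace        # was _namespace_
      type: text
      tokenizer: raw
    - name: container_name   # was _container_name_
      type: text
      tokenizer: raw
    - name: pod_name         # was _pod_name_
      type: text
      tokenizer: raw
    - name: log              # was _log_
      type: text
      tokenizer: default
      record: position
search_settings:
  default_search_fields: [log]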
I recommend against using:
num_threads: 8
heap_size: 16G
If you are in the cloud, make sure to use a node with a local SSD. With EBS, the network will rapidly become your limiting factor.
We may have different workarounds to increase your indexing throughput if you are ingesting from Kafka. I'm happy to discuss those if you are OK with having a quick call.