Indexer Kafka consumption error

Open gnufree opened this issue 3 years ago • 19 comments

Describe the bug

I started 10 indexer nodes to consume Kafka data and got the following errors:

2022-05-27T06:28:14.854Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=418922}:{index=clickhouse gen=319}:{actor=Packager}: quickwit_actors::sync_actor: actor-exit actor_id=Packager-nameless-H4KW exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="KafkaSource-long-9JuJ" exit_status=DownstreamClosed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Indexer-red-PtaJ" exit_status=Failure(Failed to add document.

Caused by:
    An error occurred in a thread: 'An index writer was killed.. A worker thread encounterred an error (io::Error most likely) or panicked.')
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Packager-nameless-H4KW" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Uploader-blue-HXWt" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="Publisher-icy-tc56" exit_status=Failure(Failed to publish splits.

Caused by:
    0: Publish checkpoint delta overlaps with the current checkpoint: IncompatibleCheckpointDelta { partition_id: PartitionId("0000000000"), current_position: Offset("00000000025977904298"), delta_position_from: Offset("00000000025977865375") }.
    1: IncompatibleChkptDelta at partition: PartitionId("0000000000") cur_pos:Offset("00000000025977904298") delta_pos:Offset("00000000025977865375"))
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="GarbageCollector-snowy-TxZ8" exit_status=Killed
2022-05-27T06:28:15.217Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=419295}: quickwit_actors::actor_handle: actor-exit-without-success actor="MergeSplitDownloader-weathered-FyOq" exit_status=Killed

Configuration:

  1. quickwit 0.2.1
  2. quickwit.yaml
version: 0
node_id: $POD_NAME
listen_address: 0.0.0.0
rest_listen_port: 7280
#peer_seeds:
#  -
#  -
#data_dir: /data/quickwit
metastore_uri: postgres://quickwit:[email protected]:5432/quickwit
default_index_root_uri: s3://quickwit/indexes/
  3. index.json
version: 0

index_id: clickhouse

index_uri: s3://quickwit/indexes/clickhouse

doc_mapping:
  field_mappings:
    - name: id
      type: u64
      fast: true
    - name: created_at
      type: i64
      fast: true
    - name: _log_
      type: text
      tokenizer: default
      record: position

indexing_settings:
  timestamp_field: created_at

search_settings:
  default_search_fields: [_log_]

sources:
  - source_id: quickwit
    source_type: kafka
    params:
      topic: production
      client_params:
        group.id: quickwit
        bootstrap.servers: 192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9082

gnufree avatar May 27 '22 06:05 gnufree

Hi @gnufree,

I started 10 indexer nodes

Unfortunately, as of today, Quickwit does not support multiple indexers working in parallel. You have to limit yourself to only one indexer. How much data are you trying to ingest?

We will support multiple indexers via manual static partition assignment in Quickwit 0.4. Full automated distributed indexing will come post Quickwit 0.5.
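
To see why running several indexers against the same source trips the Publisher, here is a minimal sketch in Python of the checkpoint rule being violated (this is not Quickwit's actual code, just an illustration; the offsets come from the IncompatibleCheckpointDelta error above, and the delta's end offset is made up):

# Simplified model of a per-partition source checkpoint: a publish is
# rejected when the incoming delta starts before the offset already
# recorded in the metastore, i.e. the two ranges overlap.
current_position = 25977904298                    # offset stored for partition "0000000000"
delta_from, delta_to = 25977865375, 25977910000   # range a second indexer tries to publish (end offset is illustrative)

def try_publish(current: int, delta_from: int, delta_to: int) -> int:
    if delta_from < current:
        raise ValueError(
            f"IncompatibleCheckpointDelta: current_position={current}, "
            f"delta_position_from={delta_from}"
        )
    return delta_to  # the checkpoint only advances on a non-overlapping delta

try:
    current_position = try_publish(current_position, delta_from, delta_to)
except ValueError as err:
    print(err)  # this is the overlap the Publisher reports above

With several indexers sharing one consumer group and one metastore checkpoint, partition reassignments mean two pipelines can end up publishing overlapping ranges for the same partition, and the later one is rejected.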

guilload avatar May 27 '22 11:05 guilload

@guilload Ok, can an indexer consume records with the same ID?

gnufree avatar May 28 '22 01:05 gnufree

@guilload I received the following error reports:

2022-05-28T01:02:44.067Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24262}: quickwit_actors::actor_handle: actor-timeout actor="GarbageCollector-dawn-0yJO"
2022-05-28T01:02:44.067Z ERROR {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24262}: quickwit_indexing::actors::indexing_pipeline: indexing pipeline error. index=clickhouse gen=4 healthy=["KafkaSource-purple-VDk3", "Indexer-lively-P1yT", "Packager-old-OvHo", "Uploader-floral-sw22", "Publisher-red-zlYV", "MergePlanner-purple-Lcpd", "MergeSplitDownloader-misty-tgpF", "MergeExecutor-nameless-VDnv", "MergePackager-misty-Q4HC", "MergeUploader-aged-Xx8m", "MergePublisher-lively-puGE"] failure_or_unhealthy_actors=["GarbageCollector-dawn-0yJO"] success=[]

2022-05-28T01:05:04.553Z WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=24268}:{index=clickhouse gen=4}:{actor=Indexer}:{msg_id=525}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")

gnufree avatar May 28 '22 01:05 gnufree

@gnufree Concerning the error, currently a field marked as "fast" must be present in the document. If not, quickwit will not index the document and log this error RequiredFastField("id").
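
For example, with the field mappings above, the first document below would be indexed while the second (no id key) would be dropped and logged as RequiredFastField("id") — the values are made up:

{"id": 42, "created_at": 1653630000000, "_log_": "connection refused to upstream"}
{"created_at": 1653630001000, "_log_": "connection refused to upstream"}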

fmassot avatar May 28 '22 22:05 fmassot

Can an indexer consume records with the same ID?

Yes, no problem with that.

fmassot avatar May 28 '22 22:05 fmassot

@gnufree Concerning the error, currently a field marked as "fast" must be present in the document. If not, quickwit will not index the document and log this error RequiredFastField("id").

My id field is already set as a fast field, but it seems the indexer cannot keep up with my Kafka data. What could be the reason?

My workload generates about 30 million records per minute.

gnufree avatar May 30 '22 01:05 gnufree

@guilload @fmassot

gnufree avatar May 30 '22 01:05 gnufree

@guilload It now looks like the indexer cannot consume this much data, which results in data loss. What parameters should I adjust to keep up with my consumption needs?

gnufree avatar May 30 '22 01:05 gnufree

What's more, I can't see the indexer's consumption progress in Kafka. What's the reason?

gnufree avatar May 30 '22 02:05 gnufree

I didn't benchmark with Kafka, only ingestion from the local filesystem, so take the following with a grain of salt in case the main bottleneck comes from the Kafka integration.

30 million per minute is a lot of data. It depends on the actual size of the documents and which fields are indexed, but log-like data is closer to 20 million per minute on my machine (default settings, ingest command).

There are settings to assign more resources for faster indexing (resources.num_threads): https://quickwit.io/docs/main-branch/configuration/index-config#indexing-settings

I would recommend multiplying resources.heap_size by resources.num_threads.

The speed increase will not scale 1:1. With the ingest command I get 120 MB/s instead of 80 MB/s.

indexing_settings:
  resources:
    num_threads: 8
    heap_size: 16G

With a high enough number of threads, indexing will no longer be the bottleneck; some other part of the indexing pipeline will be, e.g. the JSON parser.

PSeitz avatar May 30 '22 03:05 PSeitz

I didn't benchmark with Kafka, only ingestion from the local filesystem, so take the following with a grain of salt in case the main bottleneck comes from the Kafka integration.

30 million per minute is a lot of data. It depends on the actual size of the documents and which fields are indexed, but log-like data is closer to 20 million per minute on my machine (default settings, ingest command).

There are settings to assign more resources for faster indexing (resources.num_threads): https://quickwit.io/docs/main-branch/configuration/index-config#indexing-settings

I would recommend multiplying resources.heap_size by resources.num_threads.

The speed increase will not scale 1:1. With the ingest command I get 120 MB/s instead of 80 MB/s.

indexing_settings:
  resources:
    num_threads: 8
    heap_size: 16G

With a high enough number of threads, indexing will no longer be the bottleneck; some other part of the indexing pipeline will be, e.g. the JSON parser.

@PSeitz I added these two parameters, but the effect is not significant, and consumption still cannot keep up.

gnufree avatar May 30 '22 06:05 gnufree

What's more, I can't see the indexer's consumption progress in Kafka. What's the reason?

In the logs, you should see the indexer publish splits regularly with the associated checkpoint deltas. What do you observe?

Are splits produced at all? What is the output of the commands quickwit index describe --index clickhouse and aws s3 ls s3://quickwit/indexes/clickhouse?

Are you on our Discord server by any chance? Ping us there so we can troubleshoot your issue directly.

guilload avatar May 30 '22 20:05 guilload

What's more, I can't see the indexer's consumption progress in Kafka. What's the reason?

In the logs, you should see the indexer publish splits regularly with the associated checkpoint deltas. What do you observe?

Are splits produced at all? What is the output of the commands quickwit index describe --index clickhouse and aws s3 ls s3://quickwit/indexes/clickhouse?

Are you on our Discord server by any chance? Ping us there so we can troubleshoot your issue directly.

This is what I observed in the logs:

2022-06-01T01:16:35.376Z  WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:35.376Z  WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:35.376Z  WARN {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Indexer}:{msg_id=344322}: quickwit_indexing::actors::indexer: err=RequiredFastField("id")
2022-06-01T01:16:42.880Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Packager}:{msg_id=375293 num_splits=1}: quickwit_indexing::actors::packager: create-packaged-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.885Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: start-stage-and-store-splits split_ids=["01G4ED9BBKT9VDCV6J73FYBS83"]
2022-06-01T01:16:42.885Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: staging-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.886Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: storing-split split_id="01G4ED9BBKT9VDCV6J73FYBS83"
2022-06-01T01:16:42.886Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-split-remote-start
2022-06-01T01:16:47.470Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-split-remote-success split_size_in_megabytes=289 elapsed_secs=4.5834837 throughput_mb_s=63.05248 is_mature=false
2022-06-01T01:16:47.470Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::split_store::indexing_split_store: store-in-cache
2022-06-01T01:16:47.470Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Uploader}:{msg_id=791761 num_splits=1}: quickwit_indexing::actors::uploader: success-stage-and-store-splits
2022-06-01T01:16:47.470Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Publisher}:{msg_id=744185}: quickwit_indexing::actors::publisher: new-split-start
2022-06-01T01:16:47.472Z  INFO {actor=quickwit_indexing::actors::indexing_server::IndexingServer}:{msg_id=1}::{msg_id=1572}:{index=clickhouse gen=1}:{actor=Publisher}:{msg_id=744185}: quickwit_metastore::checkpoint: delta=∆(0000000000:(00000000031106902869..00000000031107008375] 0000000001:(00000000031111582419..00000000031129299110] 0000000002:(00000000031120824832..00000000031120858956] 0000000003:(00000000031115034958..00000000031115097287] 0000000004:(00000000031119733961..00000000031119738598] 0000000005:(00000000031104566660..00000000031104658488] 0000000006:(00000000031103048644..00000000031103082569] 0000000007:(00000000031111247186..00000000031111278847] 0000000008:(00000000031106344715..00000000031106411265] 0000000010:(00000000031085478391..00000000031085563334] 0000000011:(00000000031103821028..00000000031103846833] 0000000012:(00000000031103857864..00000000031103894198] 0000000013:(00000000031123071896..00000000031123146196] 0000000015:(00000000031090058417..00000000031090156395] 0000000016:(00000000031101406666..00000000031101441846] 0000000017:(00000000031122896164..00000000031122926365] 0000000018:(00000000031100790813..00000000031100867681]) checkpoint=Ckpt(0000000000:00000000031106902869 0000000001:00000000031111582419 0000000002:00000000031120824832 0000000003:00000000031115034958 0000000004:00000000031119733961 0000000005:00000000031104566660 0000000006:00000000031103048644 0000000007:00000000031111247186 0000000008:00000000031106344715 0000000009:00000000031113992074 0000000010:00000000031085478391 0000000011:00000000031103821028 0000000012:00000000031103857864 0000000013:00000000031123071896 0000000014:00000000031111503844 0000000015:00000000031090058417 0000000016:00000000031101406666 0000000017:00000000031122896164 0000000018:00000000031100790813 0000000019:00000000031092082267)

This is my command output.

2022-06-01T01:20:22.648Z  INFO quickwit: version="0.2.1" commit="a857636"
2022-06-01T01:20:22.650Z  INFO quickwit_config::config: Setting data dir path from CLI args or environment variable data_dir_path=/quickwit/qwdata
2022-06-01T01:20:22.650Z  WARN quickwit_config::config: Seed list is empty.
2022-06-01T01:20:22.650Z  INFO quickwit_cli: Loaded Quickwit config. config_uri=file:///quickwit/config/quickwit.yaml config=QuickwitConfig { version: 0, node_id: "indexex-1", listen_address: "0.0.0.0", rest_listen_port: 7280, peer_seeds: [], data_dir_path: "/quickwit/qwdata", metastore_uri: "postgres://quickwit:[email protected]:5432/quickwit", default_index_root_uri: "s3://quickwit/indexes/", indexer_config: IndexerConfig { split_store_max_num_bytes: Byte(100000000000), split_store_max_num_splits: 1000 }, searcher_config: SearcherConfig { fast_field_cache_capacity: Byte(1000000000), split_footer_cache_capacity: Byte(500000000), max_num_concurrent_split_streams: 100 }, storage_config: None }
2022-06-01T01:20:22.693Z  INFO quickwit_metastore::metastore::postgresql_metastore: The connection pool works fine
2022-06-01T01:20:22.695Z  INFO quickwit_metastore::metastore::postgresql_metastore: Database migrations succeeded migrations_log=""

1. General infos
===============================================================================
Index id:                           clickhouse
Index uri:                          s3://quickwit/indexes/clickhouse
Number of published splits:         134
Number of published documents:      2184678636
Size of published splits:           641491 MB
Timestamp field:                    created_at
Timestamp range:                    Some(1653885754000) -> Some(1654044706000)

2. Statistics on splits
===============================================================================
Document count stats:
Mean ± σ in [min … max]:            16303572 ± 2660112.8 in [665168 … 19522433]
Quantiles [1%, 25%, 50%, 75%, 99%]: [755297.2, 16461736, 16461736, 17323376, 17323376]

Size in MB stats:
Mean ± σ in [min … max]:            4787.246 ± 769.2989 in [205 … 5698]
Quantiles [1%, 25%, 50%, 75%, 99%]: [233.98, 4888.5, 4888.5, 5008.25, 5008.25]

gnufree avatar Jun 01 '22 01:06 gnufree

Everything looks good except for the RequiredFastField("id") errors. I suspect some docs are missing the id field. As far as indexing throughput, it may be the case that the volume of incoming data is too high for a single indexer. Can you try Quickwit 0.3? The JSON parser is more performant in that version. I doubt it'll be sufficient, but it's worth trying.
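
A rough back-of-the-envelope check, using the describe output above and treating stored split size as a proxy for raw document size (which is only an approximation):

# Figures from the "quickwit index describe" output above.
published_docs = 2_184_678_636
published_size_mb = 641_491

bytes_per_doc = published_size_mb * 1_000_000 / published_docs      # ~294 bytes per document
required_mb_per_s = 30_000_000 * bytes_per_doc / 60 / 1_000_000     # ~147 MB/s of incoming data

print(f"~{bytes_per_doc:.0f} bytes/doc, ~{required_mb_per_s:.0f} MB/s required")

That is above the ~120 MB/s a single node reached with 8 indexing threads earlier in this thread, which is consistent with one indexer not keeping up.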

guilload avatar Jun 01 '22 15:06 guilload

@gnufree It seems like your documents are pretty small. Can you share an extract of the documents you are indexing?

Also, can you share your index config too?

fmassot avatar Jun 01 '22 23:06 fmassot

It seems like your documents are pretty small. Can you share an extract of the documents you are indexing?

Also, can you share your index config too?

@fmassot This is my index configuration:

version: 0

index_id: clickhouse

index_uri: s3://quickwit/indexes/clickhouse

doc_mapping:
  field_mappings:
    - name: id
      type: u64
      fast: true
    - name: created_at
      type: i64
      fast: true
    - name: _cluster_
      type: text
      tokenizer: raw
    - name: _namespace_
      type: text
      tokenizer: raw
    - name: _container_name_
      type: text
      tokenizer: raw
    - name: _pod_name_
      type: text
      tokenizer: raw
    - name: _log_
      type: text
      tokenizer: default
      record: position

indexing_settings:
  timestamp_field: created_at
  resources:
    num_threads: 8
    heap_size: 16G
search_settings:
  default_search_fields: [_log_]

sources:
  - source_id: quickwit-indexer
    source_type: kafka
    params:
      topic: production
      client_params:
        group.id: quickwit-indexer
        bootstrap.servers: 192.168.100.1:9092,192.168.100.2:9092,192.168.100.3:9082

gnufree avatar Jun 08 '22 01:06 gnufree

Everything looks good except for the RequiredFastField("id") errors. I suspect some docs are missing the id field. As far as indexing throughput, it may be the case that the volume of incoming data is too high for a single indexer. Can you try Quickwit 0.3? The JSON parser is more performant in that version. I doubt it'll be sufficient, but it's worth trying.

@guilload Does 0.3 not allow creating fields that start with _?

Command failed: Failed to parse YAML index config file.

Caused by:
    doc_mapping.field_mappings: Field name `_cluster_` may not start by _ at line 9 column 5

gnufree avatar Jun 08 '22 02:06 gnufree

@gnufree Yes, fields starting with _ are forbidden. https://quickwit.io/docs/configuration/index-config#field-name-validation-rules
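
For illustration, the offending mappings could be renamed to drop the leading underscore, assuming the producer can emit the renamed keys (or a transform is applied before ingestion):

doc_mapping:
  field_mappings:
    - name: cluster           # was _cluster_
      type: text
      tokenizer: raw
    - name: namespace         # was _namespace_
      type: text
      tokenizer: raw
    - name: log               # was _log_
      type: text
      tokenizer: default
      record: position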

fulmicoton avatar Jun 09 '22 02:06 fulmicoton

I recommend against using

  num_threads: 8
  heap_size: 16G

If you are in the cloud, make sure to use a node with a local SSD. With EBS, the network will rapidly become your limiting factor.
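
For example, data_dir can be pointed at a local SSD mount in quickwit.yaml (the path below is hypothetical):

data_dir: /mnt/local-ssd/quickwit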

We may have different workarounds to increase your indexing throughput if you are ingesting from Kafka. I'm happy to discuss those if you are OK to have a quick call.

fulmicoton avatar Jun 09 '22 02:06 fulmicoton