Pod repeatedly fails due to "nats: consumer not found" error

Open juliev0 opened this issue 9 months ago • 16 comments

Describe the bug

This is after looking through all of the artifacts during this CI failure.

Scenario: new isbsvc test-isbservice-rollout-1 created with new pipeline test-pipeline-rollout-2.

The sink vertex pod named "out" continually restarts. We have the log from the first failure but not the others, unfortunately; presumably they are the same:

{"level":"info","ts":"2025-03-04T15:35:05.204127832Z","logger":"numaflow.Sink-processor","caller":"commands/processor.go:48","msg":"Starting vertex data processor","version":"Version: v1.4.3-rc3+3b92155, BuildDate: 2025-02-24T00:39:14Z, GitCommit: 3b921554682af1b663f451aefe8ceb106bffebc8, GitTag: , GitTreeState: clean, GoVersion: go1.23.4, Compiler: gc, Platform: linux/amd64"}
Error: failed to get consumer info, nats: consumer not found
Usage:
  numaflow processor [flags]

Flags:
  -h, --help                 help for processor
      --isbsvc-type string   ISB Service type, e.g. jetstream
      --type string          Processor type, 'source', 'sink' or 'udf'

panic: failed to get consumer info, nats: consumer not found

goroutine 1 [running]:
github.com/numaproj/numaflow/cmd/commands.Execute(...)
	/Users/jwang21/workspace/numaproj/numaflow/cmd/commands/root.go:33
main.main()
	/Users/jwang21/workspace/numaproj/numaflow/cmd/main.go:24 +0x3c

Pod restarted 6 times and failed every time.

Prior to this, the Job Pod failed and then succeeded. Unfortunately, we don't have the log from the successful run, only the failed one:

{"level":"info","ts":"2025-03-04T15:34:32.556800234Z","logger":"numaflow.isbsvc-create","caller":"isbsvc/jetstream_service.go:89","msg":"Succeeded to create a side inputs KV","pipeline":"test-pipeline-rollout-2","kvName":"numaplane-system-test-pipeline-rollout-2_SIDE_INPUTS"}
{"level":"info","ts":"2025-03-04T15:34:33.061041705Z","logger":"numaflow.isbsvc-create","caller":"isbsvc/jetstream_service.go:161","msg":"Succeeded to create a stream","pipeline":"test-pipeline-rollout-2","stream":"numaplane-system-test-pipeline-rollout-2-cat-0"}
{"level":"info","ts":"2025-03-04T15:34:33.516849285Z","logger":"numaflow.isbsvc-create","caller":"isbsvc/jetstream_service.go:172","msg":"Succeeded to create a consumer for a stream","pipeline":"test-pipeline-rollout-2","stream":"numaplane-system-test-pipeline-rollout-2-cat-0","consumer":"numaplane-system-test-pipeline-rollout-2-cat-0"}
{"level":"error","ts":"2025-03-04T15:34:38.526356783Z","logger":"numaflow.isbsvc-create","caller":"commands/isbsvc_create.go:93","msg":"Failed to create buffers, buckets and side inputs store.","pipeline":"test-pipeline-rollout-2","error":"failed to create stream \"numaplane-system-test-pipeline-rollout-2-out-0\" and buffers, context deadline exceeded","stacktrace":"github.com/numaproj/numaflow/cmd/commands.NewISBSvcCreateCommand.func1\n\t/Users/jwang21/workspace/numaproj/numaflow/cmd/commands/isbsvc_create.go:93\ngithub.com/spf13/cobra.(*Command).execute\n\t/Users/jwang21/go/pkg/mod/github.com/spf13/[email protected]/command.go:985\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/jwang21/go/pkg/mod/github.com/spf13/[email protected]/command.go:1117\ngithub.com/spf13/cobra.(*Command).Execute\n\t/Users/jwang21/go/pkg/mod/github.com/spf13/[email protected]/command.go:1041\ngithub.com/numaproj/numaflow/cmd/commands.Execute\n\t/Users/jwang21/workspace/numaproj/numaflow/cmd/commands/root.go:32\nmain.main\n\t/Users/jwang21/workspace/numaproj/numaflow/cmd/main.go:24\nruntime.main\n\t/usr/local/Cellar/go/1.23.4/libexec/src/runtime/proc.go:272"}
{"level":"error","ts":"2025-03-04T15:34:38.638514817Z","logger":"numaflow.isbsvc-create","caller":"nats/nats_client.go:69","msg":"Nats default: disconnected","pipeline":"test-pipeline-rollout-2","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/Users/jwang21/workspace/numaproj/numaflow/pkg/shared/clients/nats/nats_client.go:69\ngithub.com/nats-io/nats%2ego.(*Conn).close.func1\n\t/Users/jwang21/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:5332\ngithub.com/nats-io/nats%2ego.(*asyncCallbacksHandler).asyncCBDispatcher\n\t/Users/jwang21/go/pkg/mod/github.com/nats-io/[email protected]/nats.go:3011"}
{"level":"info","ts":"2025-03-04T15:34:38.656961721Z","logger":"numaflow.isbsvc-create","caller":"nats/nats_client.go:63","msg":"Nats default: connection closed","pipeline":"test-pipeline-rollout-2"}
Error: failed to create stream "numaplane-system-test-pipeline-rollout-2-out-0" and buffers, context deadline exceeded
Usage:
  numaflow isbsvc-create [flags]

Flags:
      --buckets strings                  Buckets to create
      --buffers strings                  Buffers to create
  -h, --help                             help for isbsvc-create
      --isbsvc-type string               ISB Service type, e.g. jetstream
      --serving-source-streams strings   Serving source streams to create
      --side-inputs-store string         Name of the side inputs store

panic: failed to create stream "numaplane-system-test-pipeline-rollout-2-out-0" and buffers, context deadline exceeded

goroutine 1 [running]:
github.com/numaproj/numaflow/cmd/commands.Execute(...)
	/Users/jwang21/workspace/numaproj/numaflow/cmd/commands/root.go:33
main.main()
	/Users/jwang21/workspace/numaproj/numaflow/cmd/main.go:24 +0x3c

I am attaching all artifacts and logs that we have:

pod-logs-progressive-functional (7).zip resource-changes-progressive-functional (5).zip

The timeline is this:

2025-03-04T15:33:43.781272599Z pipeline created

2025-03-04T15:34:29.771129989Z Create Job starts running

2025-03-04T15:34:49: Create Job Pod restarts after failure and succeeds

2025-03-04T15:35:05.204127832Z test-pipeline-rollout-2 out-0 runs

2025-03-04T15:35:08 test-pipeline-rollout-2 out-0 panics

2025-03-04T15:38:04Z test-pipeline-rollout-2 out-0 has now restarted 5 times

To Reproduce

Steps to reproduce the behavior:

This may not be easily reproducible. This CI test usually passes.


Message from the maintainers:

Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.

For quick help and support, join our slack channel.

juliev0 avatar Mar 05 '25 00:03 juliev0

I know this has only happened once and may be hard to reproduce. I'm okay if we don't look into it yet, but I wanted to create a record of it in case it happens again.

juliev0 avatar Mar 05 '25 00:03 juliev0

I've just seen the same issue happen to a vertex in one of our pipelines; recreating the pod did not resolve it. Eventually removing and recreating the whole pipeline resolved it. If this happens again, is there any information I should gather to help investigate this before recreating the pipeline?

saty9 avatar Jun 04 '25 09:06 saty9

@saty9, you are running the ISB (jetstream) with persistent storage and replicas = 3, right?

vigith avatar Jun 04 '25 15:06 vigith

config:
    jetstream:
      url: nats://isbsvc-default-js-svc.preview.svc:4222
      auth:
        ********
      streamConfig: |
        consumer:
          ackwait: 60s
          maxackpending: 25000
        otbucket:
          history: 1
          maxbytes: 0
          maxvaluesize: 0
          replicas: 3
          storage: 0
          ttl: 3h
        procbucket:
          history: 1
          maxbytes: 0
          maxvaluesize: 0
          replicas: 3
          storage: 0
          ttl: 72h
        stream:
          duplicates: 60s
          maxage: 72h
          maxbytes: -1
          maxmsgs: 100000
          replicas: 3
          retention: 0
          storage: 0
  • [x] jetstream
  • [x] 3 replicas
  • [ ] persistent storage... I don't think we set that

I think we use the default config from the chart, but I grabbed this from the web UI.

Should we configure persistence for the isbsvc?

saty9 avatar Jun 04 '25 15:06 saty9

Should we configure persistence for the isbsvc?

I think the default should have it. Do you mind sharing your isb config?

vigith avatar Jun 04 '25 15:06 vigith

apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: default
spec:
  jetstream:
    version: 2.10.17 
    persistence:
      volumeSize: 3Gi

We are currently on numaflow v1.5.0-rc5

saty9 avatar Jun 05 '25 09:06 saty9

I was having the same issue as @saty9, and now it just flip-flops between that and this error in the logs:

2025-06-05T11:11:33.868581Z  INFO numaflow_core::pipeline: Starting map forwarder
2025-06-05T11:11:33.878303Z  INFO async_nats: event: connected
2025-06-05T11:11:43.869830Z ERROR numaflow_core: Error running pipeline e=Watermark("Failed to get kv bucket preview-preview-api-pr-9605-metrics-m-parts-api-m-ppm-srv-m-ppm_OT: failed to get the bucket: request error: timed out: request timed out: deadline has elapsed")
2025-06-05T11:11:43.869899Z  INFO async_nats: event: closed
2025-06-05T11:11:43.870112Z  INFO numaflow_core: Gracefully Exiting...

Same setup: a default-settings JetStream isbsvc.

Koalk avatar Jun 05 '25 11:06 Koalk

Possibly related was this log from the isbsvc: [7] 2025/06/05 12:54:33.993698 [WRN] JetStream request queue has high pending count: 18062. We are seeing a lot of these, with the number getting larger. Is it possible we have hit some limit on how many streams JetStream can handle, and after that it can't cope with all the requests from consumers and Numaflow trying to make more streams?

saty9 avatar Jun 05 '25 13:06 saty9

@saty9

  • could you please share your pipeline spec?
  • what is the TPS and payload size?
  • how is the resource usage on ISB, is it pegging on CPU or Memory?

vigith avatar Jun 05 '25 15:06 vigith

  • TPS?
  • payload size when entering the pipeline is < 300 bytes, but I can't see a metric exposed for what the payloads are inside Numaflow; I would expect them to be bigger, and one section in particular has >1MB messages
  • CPU usage for individual isbsvc pods has been peaking at 6 cores
  • memory usage has been peaking at 15GB (I couldn't see a clear way to set resource requests and limits for the service?)

Will make a sanitized pipeline in a few minutes.

saty9 avatar Jun 05 '25 15:06 saty9

TPS = Transactions per second.

Writing bigger payloads (>5KB) to JetStream could bring some inconsistencies to the Raft layer of JetStream. Is it possible to compress the payload before sending it to the next vertex?

I am working on https://github.com/numaproj/numaflow/pull/2704, but this is for the entire pipeline and might not make sense if the incoming payloads are smaller.

vigith avatar Jun 05 '25 15:06 vigith

The current message rate of our whole NATS server (we use JetStream for our sources) is less than 2 messages/second (averaged across a minute). That includes inputs and outputs to the pipelines, so I doubt the rate of messages entering the pipeline is the issue. Is there a metric created by Numaflow for TPS internal to the pipeline? We might be able to avoid sending such a large message, but we were hoping to be able to avoid that work until later in the process.

saty9 avatar Jun 05 '25 15:06 saty9

I am surprised to see the [7] 2025/06/05 12:54:33.993698 [WRN] JetStream request queue has high pending count: 18062 warning if the TPS is so low. Also, why is your ISB at 6 cores and 15G for such a low TPS?

Use the latest Numaflow 1.5.0-rc release; we have metrics: https://numaflow.numaproj.io/operations/metrics/metrics/

vigith avatar Jun 05 '25 16:06 vigith

Is rc5 not the latest?

Yeah, that message was getting into the millions. I think something is thrashing the isbsvc trying to create streams and consumers, and that is what's causing the massive memory and CPU usage. We are creating quite a lot of pipelines, since we create multiple for each PR, but I still wouldn't expect this behaviour.

saty9 avatar Jun 05 '25 16:06 saty9

Are you sharing ISBs? If yes, try to have different ISBs for different pipelines.
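
As a rough sketch of that approach (names here are hypothetical; the interStepBufferServiceName field is how a Pipeline selects its ISB, but please verify against your Numaflow version), a dedicated ISB per pipeline could look like:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: isbsvc-pr-1234          # hypothetical: one ISB per PR
spec:
  jetstream:
    version: 2.10.17
    persistence:
      volumeSize: 3Gi
---
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline-pr-1234     # hypothetical pipeline name
spec:
  interStepBufferServiceName: isbsvc-pr-1234   # point this pipeline at its own ISB
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
    - name: out
      sink:
        log: {}
  edges:
    - from: in
      to: out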

vigith avatar Jun 05 '25 16:06 vigith

If you could provide some guidance on limiting CPU and memory of the isbsvc, we could try making a separate service for each PR.
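
For reference, a minimal sketch of how that could look, assuming the ISBSvc CRD's containerTemplate field is available in this Numaflow version (the resource values are placeholders, not recommendations):

apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: default
spec:
  jetstream:
    version: 2.10.17
    persistence:
      volumeSize: 3Gi
    containerTemplate:          # applies to the main JetStream container
      resources:
        requests:
          cpu: "1"
          memory: 4Gi
        limits:
          cpu: "2"
          memory: 8Gi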

saty9 avatar Jun 05 '25 16:06 saty9