
Async preserver blocking shutdowns (http_client input with streaming enabled)

Open: Jeffail opened this issue 1 year ago • 1 comment

Log dump of the blocked pipeline:

INFO Main config updated, attempting to update pipeline.  @service=benthos
DEBU Waiting for pending acks to resolve before shutting down.  @service=benthos label="" path=root.input
DEBU Pending acks resolved.                        @service=benthos label="" path=root.input
ERRO Encountered error whilst attempting to shut down gracefully: context deadline exceeded  @service=benthos
INFO Some components prevented forced termination as they were either blocked from delivering data or from acknowledging delivered data within the shutdown timeout. This could potentially cause duplicate messages to be delivered on the next run.  @service=benthos
DEBU goroutine profile: total 11
1 @ 0x40f069 0x46dd09 0xb2b113 0x471701
#	0x46dd08	os/signal.signal_recv+0x28	/usr/local/go/src/runtime/sigqueue.go:152
#	0xb2b112	os/signal.loop+0x12		/usr/local/go/src/os/signal/signal_unix.go:23

1 @ 0x433291 0x46b79d 0x9ce771 0x9ce5a5 0x9cb046 0xa7c6ce 0xb3d067 0xb3ca3d 0xad35aa 0xadd3b1 0x471701
#	0x46b79c	runtime/pprof.runtime_goroutineProfileWithLabels+0x1c						/usr/local/go/src/runtime/mprof.go:844
#	0x9ce770	runtime/pprof.writeRuntimeProfile+0xb0								/usr/local/go/src/runtime/pprof/pprof.go:734
#	0x9ce5a4	runtime/pprof.writeGoroutine+0x44								/usr/local/go/src/runtime/pprof/pprof.go:694
#	0x9cb045	runtime/pprof.(*Profile).WriteTo+0x145								/usr/local/go/src/runtime/pprof/pprof.go:329
#	0xa7c6cd	github.com/benthosdev/benthos/v4/internal/stream.(*Type).Stop+0x30d				/go/src/github.com/benthosdev/benthos/internal/stream/type.go:266
#	0xb3d066	github.com/benthosdev/benthos/v4/internal/cli/common.(*SwappableStopper).Replace+0xe6		/go/src/github.com/benthosdev/benthos/internal/cli/common/swappable.go:85
#	0xb3ca3c	github.com/benthosdev/benthos/v4/internal/cli/common.initNormalMode.func2+0xbc			/go/src/github.com/benthosdev/benthos/internal/cli/common/service.go:205
#	0xad35a9	github.com/benthosdev/benthos/v4/internal/config.(*Reader).TriggerMainUpdate+0xd29		/go/src/github.com/benthosdev/benthos/internal/config/reader.go:385
#	0xadd3b0	github.com/benthosdev/benthos/v4/internal/config.(*Reader).BeginFileWatching.func3+0x7d0	/go/src/github.com/benthosdev/benthos/internal/config/watcher.go:176

1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4ef05a 0x4ef048 0x4fa9b2 0x4fa9aa 0xace692 0x471701
#	0x46bbc4	internal/poll.runtime_pollWait+0x84			/usr/local/go/src/runtime/netpoll.go:343
#	0x4edd66	internal/poll.(*pollDesc).wait+0x26			/usr/local/go/src/internal/poll/fd_poll_runtime.go:84
#	0x4ef059	internal/poll.(*pollDesc).waitRead+0x279		/usr/local/go/src/internal/poll/fd_poll_runtime.go:89
#	0x4ef047	internal/poll.(*FD).Read+0x267				/usr/local/go/src/internal/poll/fd_unix.go:164
#	0x4fa9b1	os.(*File).read+0x51					/usr/local/go/src/os/file_posix.go:29
#	0x4fa9a9	os.(*File).Read+0x49					/usr/local/go/src/os/file.go:118
#	0xace691	github.com/fsnotify/fsnotify.(*Watcher).readEvents+0xd1	/go/pkg/mod/github.com/fsnotify/[email protected]/backend_inotify.go:483

1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4ef05a 0x4ef048 0x6b0885 0x6c2165 0x7166db 0x524138 0x7168be 0x713e90 0x71a158 0x71a161 0x756d97 0x4e5ef0 0x79b0a5 0x79b074 0x79b7e5 0x7be6bf 0x7bdc45 0x471701
#	0x46bbc4	internal/poll.runtime_pollWait+0x84		/usr/local/go/src/runtime/netpoll.go:343
#	0x4edd66	internal/poll.(*pollDesc).wait+0x26		/usr/local/go/src/internal/poll/fd_poll_runtime.go:84
#	0x4ef059	internal/poll.(*pollDesc).waitRead+0x279	/usr/local/go/src/internal/poll/fd_poll_runtime.go:89
#	0x4ef047	internal/poll.(*FD).Read+0x267			/usr/local/go/src/internal/poll/fd_unix.go:164
#	0x6b0884	net.(*netFD).Read+0x24				/usr/local/go/src/net/fd_posix.go:55
#	0x6c2164	net.(*conn).Read+0x44				/usr/local/go/src/net/net.go:179
#	0x7166da	crypto/tls.(*atLeastReader).Read+0x3a		/usr/local/go/src/crypto/tls/conn.go:805
#	0x524137	bytes.(*Buffer).ReadFrom+0x97			/usr/local/go/src/bytes/buffer.go:211
#	0x7168bd	crypto/tls.(*Conn).readFromUntil+0xdd		/usr/local/go/src/crypto/tls/conn.go:827
#	0x713e8f	crypto/tls.(*Conn).readRecordOrCCS+0x24f	/usr/local/go/src/crypto/tls/conn.go:625
#	0x71a157	crypto/tls.(*Conn).readRecord+0x157		/usr/local/go/src/crypto/tls/conn.go:587
#	0x71a160	crypto/tls.(*Conn).Read+0x160			/usr/local/go/src/crypto/tls/conn.go:1369
#	0x756d96	bufio.(*Reader).Read+0x196			/usr/local/go/src/bufio/bufio.go:244
#	0x4e5eef	io.ReadAtLeast+0x8f				/usr/local/go/src/io/io.go:335
#	0x79b0a4	io.ReadFull+0x64				/usr/local/go/src/io/io.go:354
#	0x79b073	net/http.http2readFrameHeader+0x33		/usr/local/go/src/net/http/h2_bundle.go:1635
#	0x79b7e4	net/http.(*http2Framer).ReadFrame+0x84		/usr/local/go/src/net/http/h2_bundle.go:1899
#	0x7be6be	net/http.(*http2clientConnReadLoop).run+0x11e	/usr/local/go/src/net/http/h2_bundle.go:9339
#	0x7bdc44	net/http.(*http2ClientConn).readLoop+0x64	/usr/local/go/src/net/http/h2_bundle.go:9234

1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4f324c 0x4f323a 0x6b28a9 0x6cce3e 0x6cbcd0 0x7dbb64 0x7db7d1 0xaca668 0xb3a1ed 0x471701
#	0x46bbc4	internal/poll.runtime_pollWait+0x84							/usr/local/go/src/runtime/netpoll.go:343
#	0x4edd66	internal/poll.(*pollDesc).wait+0x26							/usr/local/go/src/internal/poll/fd_poll_runtime.go:84
#	0x4f324b	internal/poll.(*pollDesc).waitRead+0x2ab						/usr/local/go/src/internal/poll/fd_poll_runtime.go:89
#	0x4f3239	internal/poll.(*FD).Accept+0x299							/usr/local/go/src/internal/poll/fd_unix.go:611
#	0x6b28a8	net.(*netFD).accept+0x28								/usr/local/go/src/net/fd_unix.go:172
#	0x6cce3d	net.(*TCPListener).accept+0x1d								/usr/local/go/src/net/tcpsock_posix.go:152
#	0x6cbccf	net.(*TCPListener).Accept+0x2f								/usr/local/go/src/net/tcpsock.go:315
#	0x7dbb63	net/http.(*Server).Serve+0x363								/usr/local/go/src/net/http/server.go:3056
#	0x7db7d0	net/http.(*Server).ListenAndServe+0x70							/usr/local/go/src/net/http/server.go:2985
#	0xaca667	github.com/benthosdev/benthos/v4/internal/api.(*Type).ListenAndServe+0xe7		/go/src/github.com/benthosdev/benthos/internal/api/api.go:289
#	0xb3a1ec	github.com/benthosdev/benthos/v4/internal/cli/common.newStoppableManager.func1+0x2c	/go/src/github.com/benthosdev/benthos/internal/cli/common/manager.go:228

1 @ 0x43e42e 0x44e8e5 0x1c90239 0x471701
#	0x1c90238	github.com/golang/glog.(*fileSink).flushDaemon+0xb8	/go/pkg/mod/github.com/golang/[email protected]/glog_file.go:351

1 @ 0x43e42e 0x44e8e5 0x201437f 0x471701
#	0x201437e	go.opencensus.io/stats/view.(*worker).start+0x9e	/go/pkg/mod/[email protected]/stats/view/worker.go:292

1 @ 0x43e42e 0x44e8e5 0x7b9f27 0x7b9418 0x471701
#	0x7b9f26	net/http.(*http2clientStream).writeRequest+0xac6	/usr/local/go/src/net/http/h2_bundle.go:8549
#	0x7b9417	net/http.(*http2clientStream).doRequest+0x17		/usr/local/go/src/net/http/h2_bundle.go:8411

1 @ 0x43e42e 0x44e8e5 0xb39825 0xb3b469 0xb6b7eb 0x5aadf8 0x5a7777 0xb6af65 0x3b829c5 0x43dfbb 0x471701
#	0xb39824	github.com/benthosdev/benthos/v4/internal/cli/common.RunManagerUntilStopped+0x324	/go/src/github.com/benthosdev/benthos/internal/cli/common/manager.go:198
#	0xb3b468	github.com/benthosdev/benthos/v4/internal/cli/common.RunService+0x828			/go/src/github.com/benthosdev/benthos/internal/cli/common/service.go:81
#	0xb6b7ea	github.com/benthosdev/benthos/v4/internal/cli.App.func2+0x1ca				/go/src/github.com/benthosdev/benthos/internal/cli/run.go:183
#	0x5aadf7	github.com/urfave/cli/v2.(*Command).Run+0x997						/go/pkg/mod/github.com/urfave/cli/[email protected]/command.go:274
#	0x5a7776	github.com/urfave/cli/v2.(*App).RunContext+0x5b6					/go/pkg/mod/github.com/urfave/cli/[email protected]/app.go:332
#	0xb6af64	github.com/benthosdev/benthos/v4/internal/cli.Run+0x44					/go/src/github.com/benthosdev/benthos/internal/cli/run.go:283
#	0x3b829c4	main.main+0x24										/go/src/github.com/benthosdev/benthos/cmd/benthos/main.go:13
#	0x43dfba	runtime.main+0x2ba									/usr/local/go/src/runtime/proc.go:267

1 @ 0x43e42e 0x44f918 0x44f8ef 0x46d665 0x47ce7d 0x227fb08 0x227faf4 0xa63e6c 0xa65753 0xa64b73 0x471701
#	0x46d664	sync.runtime_SemacquireMutex+0x24								/usr/local/go/src/runtime/sema.go:77
#	0x47ce7c	sync.(*Mutex).lockSlow+0x15c									/usr/local/go/src/sync/mutex.go:171
#	0x227fb07	sync.(*Mutex).Lock+0x67										/usr/local/go/src/sync/mutex.go:90
#	0x227faf3	github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).Close+0x53			/go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:315
#	0xa63e6b	github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncPreserver).Close+0x4b		/go/src/github.com/benthosdev/benthos/internal/component/input/async_preserver.go:121
#	0xa65752	github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncReader).loop.func1+0x32	/go/src/github.com/benthosdev/benthos/internal/component/input/async_reader.go:98
#	0xa64b72	github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncReader).loop+0x932		/go/src/github.com/benthosdev/benthos/internal/component/input/async_reader.go:172

1 @ 0x43e42e 0x46d919 0x46d8f9 0x47b125 0x7a365f 0x7c0405 0x75979b 0xfce999 0x227e8fb 0x227e7c5 0xa63a0f 0xa68489 0x471701
#	0x46d8f8	sync.runtime_notifyListWait+0x138							/usr/local/go/src/runtime/sema.go:527
#	0x47b124	sync.(*Cond).Wait+0x84									/usr/local/go/src/sync/cond.go:70
#	0x7a365e	net/http.(*http2pipe).Read+0xde								/usr/local/go/src/net/http/h2_bundle.go:3765
#	0x7c0404	net/http.http2transportResponseBody.Read+0x64						/usr/local/go/src/net/http/h2_bundle.go:9622
#	0x75979a	bufio.(*Scanner).Scan+0x81a								/usr/local/go/src/bufio/scan.go:214
#	0xfce998	github.com/benthosdev/benthos/v4/internal/codec.(*linesReader).Next+0x78		/go/src/github.com/benthosdev/benthos/internal/codec/reader.go:578
#	0x227e8fa	github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).readStreamed+0xfa	/go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:239
#	0x227e7c4	github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).ReadBatch+0x24	/go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:226
#	0xa63a0e	github.com/benthosdev/benthos/v4/internal/component/input.NewAsyncPreserver.func1+0x2e	/go/src/github.com/benthosdev/benthos/internal/component/input/async_preserver.go:35
#	0xa68488	github.com/benthosdev/benthos/v4/internal/autoretry.(*List[...]).dispatchReader+0x48	/go/src/github.com/benthosdev/benthos/internal/autoretry/auto_retry_list.go:119
  @service=benthos
INFO Updated main config                           @service=benthos

And the config looks something like this:

input:
  http_client:
    url: <meow>
    stream:
      enabled: true
output:
  stdout: {}
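
Going by the last two goroutines in the dump, `httpClientInput.Close` is parked on a mutex while `readStreamed` sits in a blocking read on the HTTP/2 response body. One plausible reading is that the read path holds that mutex for the duration of the read, so `Close` can never get in on an idle stream. The sketch below reproduces that pattern in isolation. Everything in it is hypothetical: `streamInput`, `CloseLocked`, `CloseUnblocking`, and `demo` are illustrative names rather than Benthos code, and an `io.Pipe` with no writer stands in for the idle HTTP stream.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"sync"
	"time"
)

// streamInput is a hypothetical stand-in for a streaming input.
// It is NOT the Benthos httpClientInput.
type streamInput struct {
	mu   sync.Mutex
	body io.ReadCloser
	scan *bufio.Scanner
}

func newStreamInput(body io.ReadCloser) *streamInput {
	return &streamInput{body: body, scan: bufio.NewScanner(body)}
}

// ReadLine holds mu for the whole blocking read (the assumed problem pattern).
func (s *streamInput) ReadLine() (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.scan.Scan() {
		return s.scan.Text(), nil
	}
	if err := s.scan.Err(); err != nil {
		return "", err
	}
	return "", io.EOF
}

// CloseLocked takes mu first, so it waits for as long as ReadLine is blocked.
func (s *streamInput) CloseLocked() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.body.Close()
}

// CloseUnblocking closes the body without taking mu. The pending read then
// fails, ReadLine returns, and the mutex is released.
func (s *streamInput) CloseUnblocking() error {
	return s.body.Close()
}

// finishesWithin reports whether fn returns before d elapses.
func finishesWithin(d time.Duration, fn func() error) bool {
	done := make(chan struct{})
	go func() {
		_ = fn()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo starts a blocked reader and reports whether closeFn returns in time.
func demo(closeFn func(*streamInput) error) bool {
	pr, _ := io.Pipe() // nothing is ever written, so reads block like an idle stream
	in := newStreamInput(pr)
	go in.ReadLine()
	time.Sleep(50 * time.Millisecond) // let the reader acquire mu and block
	return finishesWithin(200*time.Millisecond, func() error { return closeFn(in) })
}

func main() {
	fmt.Println("locked close finished:", demo((*streamInput).CloseLocked))
	fmt.Println("unblocking close finished:", demo((*streamInput).CloseUnblocking))
}
```

In this sketch the locked variant times out and the unblocking variant returns promptly. Whether the real fix should close the body outside the lock, cancel the request context, or restructure the locking is a separate question that this example does not settle.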

Jeffail, Feb 16 '24 10:02

Seeing this as well for SFTP input to STDOUT output

g8r-b8, Jul 12 '24 16:07