Async preserver blocking shutdowns (http_client input with streaming enabled)
Log dump of the blocked pipeline:
INFO Main config updated, attempting to update pipeline. @service=benthos
DEBU Waiting for pending acks to resolve before shutting down. @service=benthos label="" path=root.input
DEBU Pending acks resolved. @service=benthos label="" path=root.input
ERRO Encountered error whilst attempting to shut down gracefully: context deadline exceeded @service=benthos
INFO Some components prevented forced termination as they were either blocked from delivering data or from acknowledging delivered data within the shutdown timeout. This could potentially cause duplicate messages to be delivered on the next run. @service=benthos
DEBU goroutine profile: total 11
1 @ 0x40f069 0x46dd09 0xb2b113 0x471701
# 0x46dd08 os/signal.signal_recv+0x28 /usr/local/go/src/runtime/sigqueue.go:152
# 0xb2b112 os/signal.loop+0x12 /usr/local/go/src/os/signal/signal_unix.go:23
1 @ 0x433291 0x46b79d 0x9ce771 0x9ce5a5 0x9cb046 0xa7c6ce 0xb3d067 0xb3ca3d 0xad35aa 0xadd3b1 0x471701
# 0x46b79c runtime/pprof.runtime_goroutineProfileWithLabels+0x1c /usr/local/go/src/runtime/mprof.go:844
# 0x9ce770 runtime/pprof.writeRuntimeProfile+0xb0 /usr/local/go/src/runtime/pprof/pprof.go:734
# 0x9ce5a4 runtime/pprof.writeGoroutine+0x44 /usr/local/go/src/runtime/pprof/pprof.go:694
# 0x9cb045 runtime/pprof.(*Profile).WriteTo+0x145 /usr/local/go/src/runtime/pprof/pprof.go:329
# 0xa7c6cd github.com/benthosdev/benthos/v4/internal/stream.(*Type).Stop+0x30d /go/src/github.com/benthosdev/benthos/internal/stream/type.go:266
# 0xb3d066 github.com/benthosdev/benthos/v4/internal/cli/common.(*SwappableStopper).Replace+0xe6 /go/src/github.com/benthosdev/benthos/internal/cli/common/swappable.go:85
# 0xb3ca3c github.com/benthosdev/benthos/v4/internal/cli/common.initNormalMode.func2+0xbc /go/src/github.com/benthosdev/benthos/internal/cli/common/service.go:205
# 0xad35a9 github.com/benthosdev/benthos/v4/internal/config.(*Reader).TriggerMainUpdate+0xd29 /go/src/github.com/benthosdev/benthos/internal/config/reader.go:385
# 0xadd3b0 github.com/benthosdev/benthos/v4/internal/config.(*Reader).BeginFileWatching.func3+0x7d0 /go/src/github.com/benthosdev/benthos/internal/config/watcher.go:176
1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4ef05a 0x4ef048 0x4fa9b2 0x4fa9aa 0xace692 0x471701
# 0x46bbc4 internal/poll.runtime_pollWait+0x84 /usr/local/go/src/runtime/netpoll.go:343
# 0x4edd66 internal/poll.(*pollDesc).wait+0x26 /usr/local/go/src/internal/poll/fd_poll_runtime.go:84
# 0x4ef059 internal/poll.(*pollDesc).waitRead+0x279 /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
# 0x4ef047 internal/poll.(*FD).Read+0x267 /usr/local/go/src/internal/poll/fd_unix.go:164
# 0x4fa9b1 os.(*File).read+0x51 /usr/local/go/src/os/file_posix.go:29
# 0x4fa9a9 os.(*File).Read+0x49 /usr/local/go/src/os/file.go:118
# 0xace691 github.com/fsnotify/fsnotify.(*Watcher).readEvents+0xd1 /go/pkg/mod/github.com/fsnotify/[email protected]/backend_inotify.go:483
1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4ef05a 0x4ef048 0x6b0885 0x6c2165 0x7166db 0x524138 0x7168be 0x713e90 0x71a158 0x71a161 0x756d97 0x4e5ef0 0x79b0a5 0x79b074 0x79b7e5 0x7be6bf 0x7bdc45 0x471701
# 0x46bbc4 internal/poll.runtime_pollWait+0x84 /usr/local/go/src/runtime/netpoll.go:343
# 0x4edd66 internal/poll.(*pollDesc).wait+0x26 /usr/local/go/src/internal/poll/fd_poll_runtime.go:84
# 0x4ef059 internal/poll.(*pollDesc).waitRead+0x279 /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
# 0x4ef047 internal/poll.(*FD).Read+0x267 /usr/local/go/src/internal/poll/fd_unix.go:164
# 0x6b0884 net.(*netFD).Read+0x24 /usr/local/go/src/net/fd_posix.go:55
# 0x6c2164 net.(*conn).Read+0x44 /usr/local/go/src/net/net.go:179
# 0x7166da crypto/tls.(*atLeastReader).Read+0x3a /usr/local/go/src/crypto/tls/conn.go:805
# 0x524137 bytes.(*Buffer).ReadFrom+0x97 /usr/local/go/src/bytes/buffer.go:211
# 0x7168bd crypto/tls.(*Conn).readFromUntil+0xdd /usr/local/go/src/crypto/tls/conn.go:827
# 0x713e8f crypto/tls.(*Conn).readRecordOrCCS+0x24f /usr/local/go/src/crypto/tls/conn.go:625
# 0x71a157 crypto/tls.(*Conn).readRecord+0x157 /usr/local/go/src/crypto/tls/conn.go:587
# 0x71a160 crypto/tls.(*Conn).Read+0x160 /usr/local/go/src/crypto/tls/conn.go:1369
# 0x756d96 bufio.(*Reader).Read+0x196 /usr/local/go/src/bufio/bufio.go:244
# 0x4e5eef io.ReadAtLeast+0x8f /usr/local/go/src/io/io.go:335
# 0x79b0a4 io.ReadFull+0x64 /usr/local/go/src/io/io.go:354
# 0x79b073 net/http.http2readFrameHeader+0x33 /usr/local/go/src/net/http/h2_bundle.go:1635
# 0x79b7e4 net/http.(*http2Framer).ReadFrame+0x84 /usr/local/go/src/net/http/h2_bundle.go:1899
# 0x7be6be net/http.(*http2clientConnReadLoop).run+0x11e /usr/local/go/src/net/http/h2_bundle.go:9339
# 0x7bdc44 net/http.(*http2ClientConn).readLoop+0x64 /usr/local/go/src/net/http/h2_bundle.go:9234
1 @ 0x43e42e 0x436c57 0x46bbc5 0x4edd67 0x4f324c 0x4f323a 0x6b28a9 0x6cce3e 0x6cbcd0 0x7dbb64 0x7db7d1 0xaca668 0xb3a1ed 0x471701
# 0x46bbc4 internal/poll.runtime_pollWait+0x84 /usr/local/go/src/runtime/netpoll.go:343
# 0x4edd66 internal/poll.(*pollDesc).wait+0x26 /usr/local/go/src/internal/poll/fd_poll_runtime.go:84
# 0x4f324b internal/poll.(*pollDesc).waitRead+0x2ab /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
# 0x4f3239 internal/poll.(*FD).Accept+0x299 /usr/local/go/src/internal/poll/fd_unix.go:611
# 0x6b28a8 net.(*netFD).accept+0x28 /usr/local/go/src/net/fd_unix.go:172
# 0x6cce3d net.(*TCPListener).accept+0x1d /usr/local/go/src/net/tcpsock_posix.go:152
# 0x6cbccf net.(*TCPListener).Accept+0x2f /usr/local/go/src/net/tcpsock.go:315
# 0x7dbb63 net/http.(*Server).Serve+0x363 /usr/local/go/src/net/http/server.go:3056
# 0x7db7d0 net/http.(*Server).ListenAndServe+0x70 /usr/local/go/src/net/http/server.go:2985
# 0xaca667 github.com/benthosdev/benthos/v4/internal/api.(*Type).ListenAndServe+0xe7 /go/src/github.com/benthosdev/benthos/internal/api/api.go:289
# 0xb3a1ec github.com/benthosdev/benthos/v4/internal/cli/common.newStoppableManager.func1+0x2c /go/src/github.com/benthosdev/benthos/internal/cli/common/manager.go:228
1 @ 0x43e42e 0x44e8e5 0x1c90239 0x471701
# 0x1c90238 github.com/golang/glog.(*fileSink).flushDaemon+0xb8 /go/pkg/mod/github.com/golang/[email protected]/glog_file.go:351
1 @ 0x43e42e 0x44e8e5 0x201437f 0x471701
# 0x201437e go.opencensus.io/stats/view.(*worker).start+0x9e /go/pkg/mod/[email protected]/stats/view/worker.go:292
1 @ 0x43e42e 0x44e8e5 0x7b9f27 0x7b9418 0x471701
# 0x7b9f26 net/http.(*http2clientStream).writeRequest+0xac6 /usr/local/go/src/net/http/h2_bundle.go:8549
# 0x7b9417 net/http.(*http2clientStream).doRequest+0x17 /usr/local/go/src/net/http/h2_bundle.go:8411
1 @ 0x43e42e 0x44e8e5 0xb39825 0xb3b469 0xb6b7eb 0x5aadf8 0x5a7777 0xb6af65 0x3b829c5 0x43dfbb 0x471701
# 0xb39824 github.com/benthosdev/benthos/v4/internal/cli/common.RunManagerUntilStopped+0x324 /go/src/github.com/benthosdev/benthos/internal/cli/common/manager.go:198
# 0xb3b468 github.com/benthosdev/benthos/v4/internal/cli/common.RunService+0x828 /go/src/github.com/benthosdev/benthos/internal/cli/common/service.go:81
# 0xb6b7ea github.com/benthosdev/benthos/v4/internal/cli.App.func2+0x1ca /go/src/github.com/benthosdev/benthos/internal/cli/run.go:183
# 0x5aadf7 github.com/urfave/cli/v2.(*Command).Run+0x997 /go/pkg/mod/github.com/urfave/cli/[email protected]/command.go:274
# 0x5a7776 github.com/urfave/cli/v2.(*App).RunContext+0x5b6 /go/pkg/mod/github.com/urfave/cli/[email protected]/app.go:332
# 0xb6af64 github.com/benthosdev/benthos/v4/internal/cli.Run+0x44 /go/src/github.com/benthosdev/benthos/internal/cli/run.go:283
# 0x3b829c4 main.main+0x24 /go/src/github.com/benthosdev/benthos/cmd/benthos/main.go:13
# 0x43dfba runtime.main+0x2ba /usr/local/go/src/runtime/proc.go:267
1 @ 0x43e42e 0x44f918 0x44f8ef 0x46d665 0x47ce7d 0x227fb08 0x227faf4 0xa63e6c 0xa65753 0xa64b73 0x471701
# 0x46d664 sync.runtime_SemacquireMutex+0x24 /usr/local/go/src/runtime/sema.go:77
# 0x47ce7c sync.(*Mutex).lockSlow+0x15c /usr/local/go/src/sync/mutex.go:171
# 0x227fb07 sync.(*Mutex).Lock+0x67 /usr/local/go/src/sync/mutex.go:90
# 0x227faf3 github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).Close+0x53 /go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:315
# 0xa63e6b github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncPreserver).Close+0x4b /go/src/github.com/benthosdev/benthos/internal/component/input/async_preserver.go:121
# 0xa65752 github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncReader).loop.func1+0x32 /go/src/github.com/benthosdev/benthos/internal/component/input/async_reader.go:98
# 0xa64b72 github.com/benthosdev/benthos/v4/internal/component/input.(*AsyncReader).loop+0x932 /go/src/github.com/benthosdev/benthos/internal/component/input/async_reader.go:172
1 @ 0x43e42e 0x46d919 0x46d8f9 0x47b125 0x7a365f 0x7c0405 0x75979b 0xfce999 0x227e8fb 0x227e7c5 0xa63a0f 0xa68489 0x471701
# 0x46d8f8 sync.runtime_notifyListWait+0x138 /usr/local/go/src/runtime/sema.go:527
# 0x47b124 sync.(*Cond).Wait+0x84 /usr/local/go/src/sync/cond.go:70
# 0x7a365e net/http.(*http2pipe).Read+0xde /usr/local/go/src/net/http/h2_bundle.go:3765
# 0x7c0404 net/http.http2transportResponseBody.Read+0x64 /usr/local/go/src/net/http/h2_bundle.go:9622
# 0x75979a bufio.(*Scanner).Scan+0x81a /usr/local/go/src/bufio/scan.go:214
# 0xfce998 github.com/benthosdev/benthos/v4/internal/codec.(*linesReader).Next+0x78 /go/src/github.com/benthosdev/benthos/internal/codec/reader.go:578
# 0x227e8fa github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).readStreamed+0xfa /go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:239
# 0x227e7c4 github.com/benthosdev/benthos/v4/internal/impl/io.(*httpClientInput).ReadBatch+0x24 /go/src/github.com/benthosdev/benthos/internal/impl/io/input_http_client.go:226
# 0xa63a0e github.com/benthosdev/benthos/v4/internal/component/input.NewAsyncPreserver.func1+0x2e /go/src/github.com/benthosdev/benthos/internal/component/input/async_preserver.go:35
# 0xa68488 github.com/benthosdev/benthos/v4/internal/autoretry.(*List[...]).dispatchReader+0x48 /go/src/github.com/benthosdev/benthos/internal/autoretry/auto_retry_list.go:119
@service=benthos
INFO Updated main config @service=benthos
And the config looks something like this:
input:
  http_client:
    url: <meow>
    stream:
      enabled: true
output:
  stdout: {}
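For what it's worth, the last two goroutines in the dump look like a locking problem: one is parked in `httpClientInput.Close` on `sync.(*Mutex).Lock`, while the other is blocked in `readStreamed` waiting on the HTTP/2 response body. If the read path holds that same mutex while it waits (an assumption; the dump alone does not prove it), `Close` can never proceed until the stream yields data or errors. Below is a minimal Go sketch of that pattern. The `streamInput` type, its methods, and the `io.Pipe` standing in for the response body are all hypothetical and are not the actual Benthos code:

```go
package main

import (
	"fmt"
	"io"
	"sync"
	"time"
)

// streamInput is a hypothetical stand-in for an input that guards its
// response body with a mutex. It is NOT the Benthos implementation.
type streamInput struct {
	mut  sync.Mutex
	body io.ReadCloser
}

// Read holds the mutex for the whole duration of a blocking read.
func (s *streamInput) Read(p []byte) (int, error) {
	s.mut.Lock()
	defer s.mut.Unlock()
	return s.body.Read(p)
}

// CloseLocked needs the same mutex, so it waits for as long as Read is blocked.
func (s *streamInput) CloseLocked() error {
	s.mut.Lock()
	defer s.mut.Unlock()
	return s.body.Close()
}

// CloseUnlocked closes the body without taking the mutex, which makes the
// pending Read return with an error and release the lock.
func (s *streamInput) CloseUnlocked() error {
	return s.body.Close()
}

// finishesWithin reports whether fn returns before the timeout elapses.
func finishesWithin(d time.Duration, fn func()) bool {
	done := make(chan struct{})
	go func() { fn(); close(done) }()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo returns whether the locked close and the unlocked close each finished
// within a short timeout while a read was pending on an idle stream.
func demo() (lockedFinished, unlockedFinished bool) {
	pr, _ := io.Pipe() // nothing is ever written, so reads block like an idle stream
	in := &streamInput{body: pr}

	readDone := make(chan struct{})
	go func() {
		in.Read(make([]byte, 1))
		close(readDone)
	}()
	time.Sleep(50 * time.Millisecond) // give the reader time to take the mutex and block

	lockedFinished = finishesWithin(200*time.Millisecond, func() { in.CloseLocked() })
	unlockedFinished = finishesWithin(200*time.Millisecond, func() {
		in.CloseUnlocked()
		<-readDone // the pending read should now have returned
	})
	return lockedFinished, unlockedFinished
}

func main() {
	locked, unlocked := demo()
	fmt.Printf("locked close finished: %v, unlocked close finished: %v\n", locked, unlocked)
}
```

If the real input behaves like this sketch, closing the response body (or cancelling the request context) before taking the lock would be one way to let the shutdown complete, but that would need checking against the actual `input_http_client.go`.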
I'm seeing this as well with an SFTP input and a stdout output.