Vector hangs forever waiting for a sink to finish after rereading a config.
A note for the community
No response
Problem
The steps to reproduce:
- Run netcat:
nc -k -l 0.0.0.0 3000
- Run vector with the attached config (the address to connect to is 127.0.0.1:3001):
cargo run --release --no-default-features --features 'sources-stdin-sinks-socket' -- -c /tmp/vector.toml -w
Vector tries to connect to the TCP server in an infinite loop:
2023-01-13T13:11:10.663923Z INFO vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-01-13T13:11:10.664098Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-01-13T13:11:10.664164Z INFO vector::config::watcher: Creating configuration file watcher.
2023-01-13T13:11:10.664382Z INFO vector::config::watcher: Watching configuration files.
2023-01-13T13:11:10.664455Z INFO vector::app: Loading configs. paths=["/tmp/vector.toml"]
2023-01-13T13:11:10.666274Z INFO vector::sources::file_descriptors: Capturing stdin.
2023-01-13T13:11:10.666371Z INFO vector::topology::running: Running healthchecks.
2023-01-13T13:11:10.666488Z INFO vector: Vector has started. debug="false" version="0.27.0" arch="x86_64" revision=""
2023-01-13T13:11:10.666651Z ERROR vector::topology::builder: msg="Healthcheck failed." error=Connect error: Connection refused (os error 111) component_kind="sink" component_type="socket" component_id=socket component_name=socket
1
2023-01-13T13:11:14.876315Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Unable to connect. error=Connect error: Connection refused (os error 111) error_code="failed_connecting" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
2
2023-01-13T13:11:15.378472Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Internal log [Unable to connect.] is being rate limited.
3
- Change the connection address from 3001 to 3000. Vector should reread the config, recreate the socket sink, and connect successfully to the server, but in fact it waits forever for the sink to complete:
2023-01-13T13:11:21.690757Z INFO vector::config::watcher: Configuration file changed.
2023-01-13T13:11:21.693208Z INFO vector::topology::running: Reloading running topology with new configuration.
^C2023-01-13T13:11:30.386904Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Internal log [Unable to connect.] has been rate limited 4 times.
2023-01-13T13:11:30.386969Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Unable to connect. error=Connect error: Connection refused (os error 111) error_code="failed_connecting" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
Possible solution: inside src/topology/builder.rs, run a future that starts a timer after the tripwire is set, then in a select! wait for either the sink or this future to complete:
diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 422dd641d..67bdc49b5 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -13,7 +13,7 @@ use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
 use tokio::{
     select,
     sync::oneshot,
-    time::{timeout, Duration},
+    time::{sleep, timeout, Duration},
 };
 use tracing::Instrument;
 use vector_common::internal_event::{
@@ -479,7 +479,7 @@ pub async fn build_pieces(
         let (trigger, tripwire) = Tripwire::new();
         let sink = async move {
-            debug!("Sink starting.");
+            info!("Sink starting.");
             // Why is this Arc<Mutex<Option<_>>> needed you ask.
             // In case when this function build_pieces errors
@@ -496,7 +496,7 @@ pub async fn build_pieces(
             let mut rx = wrap(rx);
             let events_received = register!(EventsReceived);
-            sink.run(
+            let sink = sink.run(
                 rx.by_ref()
                     .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
                     .inspect(|events| {
@@ -505,17 +505,31 @@ pub async fn build_pieces(
                             events.estimated_json_encoded_size_of(),
                         ))
                     })
-                    .take_until_if(tripwire),
-            )
-            .await
-            .map(|_| {
-                debug!("Sink finished normally.");
-                TaskOutput::Sink(rx)
-            })
-            .map_err(|_| {
-                debug!("Sink finished with an error.");
-                TaskError::Opaque
-            })
+                    .take_until_if(tripwire.clone()),
+            );
+            let shutdown = tripwire.then(|_| async move {
+                info!("Force shutdown sink in 10 secs.");
+                sleep(Duration::from_secs(10)).await;
+            });
+
+            select! {
+                biased;
+
+                _ = shutdown => {
+                    info!("Sink shutdowned.");
+                    Ok(TaskOutput::Sink(rx))
+                },
+                res = sink => match res {
+                    Ok(_) => {
+                        debug!("Sink finished normally.");
+                        Ok(TaskOutput::Sink(rx))
+                    }
+                    Err(_) => {
+                        debug!("Sink finished with an error.");
+                        Err(TaskError::Opaque)
+                    }
+                }
+            }
         };
         let task = Task::new(key.clone(), typetag, sink);
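For illustration, here is a standalone sketch of the same pattern outside the Vector codebase, assuming tokio, futures, and stream-cancel as dependencies (the function name and result types are hypothetical, not Vector's):

```rust
use std::{future::Future, time::Duration};

use futures::FutureExt;
use stream_cancel::Tripwire;
use tokio::{select, time::sleep};

// Hypothetical helper: await the sink, but give up after a fixed grace
// period once the tripwire (config reload / shutdown) fires, instead of
// awaiting the sink forever.
async fn run_sink_with_grace_period<F>(sink: F, tripwire: Tripwire) -> Result<(), ()>
where
    F: Future<Output = Result<(), ()>>,
{
    // Resolves 10 seconds after the tripwire does.
    let force_shutdown = tripwire.then(|_| sleep(Duration::from_secs(10)));
    tokio::pin!(sink);

    select! {
        biased;
        // Grace period elapsed: stop waiting for the sink to flush.
        _ = force_shutdown => Ok(()),
        // Sink finished (normally or with an error) within the grace period.
        res = &mut sink => res,
    }
}
```

The 10-second grace period mirrors the patch above; in a real fix the timeout would presumably be configurable.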
With the patch, Vector successfully connects to the server after rereading the config:
2023-01-13T13:32:14.813911Z INFO vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-01-13T13:32:14.814033Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-01-13T13:32:14.814084Z INFO vector::config::watcher: Creating configuration file watcher.
2023-01-13T13:32:14.814228Z INFO vector::config::watcher: Watching configuration files.
2023-01-13T13:32:14.814291Z INFO vector::app: Loading configs. paths=["/tmp/vector.toml"]
2023-01-13T13:32:14.815766Z INFO vector::sources::file_descriptors: Capturing stdin.
2023-01-13T13:32:14.815791Z INFO vector::topology::running: Running healthchecks.
2023-01-13T13:32:14.815904Z INFO vector: Vector has started. debug="false" version="0.27.0" arch="x86_64" revision=""
2023-01-13T13:32:14.815903Z INFO sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::topology::builder: Sink starting.
2023-01-13T13:32:14.816064Z ERROR vector::topology::builder: msg="Healthcheck failed." error=Connect error: Connection refused (os error 111) component_kind="sink" component_type="socket" component_id=socket component_name=socket
1
2023-01-13T13:34:11.636341Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Unable to connect. error=Connect error: Connection refused (os error 111) error_code="failed_connecting" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
2023-01-13T13:34:12.138994Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Internal log [Unable to connect.] is being rate limited.
2
3
2023-01-13T13:34:20.971383Z INFO vector::config::watcher: Configuration file changed.
2023-01-13T13:34:20.974670Z INFO vector::topology::running: Reloading running topology with new configuration.
2023-01-13T13:34:20.975052Z INFO sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::topology::builder: Force shutdown sink in 10 secs.
2023-01-13T13:34:27.147312Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Internal log [Unable to connect.] has been rate limited 4 times.
2023-01-13T13:34:27.147391Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::internal_events::common: Unable to connect. error=Connect error: Connection refused (os error 111) error_code="failed_connecting" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
2023-01-13T13:34:30.976738Z INFO sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::topology::builder: Sink shutdowned.
2023-01-13T13:34:30.977354Z INFO vector::topology::running: Running healthchecks.
2023-01-13T13:34:30.977777Z INFO vector::topology::running: New configuration loaded successfully.
2023-01-13T13:34:30.977792Z INFO sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::topology::builder: Sink starting.
2023-01-13T13:34:30.977845Z INFO vector: Vector has reloaded. path=[File("/tmp/vector.toml", None)]
2023-01-13T13:34:30.978478Z INFO vector::topology::builder: Healthcheck passed.
4
5
Netcat output:
❯ nc -k -l 0.0.0.0 3000
{"message":"2","source_type":"stdin","timestamp":"2023-01-13T13:36:59.061227675Z"}
{"message":"3","source_type":"stdin","timestamp":"2023-01-13T13:36:59.882999111Z"}
{"message":"4","source_type":"stdin","timestamp":"2023-01-13T13:37:35.358937153Z"}
{"message":"5","source_type":"stdin","timestamp":"2023-01-13T13:37:36.090770332Z"}
Configuration
[sources.stdin]
type = "stdin"
[sinks.socket]
type = "socket"
inputs = ["stdin"]
address = "127.0.0.1:3001"
mode = "tcp"
encoding.codec = "json"
Version
master-ce0038485
Debug Output
No response
Example Data
No response
Additional Context
No response
References
No response
Interesting - when trying to reproduce with the 0.26.0 release it appears to work without hanging? Perhaps this is caused by the feature set you've created?
❯ vector -c vector.toml -w
2023-01-13T14:21:08.536572Z INFO vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-01-13T14:21:08.536713Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-01-13T14:21:08.536774Z INFO vector::config::watcher: Creating configuration file watcher.
2023-01-13T14:21:08.541314Z INFO vector::config::watcher: Watching configuration files.
2023-01-13T14:21:08.541368Z INFO vector::app: Loading configs. paths=["vector.toml"]
2023-01-13T14:21:08.543759Z INFO vector::sources::file_descriptors: Capturing stdin.
2023-01-13T14:21:08.543816Z INFO vector::topology::running: Running healthchecks.
2023-01-13T14:21:08.544032Z INFO vector: Vector has started. debug="false" version="0.26.0" arch="x86_64" revision="c6b5bc2 2022-12-05"
2023-01-13T14:21:08.544241Z INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2023-01-13T14:21:08.544392Z ERROR vector::topology::builder: msg="Healthcheck: Failed Reason." error=Connect error: Connection refused (os error 61) component_kind="sink" component_type="socket" component_id=socket component_name=socket
2023-01-13T14:21:35.815561Z INFO vector::config::watcher: Configuration file changed.
2023-01-13T14:21:35.818710Z INFO vector::topology::running: Reloading running topology with new configuration.
2023-01-13T14:21:35.820920Z INFO vector::topology::running: Running healthchecks.
2023-01-13T14:21:35.821050Z INFO vector::topology::running: New configuration loaded successfully.
2023-01-13T14:21:35.821317Z INFO vector: Vector has reloaded. path=[File("vector.toml", None)]
2023-01-13T14:21:35.821920Z INFO vector::topology::builder: Healthcheck: Passed.
Though I did see that if I switched the config back to 3001, Vector panicked 🤔
2023-01-13T14:25:34.553288Z INFO vector::config::watcher: Configuration file changed.
2023-01-13T14:25:34.556178Z INFO vector::topology::running: Reloading running topology with new configuration.
thread 'vector-worker' panicked at '`async fn` resumed after completion', /Users/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/stream-cancel-0.8.1/src/combinator.rs:158:51
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2023-01-13T14:25:34.560809Z ERROR sink{component_kind="sink" component_id=socket component_type=socket component_name=socket}: vector::topology: An error occurred that Vector couldn't handle: the task panicked and was aborted.
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Panicked', src/topology/running.rs:493:54
I hit the same issue with Vector v0.25.0:
2023-03-01T07:18:47.689185Z INFO vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-03-01T07:18:47.689342Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-03-01T07:18:47.689386Z INFO vector::config::watcher: Creating configuration file watcher.
2023-03-01T07:18:47.689745Z INFO vector::config::watcher: Watching configuration files.
2023-03-01T07:18:47.689783Z INFO vector::app: Loading configs. paths=["/etc/ovm/vector"]
2023-03-01T07:18:47.692549Z WARN vector::config: Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur. source="vector-agent-logs" sink="vector-server-logging"
2023-03-01T07:18:47.781199Z INFO vector::topology::running: Running healthchecks.
2023-03-01T07:18:47.781368Z INFO vector: Vector has started. debug="false" version="0.25.0" arch="x86_64" revision="cf7843b 2022-09-26"
2023-03-01T07:18:47.781391Z INFO vector::internal_events::api: API server running. address=0.0.0.0:8686 playground=http://0.0.0.0:8686/playground
2023-03-01T07:18:47.782044Z INFO source{component_kind="source" component_id=traffic component_type=http component_name=traffic}: vector::sources::util::http::prelude: Building HTTP server. address=127.0.0.1:8690
2023-03-01T07:18:47.783088Z ERROR vector::topology::builder: msg="Healthcheck: Failed Reason." error=Vector source unhealthy component_kind="sink" component_type="vector" component_id=vector-server-traffic component_name=vector-server-traffic
2023-03-01T07:18:48.785024Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:49.787483Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:50.789453Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:52.792890Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:53.134102Z INFO vector::config::watcher: Configuration file changed.
2023-03-01T07:18:53.136095Z WARN vector::config: Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur. source="vector-agent-logs" sink="vector-server-logging"
2023-03-01T07:18:53.136295Z INFO vector::topology::running: Reloading running topology with new configuration.
2023-03-01T07:18:53.136365Z INFO vector::topology::running: Running healthchecks.
2023-03-01T07:18:53.136383Z INFO vector::topology::running: New configuration loaded successfully.
2023-03-01T07:18:53.136581Z INFO vector: Vector has reloaded. path=[Dir("/etc/ovm/vector")]
2023-03-01T07:18:54.061355Z WARN sink{component_kind="sink" component_id=vector-server-traffic component_type=vector component_name=vector-server-traffic}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:55.063033Z WARN sink{component_kind="sink" component_id=vector-server-traffic component_type=vector component_name=vector-server-traffic}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:55.503700Z INFO vector::config::watcher: Configuration file changed.
2023-03-01T07:18:55.505903Z WARN vector::config: Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur. source="vector-agent-logs" sink="vector-server-logging"
2023-03-01T07:18:55.506158Z INFO vector::topology::running: Reloading running topology with new configuration.
2023-03-01T07:18:55.794525Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:56.065974Z WARN sink{component_kind="sink" component_id=vector-server-traffic component_type=vector component_name=vector-server-traffic}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:18:58.067839Z ERROR sink{component_kind="sink" component_id=vector-server-traffic component_type=vector component_name=vector-server-traffic}:request{request_id=1}: vector::internal_events::common: Retries exhausted; dropping the request. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} } error_type="request_failed" stage="sending" internal_log_rate_limit=true
2023-03-01T07:18:58.067894Z ERROR sink{component_kind="sink" component_id=vector-server-traffic component_type=vector component_name=vector-server-traffic}:request{request_id=1}: vector_core::stream::driver: Service call failed. error=Request { source: Status { code: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", source: Some(hyper::Error(Connect, Custom { kind: Other, error: ConnectError("tcp connect error", Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }) })) } } request_id=1
2023-03-01T07:19:00.796134Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:19:08.797712Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:19:21.800233Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:19:42.803212Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:20:16.805305Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:21:11.807342Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:22:40.809642Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
2023-03-01T07:25:04.811215Z WARN sink{component_kind="sink" component_id=vector-server-metrics component_type=vector component_name=vector-server-metrics}:request{request_id=1}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unavailable, message: "error trying to connect: tcp connect error: Connection refused (os error 111)", details: [], metadata: MetadataMap { headers: {} }
Vector just gets stuck trying to connect... but after I restart Vector, it works.
My guess is that the problem occurs when there is a failed sink request: although Vector detects that the configuration has changed, it does not finish reloading it and keeps waiting for the failed sink to end. Since the previously established gRPC connection has failed (assuming it is a sink request from Vector to Vector, and the target Vector is also reloaded at this point), this results in unlimited retries.
Yeah, I think this is right: the old sink is continuing to try to retry requests as it flushes the in-flight data.
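As a rough illustration of that failure mode (a hypothetical stand-in, not Vector's actual sink code): the tripwire ends the sink's input stream, but the task is still awaited while it flushes buffered events, and that flush can retry without bound:

```rust
use std::time::Duration;
use tokio::time::sleep;

// Hypothetical stand-in for a sink task after its input stream has ended:
// it still owns undelivered events and retries delivery without bound, so
// the future never resolves and the reload awaiting it hangs as well.
async fn flush_buffered(buffered: Vec<String>) {
    for event in buffered {
        loop {
            match try_send(&event).await {
                Ok(()) => break,
                // No cancellation signal reaches this loop: if the endpoint
                // never comes back, neither does this future.
                Err(_) => sleep(Duration::from_millis(500)).await,
            }
        }
    }
}

// Placeholder for the sink's real delivery call.
async fn try_send(_event: &str) -> Result<(), std::io::Error> {
    Err(std::io::ErrorKind::ConnectionRefused.into())
}
```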
The same problem also occurs for the websocket sink. I use the patched version of Vector and pass a tripwire to the websocket sink. The sink checks the tripwire in a loop while reconnecting to the WS server and exits the loop when the Vector configuration is reloaded. In effect, the tripwire acts as a cancellation token. So far, two solutions come to mind:
- check for the tripwire in src/topology/builder.rs in the code that runs a sink (as is already done for sources),
- or pass a cancellation token to the sinks (this requires modifying each sink; see the sketch at the end of this comment).
But with the first solution I sometimes ran into the panic:
thread 'vector-worker' panicked at '`async fn` resumed after completion'
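For reference, a minimal sketch of the second approach, assuming tokio-util's CancellationToken; the connection type and connect helper here are placeholders, not real sink code:

```rust
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

struct Connection; // placeholder for a real sink connection

// Hypothetical sketch: the topology hands each sink a CancellationToken and
// cancels it on reload, so reconnect loops bail out instead of retrying
// forever.
async fn connect_with_cancellation(token: CancellationToken) -> Option<Connection> {
    loop {
        tokio::select! {
            // Reload/shutdown: stop retrying and let the task finish.
            _ = token.cancelled() => return None,
            res = try_connect() => match res {
                Ok(conn) => return Some(conn),
                Err(_) => sleep(Duration::from_secs(1)).await,
            },
        }
    }
}

// Placeholder for the sink's real connect logic.
async fn try_connect() -> Result<Connection, std::io::Error> {
    Err(std::io::ErrorKind::ConnectionRefused.into())
}
```

The topology would cancel the token when tearing the sink down on reload, so the reconnect loop exits promptly instead of retrying forever.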