
Flush sinks during shutdown

Open jszwedko opened this issue 3 years ago • 7 comments

A note for the community (please keep)

Community Note

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Motivated by https://discord.com/channels/742820443487993987/746070591097798688/943246759294038017

The user in question has an aws_s3 sink with a large batch timeout (as would be common):

[sinks.aws_s3_upload]
type = "aws_s3"
inputs = ["app_logs"]
bucket = "app-logs"
region = "us-west-2"
[sinks.aws_s3_upload.batch]
  max_bytes = 500000000
  timeout_secs = 1800

In this case, if a SIGTERM is sent to Vector, Vector won't try to flush the in-progress batch; it simply waits out the 60-second shutdown grace period and then terminates.

Attempted Solutions

No response

Proposal

During shutdown, Vector should attempt to:

  • Stop sources (this already happens)
  • Flush all data from sources and transforms through to sinks (I'm not sure if this happens?)
  • Flush batches in sinks (does not currently happen)

References

  • https://github.com/vectordotdev/vector/issues/9042

Version

No response

jszwedko avatar Feb 15 '22 21:02 jszwedko

Hi @jszwedko, I did some load testing with Fluentd and Vector and was really impressed with Vector's performance. I have the same concern: at present we have to upload our logs from the pods to S3. If Vector doesn't upload the remaining chunks (whatever their size) upon termination, we'll lose the logs.

Are there any alternative ways for us to handle this case in the agents themselves?

It would be nice if Vector could flush the logs upon termination, and also support size-based uploads like Fluent Bit/Fluentd: https://docs.fluentbit.io/manual/pipeline/outputs/s3#configuration-file

manivannan-g avatar Feb 21 '22 07:02 manivannan-g

Hi @manivannan-g !

Vector can do size-based uploads (see the batch settings on the aws_s3 sink). I agree this issue needs to be resolved to avoid data loss for long batches, though.

One workaround is configuring a disk buffer on the sink so that the data is preserved across Vector restarts.
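
For example, a rough YAML sketch mirroring the sink from the issue description, with a disk buffer added (sizes are illustrative):

    sinks:
      aws_s3_upload:
        type: aws_s3
        inputs:
          - app_logs
        bucket: app-logs
        region: us-west-2
        batch:
          # upload when the batch reaches ~500 MB, or after 30 minutes
          max_bytes: 500000000
          timeout_secs: 1800
        buffer:
          # persist queued events to disk so they survive a Vector restart
          type: disk
          max_size: 1073741824

The disk buffer protects against restarts; as noted below, it doesn't by itself guarantee the partially filled batch is uploaded before Vector exits.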

jszwedko avatar Feb 23 '22 00:02 jszwedko

Thanks, @jszwedko! That would definitely be helpful in the case of pod (Vector agent) restarts. May I know what happens during node termination? Does Vector upload whatever is in the buffer to S3 and then exit?

manivannan-g avatar Feb 23 '22 18:02 manivannan-g

Ah, no, you would run into the same issue where Vector would fail to flush the data before shutting down. Also, in the event of a hard crash, the data would be lost.

A setup I might suggest for you is:

  • Have Vector agents collect data and write to a centralized Vector aggregator cluster
  • Configure the centralized aggregator cluster to use some sort of network attached storage for persistent volumes so the data survives node loss. This cluster would be the one writing to S3.

Really, though, we should just resolve this issue which would reduce data loss for you except in the event of an abrupt node failure.

jszwedko avatar Feb 23 '22 19:02 jszwedko

I just wanted to ask as it may be somewhat related.

We're thinking of using Vector to process logs from AWS S3 using an SQS queue. The idea was to run Vector on ECS Fargate and scale up based on CPU load and/or queue backpressure. Our concern is whether Vector handles termination gracefully during scale-down.

Our worry is that if we have a log file "in flight" and Vector gets the SIGTERM from ECS telling it to terminate the container, will it:

  1. Finish processing the current file.
  2. Not poll for another SQS message to process while in the shutdown process?

From reading this thread, it seems that currently any job you have in flight has 60 seconds to finish or it is lost. The worry then is that half the data gets sent on to the destination, but the SQS message doesn't get deleted and thus becomes visible again in the queue, so it could be picked up by another node and processed again, leading to duplication.

Is anyone able to offer any insight into whether this would be the case?

NeilJed avatar Jun 15 '22 21:06 NeilJed

Hi @NeilJed! Apologies for the delayed response. I think what you are describing would currently be the case: if Vector isn't able to finish processing within 60 seconds, it would terminate without deleting the SQS message.

jszwedko avatar Jul 05 '22 16:07 jszwedko

Hi! Are there updates on the prioritization of this feature? Handling the termination case is a critical requirement of our workflow.

shomilj avatar Sep 20 '22 00:09 shomilj

+1 for implementing this. We are using Vector to send logs from Docker to S3 on AWS spot instances. Spot gives only a 2-minute warning before the machine is forcibly shut down. If Vector can't flush (drain) the in-memory logs to S3, then we lose the logs, which is not a good thing for production.

aniaptebsft avatar Oct 03 '22 18:10 aniaptebsft

Does anyone know if there are any hacks we can take in the meantime to ensure chunks are flushed to S3?

shomilj avatar Oct 17 '22 17:10 shomilj

I set the batch timeout to lower than 60 seconds

    batch:
      timeout_secs: 45

shamil avatar Oct 17 '22 18:10 shamil

This has an edge case: the source reads some events just before SIGTERM, and they make it into a batch that starts its timer 15 seconds after SIGTERM. Those logs won't get flushed, since they would be flushed 30 seconds after the forced shutdown occurs. The same applies if, for some reason, it takes more than 15 seconds to successfully send a batch to its destination.

My idea here is questionable, but it adds some more assurance that the batch timeout_secs will apply as desired. However, it can't work with all Vector sources, because it requires a way to "drain" them externally, and there's no drain-source API.

The gist is: write a custom shutdown handler that captures the SIGTERM and cuts off the sources when the shutdown starts, then wait until they're "flushed" (either by monitoring component_sent_events and buffer_events in the pipeline, or just by setting a long enough timer) before sending the real SIGTERM to Vector.

In k8s you can do this with a preStop hook (see the sketch below), but again it's limited to sources that you can cut off outside of Vector. In my case I'm draining and shutting down the proxy that other systems send to Vector through, then waiting a bit before shutting down Vector itself. Outside of k8s, a wrapper bash script that starts Vector and catches SIGTERM before it hits Vector could possibly be used, but I'm not too familiar with how that would work.
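
For what it's worth, a minimal sketch of that preStop approach in a Pod spec; the drain script, image tag, and timings are placeholders and assume your upstream can be cut off outside of Vector:

    # illustrative snippet of a Deployment/Pod spec
    spec:
      # must cover the preStop hook plus Vector's own shutdown time
      terminationGracePeriodSeconds: 120
      containers:
        - name: vector
          image: timberio/vector:0.32.1-debian
          lifecycle:
            preStop:
              exec:
                # hypothetical script that drains/stops the upstream proxy,
                # then sleeps so in-flight batches can flush before k8s
                # sends SIGTERM to Vector itself
                command: ["/bin/sh", "-c", "/scripts/drain-upstream.sh && sleep 60"]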

This is mostly helpful for k8s with HPA scale-downs and when pods are moved off a node (such as for cluster upgrades or just resource management). With spot or preemptible instances, this would need to be combined with a small batch.timeout_secs to ensure that the average time to 'drain' a source, plus the batch timeout, plus the time to actually send a batch successfully, is less than the deadline the cloud provider gives before force-killing and reclaiming the nodes.

Which is all very hacky and only covers a few types of sources. Being able to increase the 60 seconds to some timeout value of our choosing would be a decent interim solution, until source+transform+sink draining and flushing is implemented.

tdorianh avatar Oct 17 '22 18:10 tdorianh

Some notes from a discussion:

It's possible that the new-style sink architecture makes this easier. Assuming that's true, we could just resolve this for the new-style sinks and continue efforts to port the remaining sinks. This issue is also only really important for sinks with long batch timeouts, like the blob store sinks, since the rest would flush within the grace period anyway.

jszwedko avatar Jan 24 '23 15:01 jszwedko

Any plans for this to be prioritized?

edoakes avatar Mar 16 '23 16:03 edoakes

Closing this as this appears to work correctly on the latest version of Vector. https://github.com/vectordotdev/vector/pull/17667 adds a test for it.

jszwedko avatar Aug 02 '23 17:08 jszwedko

I'm still seeing an S3 sink time out on SIGTERM on Vector 0.32.1. Here is the shape of my sink configuration (with some fake values):

    customer_bucket_application_logs:
        type: aws_s3
        inputs:
            - application_logs
            - worker_logs
        region: fake-region
        batch:
            timeout_secs: 86400
            max_bytes: 20000000
        buffer:
            type: disk
            max_size: 1073741824
        bucket: fake_bucket_name
        key_prefix: logs/v1/fake_org_id/fake_cluster_id/head-node/${VECTOR_NODE_IP}/${VECTOR_INSTANCE_ID}/{{file}}/
        compression: gzip
        encoding:
            codec: text
        healthcheck:
            enabled: true
        filename_time_format: '%s.%f'

...and here are the logs that I am seeing –

{"msg":"{\"timestamp\":\"2023-09-11T00:38:16.530812Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"[REDACTED],az_loki_sink, customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"59 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:21.528819Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"54 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:24.512060Z\",\"level\":\"DEBUG\",\"utilization\":\"0.0000024811197398628487\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:26.529046Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"49 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:29.512051Z\",\"level\":\"DEBUG\",\"utilization\":\"0.00000026053199619697556\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:31.529132Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"44 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:34.512165Z\",\"level\":\"DEBUG\",\"utilization\":\"0.00000003829292025671705\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:36.529103Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"39 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:39.512284Z\",\"level\":\"DEBUG\",\"utilization\":\"0.000000017508967817804845\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:41.529373Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"34 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:44.512496Z\",\"level\":\"DEBUG\",\"utilization\":\"0.000000013810384989818745\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:46.529601Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"29 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:49.511695Z\",\"level\":\"DEBUG\",\"utilization\":\"0.000000012182770307988884\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:51.529577Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"24 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:54.512515Z\",\"level\":\"DEBUG\",\"utilization\":\"0.00000002767393059864618\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:56.529514Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"19 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:38:59.512429Z\",\"level\":\"DEBUG\",\"utilization\":\"0.000000029767855425787193\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:01.529367Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"14 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:04.511612Z\",\"level\":\"DEBUG\",\"utilization\":\"0.000000013778553422731799\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:06.529524Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"9 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:09.512467Z\",\"level\":\"DEBUG\",\"utilization\":\"0.00000001361576252990203\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:11.529345Z\",\"level\":\"INFO\",\"message\":\"Shutting down... Waiting on running components.\",\"remaining_components\":\"\\\"customer_bucket_application_logs\\\"\",\"time_remaining\":\"\\\"4 seconds left\\\"\",\"target\":\"vector::topology::running\"}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:14.512472Z\",\"level\":\"DEBUG\",\"utilization\":\"0.0000000289015487554785\",\"target\":\"vector::utilization\",\"span\":{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"},\"spans\":[{\"component_id\":\"customer_bucket_application_logs\",\"component_kind\":\"sink\",\"component_name\":\"customer_bucket_application_logs\",\"component_type\":\"aws_s3\",\"name\":\"sink\"}]}"}
{"msg":"{\"timestamp\":\"2023-09-11T00:39:16.529375Z\",\"level\":\"ERROR\",\"message\":\"Failed to gracefully shut down in time. Killing components.\",\"components\":\"\\\"customer_bucket_application_logs\\\"\",\"target\":\"vector::topology::running\"}"}

All of my other sinks (an assortment of Loki / Prometheus Remote Write / etc.) flush fine.

Is there some requirement that logs must stop being emitted from the source in order for the S3 sink to shut down properly? (I don't think this is the case for the other sinks?)

shomilj avatar Sep 11 '23 00:09 shomilj

Hi @shomilj !

One guess I have is that your disk buffer still has data in it. When shutting down, I believe the sinks will try to drain the buffers before stopping. Are you able to check whether the disk buffer had data in it? You can check via this internal metric: https://vector.dev/docs/reference/configuration/sources/internal_metrics/#buffer_events. If that is the case, I can open a separate issue, since I think that, for disk buffers, waiting is unnecessary given the data will be persisted.
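
If it helps, a minimal sketch of exposing that metric with an internal_metrics source and a console sink (component names are illustrative):

    sources:
      vector_metrics:
        type: internal_metrics
    sinks:
      metrics_console:
        type: console
        inputs:
          - vector_metrics
        encoding:
          codec: json
        # grep the emitted metrics for buffer_events to see whether the
        # disk buffer still holds events for the S3 sink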

jszwedko avatar Sep 12 '23 10:09 jszwedko

@jszwedko can you confirm that sinks try to drain disk buffers when SIGTERM is sent? Can you direct me to the code responsible for that?

hillmandj avatar Sep 26 '23 16:09 hillmandj

Correct, Vector will wait for buffers to drain. If that doesn't happen within 60 seconds (the default timeout), Vector will still shut down anyway. You can disable the shutdown timeout via --no-graceful-shutdown-limit or change it with --graceful-shutdown-limit-secs. The way this is modeled in the code is simply that the input stream for the sink isn't closed as long as there are events in the buffer.
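
For example, a sketch of raising the limit in a Kubernetes container spec; the 600-second value and the --config-dir path are illustrative, and terminationGracePeriodSeconds needs to be at least as large for the longer limit to matter:

    containers:
      - name: vector
        image: timberio/vector:0.32.1-debian
        args:
          - --config-dir
          - /etc/vector/
          # give sinks up to 10 minutes to drain their buffers on SIGTERM
          - --graceful-shutdown-limit-secs
          - "600"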

jszwedko avatar Sep 26 '23 17:09 jszwedko