Vector unable to process AWS S3 messages (SQS notification parsing error)
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
I am setting up an aws_s3 source to capture our AWS ALB logs. I set up the following source:
[sources.alblogs]
type = "aws_s3"
region = "us-west-2"
compression = "gzip"
[sources.alblogs.sqs]
queue_url = "https://sqs.us-west-2.amazonaws.com/XX/alb-logs-dev-sqs-notification"
Vector is receiving the SQS notification messages as expected; they look like this (shortened for length):
{"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-west-2","eventTime":"2022-09-15T18:10:03.735Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AI"},"requestParameters":{"sourceIPAddress":"2600:1f13:745:3500:d439:cb9d:73b5:46a1"},"responseElements":{"x-amz-request-id":"X3SJH","x-amz-id-2":"rih"},"s3":{"s3SchemaVersion":"1.0","configurationId":"tf-s3-queue-202X","bucket":{"name":"alb-logs-dev.XXXXXXXXXX","ownerIdentity":{"principalId":"XX"},"arn":"arn:aws:s3:::alb-logs-dev.X"},"object":{"key":"public/AWSLogs/XXX/elasticloadbalancing/us-west-2/2022/09/15/X_elasticloadbalancing_us-west-2_app.dev-api.xx_20220915T1810Z_10.XXX.XXX.XXX_61j36cqr.log.gz","size":227370,"eTag":"955c8xxad9","sequencer":"00632XF29"}}}]}
But Vector then generates the following error and does not process these events any further; the SQS message is nonetheless removed from the queue.
{
  ...
  "vector": {
    "error": "Could not parse SQS message with id 8-x-x-4 as S3 notification: missing field Records at line 1 column 246",
    "error_code": "failed_processing_sqs_message",
    "error_type": "parser_failed",
    "host": {},
    "message_id": "8-x-x-4",
    "metadata": {
      "kind": "event",
      "target": "vector::internal_events::aws_sqs::s3"
    },
    "pid": 112,
    "source_type": "internal_logs",
    "stage": "processing",
    "timestamp": "2XXZ",
    "vector": {
      "component_id": "alblogs",
      "component_kind": "source",
      "component_name": "alblogs",
      "component_type": "aws_s3"
    }
  },
  ...
}
I expected the SQS notification to trigger Vector to fetch the S3 object and process it. The SQS message appears to be formatted correctly, based on what the error message indicates the parser is looking for.
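For comparison, here are two payload shapes that, as far as I can tell, would fail with exactly this "missing field Records" error. Both are my own guesses based on the AWS documentation, not something I have confirmed on this queue: the one-off s3:TestEvent that S3 publishes when a notification configuration is first created, and an SNS envelope that wraps the actual event inside an escaped Message string.
{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2022-09-15T18:00:00.000Z","Bucket":"alb-logs-dev.XXXXXXXXXX","RequestId":"...","HostId":"..."}
{"Type":"Notification","MessageId":"...","TopicArn":"arn:aws:sns:us-west-2:XX:alb-logs","Message":"{\"Records\":[...]}","Timestamp":"2022-09-15T18:10:03.735Z"}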
We are using the Dockerized Vector image at timberio/vector:latest-alpine. Full configs are shown below, obfuscated as needed.
Configuration
[log_schema]
host_key = "host.hostname"
timestamp_key = "@timestamp"
[api]
enabled = true
address = "0.0.0.0:8686"
[sources.vector_metrics]
type = "internal_metrics"
[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["vector_metrics"]
address = "0.0.0.0:1953"
[sources.vector_logs]
type = "internal_logs"
[transforms.vector]
type = "remap"
inputs = ["vector_logs"]
source = '''
.vector = del(.)
."@timestamp" = del(.vector."@timestamp")
.message = del(.vector.message)
.log.logger = del(.vector.metadata.module_path)
.log.level = downcase(string!(del(.vector.metadata.level)))
.ecs.version = "1.7"
.agent.type = "vector"
.agent.version = get_env_var("VECTOR_VERSION") ?? null
.event.dataset = ["app"]
.host.hostname = get_hostname!()
.cloud.provider = get_env_var("CLOUD_PROVIDER") ?? null
.cloud.region = get_env_var("CLOUD_REGION") ?? null
.tenant.name = "XXXXX"
.service.name = "XXX-dev"
del(.vector.host.hostname)
'''
[sinks.daas_vector]
type = "elasticsearch"
inputs = ["vector"]
endpoint = "${ELK_URL}"
bulk.index = "logz-XXXXXXX-forwarder"
bulk.action = "create"
auth.strategy = "basic"
auth.user = "${ELK_USERNAME}"
auth.password = "${ELK_PASSWORD}"
batch.max_bytes = 10490000
batch.max_events = 1000
batch.timeout_secs = 8
request.retry_attempts = 1
buffer.type = "disk"
buffer.max_size = 268435488
buffer.when_full = "drop_newest"
tls.verify_certificate = false
tls.verify_hostname = false
[[tests]]
name = "check_vector_logs"
[[tests.inputs]]
insert_at = "vector"
type = "log"
log_fields.message = "Received one event."
log_fields.timestamp = "2021-01-06T19:25:57.852322900Z"
log_fields."metadata.level" = "WARN"
log_fields."metadata.module_path" = "vector::topology"
[[tests.outputs]]
extract_from = "vector"
[[tests.outputs.conditions]]
type = "check_fields"
"@timestamp.eq" = "2021-01-06T19:25:57.852322900Z"
"message.eq" = "Received one event."
"log.level.eq" = "warn"
"log.logger.eq" = "vector::topology"
"ecs.version.eq" = "1.7"
"agent.type.eq" = "vector"
"agent.version.exists" = true
"event.dataset[0].eq" = "app"
"host.hostname.exists" = true
"cloud.provider.exists" = true
"cloud.region.exists" = true
"tenant.name.eq" = "watchwith"
"service.name.eq" = "mts-dev"
[sinks.daas]
type = "elasticsearch"
inputs = ["remap", "remap_sqs", "remap_alblogs"]
endpoint = "${ELK_URL}"
bulk.index = "logz-XXXXXX-XXX-dev"
bulk.action = "create"
auth.strategy = "basic"
auth.user = "${ELK_USERNAME}"
auth.password = "${ELK_PASSWORD}"
buffer.type = "disk"
buffer.max_size = 1049000000
buffer.when_full = "drop_newest"
tls.verify_certificate = false
tls.verify_hostname = false
[sinks.s3_archive]
type = "aws_s3"
inputs = [ "sqs" ]
bucket = "XXX-dev-vector-logs-backup"
key_prefix = "archive/date=%F/"
compression = "gzip"
region = "us-west-2"
healthcheck = false
[sinks.s3_archive.encoding]
codec = "json"
[sources.syslog]
type = "syslog"
address = "0.0.0.0:514"
mode = "udp"
[sources.sqs]
type = "aws_sqs"
poll_secs = 15
delete_message = true
client_concurrency = 1
region = "${SQS_QUEUE_REGION}"
queue_url = "${SQS_QUEUE_URL}"
[sources.alblogs]
type = "aws_s3"
region = "us-west-2"
compression = "gzip"
[sources.alblogs.sqs]
queue_url = "https://sqs.us-west-2.amazonaws.com/XXXXXXXX/alb-logs-XXX-dev-sqs-notification"
[transforms.remap]
type = "remap"
inputs = ["syslog"]
source = '''
'''
[transforms.remap_sqs]
type = "remap"
inputs = ["sqs"]
source = """
. = parse_json!(.message)"""
[transforms.remap_alblogs]
type = "remap"
inputs = ["alblogs"]
source = """
. = parse_aws_alb_log!(.message)"""
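As an aside, here is a unit-test sketch for the remap_sqs transform in the same [[tests]] style used above; the payload is a made-up example, not real queue data:
# Test sketch: parse_json!(.message) should replace the event with the parsed JSON.
[[tests]]
name = "check_remap_sqs"
[[tests.inputs]]
insert_at = "remap_sqs"
type = "log"
log_fields.message = '{"foo": "bar"}'
[[tests.outputs]]
extract_from = "remap_sqs"
[[tests.outputs.conditions]]
type = "check_fields"
"foo.eq" = "bar"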
Version
vector 0.23.3 (x86_64-unknown-linux-musl af8c9e1 2022-08-10)
Debug Output
Since these are Fargate-backed images, getting debug output will be difficult, but I can make it happen if that helps.
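In the meantime, here is a sketch of a tap I could wire in to dump the raw notification bodies to stdout. The alblogs_raw_debug and debug_console names are placeholders I made up, and delete_message = false keeps the messages on the queue while I inspect them:
# Debug tap (sketch): read the same notification queue with the plain
# aws_sqs source so the raw message bodies are emitted unparsed.
[sources.alblogs_raw_debug]
type = "aws_sqs"
region = "us-west-2"
queue_url = "https://sqs.us-west-2.amazonaws.com/XX/alb-logs-dev-sqs-notification"
# Leave messages on the queue while inspecting them.
delete_message = false
[sinks.debug_console]
type = "console"
inputs = ["alblogs_raw_debug"]
encoding.codec = "json"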
Example Data
No response
Additional Context
Vector is running in a pair of Fargate-backed, ECS-managed Docker containers. They currently receive logs successfully from an SQS queue (separate from the S3 notification queue we are trying to read from), so I know Vector is capable of reading SQS messages correctly.
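If it helps reproduction, this is roughly the smallest config that should hit the error on our side — a sketch using the same obfuscated queue URL as above:
# Minimal reproduction sketch: just the failing aws_s3 source and a console sink.
[sources.alblogs]
type = "aws_s3"
region = "us-west-2"
compression = "gzip"
[sources.alblogs.sqs]
queue_url = "https://sqs.us-west-2.amazonaws.com/XX/alb-logs-dev-sqs-notification"
[sinks.debug]
type = "console"
inputs = ["alblogs"]
encoding.codec = "json"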
References
No response