
Vector unable to process AWS S3 messages (SQS notification parsing error)

Open · irvintim opened this issue on Sep 15, 2022 · 0 comments

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

I am setting up an S3 source to capture our AWS ALB logs. I set up the following source:

[sources.alblogs]
  type = "aws_s3"
  region = "us-west-2"
  compression = "gzip"
  [sources.alblogs.sqs]
    queue_url = "https://sqs.us-west-2.amazonaws.com/XX/alb-logs-dev-sqs-notification" 

Vector is receiving the SQS notification messages as expected; they look like this (shortened for length):

{"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-west-2","eventTime":"2022-09-15T18:10:03.735Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AI"},"requestParameters":{"sourceIPAddress":"2600:1f13:745:3500:d439:cb9d:73b5:46a1"},"responseElements":{"x-amz-request-id":"X3SJH","x-amz-id-2":"rih"},"s3":{"s3SchemaVersion":"1.0","configurationId":"tf-s3-queue-202X","bucket":{"name":"alb-logs-dev.XXXXXXXXXX","ownerIdentity":{"principalId":"XX"},"arn":"arn:aws:s3:::alb-logs-dev.X"},"object":{"key":"public/AWSLogs/XXX/elasticloadbalancing/us-west-2/2022/09/15/X_elasticloadbalancing_us-west-2_app.dev-api.xx_20220915T1810Z_10.XXX.XXX.XXX_61j36cqr.log.gz","size":227370,"eTag":"955c8xxad9","sequencer":"00632XF29"}}}]}

But Vector then generates the error below and does not process these events any further; the SQS message is still removed from the queue.

{...
    "vector": {
      "error": "Could not parse SQS message with id 8-x-x-4 as S3 notification: missing field Records at line 1 column 246",
      "error_code": "failed_processing_sqs_message",
      "error_type": "parser_failed",
      "host": {},
      "message_id": "8-x-x-4",
      "metadata": {
        "kind": "event",
        "target": "vector::internal_events::aws_sqs::s3"
      },
      "pid": 112,
      "source_type": "internal_logs",
      "stage": "processing",
      "timestamp": "2XXZ",
      "vector": {
        "component_id": "alblogs",
        "component_kind": "source",
        "component_name": "alblogs",
        "component_type": "aws_s3"
      }
    }
  },
...}

I am expecting the SQS notification to trigger Vector to fetch the S3 object and process it. The SQS message appears to be formatted correctly, based on what the error message indicates the parser is looking for.
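If it helps with triage: the error text reads like a standard serde_json "missing field" error, i.e. the JSON body Vector actually pulled off the queue had no top-level Records key when it was deserialized. Here is a minimal sketch of that behavior (hypothetical struct, not Vector's real internal types), just to illustrate what the parser appears to require:

use serde::Deserialize;

// Hypothetical stand-in for whatever struct Vector deserializes the SQS body into.
#[derive(Deserialize, Debug)]
struct S3Notification {
    #[serde(rename = "Records")]
    records: Vec<serde_json::Value>,
}

fn main() {
    // A body like the sample above, with a top-level Records array, parses fine.
    let with_records = r#"{"Records":[{"eventSource":"aws:s3"}]}"#;
    assert!(serde_json::from_str::<S3Notification>(with_records).is_ok());

    // A body without Records (for example the s3:TestEvent that S3 sends when
    // notifications are first configured) fails with the same wording seen in the log:
    // "missing field `Records` at line 1 column <N>"
    let without_records = r#"{"Service":"Amazon S3","Event":"s3:TestEvent"}"#;
    let err = serde_json::from_str::<S3Notification>(without_records).unwrap_err();
    println!("{err}");
}

So if I am reading the error right, whatever message produced "column 246" did not have a Records field, even though the notification sample above does.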

We are using the Dockerized Vector image timberio/vector:latest-alpine. The full config is shown below, obfuscated as needed.

Configuration

[log_schema]
    host_key = "host.hostname"
    timestamp_key = "@timestamp"

[api]
    enabled = true
    address = "0.0.0.0:8686"

[sources.vector_metrics]
  type = "internal_metrics"

[sinks.prometheus]
  type = "prometheus_exporter"
  inputs = ["vector_metrics"]
  address = "0.0.0.0:1953"

[sources.vector_logs]
  type = "internal_logs"

[transforms.vector]
  type = "remap"
  inputs = ["vector_logs"]
  source = '''
    .vector = del(.)
    ."@timestamp" = del(.vector."@timestamp")
    .message = del(.vector.message)
    .log.logger = del(.vector.metadata.module_path)
    .log.level = downcase(string!(del(.vector.metadata.level)))

    .ecs.version = "1.7"
    .agent.type = "vector"
    .agent.version = get_env_var("VECTOR_VERSION") ?? null
    .event.dataset = ["app"]
    .host.hostname = get_hostname!()
    .cloud.provider = get_env_var("CLOUD_PROVIDER") ?? null
    .cloud.region = get_env_var("CLOUD_REGION") ?? null
    .tenant.name = "XXXXX"
    .service.name = "XXX-dev"

    del(.vector.host.hostname)
  '''

[sinks.daas_vector]
  type = "elasticsearch"
  inputs = ["vector"]
  endpoint = "${ELK_URL}"
  bulk.index = "logz-XXXXXXX-forwarder"
  bulk.action = "create"

  auth.strategy = "basic"
  auth.user = "${ELK_USERNAME}"
  auth.password = "${ELK_PASSWORD}"

  batch.max_bytes = 10490000
  batch.max_events = 1000
  batch.timeout_secs = 8

  request.retry_attempts = 1
  buffer.type = "disk"
  buffer.max_size = 268435488
  buffer.when_full = "drop_newest"

  tls.verify_certificate = false
  tls.verify_hostname = false

[[tests]]
  name = "check_vector_logs"

  [[tests.inputs]]
    insert_at = "vector"
    type = "log"
    log_fields.message = "Received one event."
    log_fields.timestamp = "2021-01-06T19:25:57.852322900Z"
    log_fields."metadata.level" = "WARN"
    log_fields."metadata.module_path" = "vector::topology"

  [[tests.outputs]]
    extract_from = "vector"

    [[tests.outputs.conditions]]
      type = "check_fields"
      "@timestamp.eq" = "2021-01-06T19:25:57.852322900Z"
      "message.eq" = "Received one event."
      "log.level.eq" = "warn"
      "log.logger.eq" = "vector::topology"
      "ecs.version.eq" = "1.7"
      "agent.type.eq" = "vector"
      "agent.version.exists" = true
      "event.dataset[0].eq" = "app"
      "host.hostname.exists" = true
      "cloud.provider.exists" = true
      "cloud.region.exists" = true
      "tenant.name.eq" = "watchwith"
      "service.name.eq" = "mts-dev"

[sinks.daas]
  type = "elasticsearch"
  inputs = ["remap", "remap_sqs", "remap_alblogs"]
  endpoint = "${ELK_URL}"
  bulk.index = "logz-XXXXXX-XXX-dev"
  bulk.action = "create"

  auth.strategy = "basic"
  auth.user = "${ELK_USERNAME}"
  auth.password = "${ELK_PASSWORD}"

  buffer.type = "disk"
  buffer.max_size = 1049000000
  buffer.when_full = "drop_newest"

  tls.verify_certificate = false
  tls.verify_hostname = false

[sinks.s3_archive]
  type = "aws_s3"
  inputs = [ "sqs" ]
  bucket = "XXX-dev-vector-logs-backup"
  key_prefix = "archive/date=%F/"
  compression = "gzip"
  region = "us-west-2"
  healthcheck = false

  [sinks.s3_archive.encoding]
    codec = "json"

[sources.syslog]
  type = "syslog"
  address = "0.0.0.0:514"
  mode = "udp"

[sources.sqs]
  type = "aws_sqs"
  poll_secs = 15
  delete_message = true
  client_concurrency = 1
  region = "${SQS_QUEUE_REGION}"
  queue_url = "${SQS_QUEUE_URL}"

[sources.alblogs]
  type = "aws_s3"
  region = "us-west-2"
  compression = "gzip"
  [sources.alblogs.sqs]
    queue_url = "https://sqs.us-west-2.amazonaws.com/XXXXXXXX/alb-logs-XXX-dev-sqs-notification"

[transforms.remap]
  type = "remap"
  inputs = ["syslog"]
  source = '''
  '''

[transforms.remap_sqs]
  type = "remap"
  inputs = ["sqs"]
  source = """
    . = parse_json!(.message)"""

[transforms.remap_alblogs]
  type = "remap"
  inputs = ["alblogs"]
  source = """
    . = parse_aws_alb_log!(.message)"""

Version

vector 0.23.3 (x86_64-unknown-linux-musl af8c9e1 2022-08-10)

Debug Output

Since these are Fargate-backed containers, getting debug output will be difficult, but I can make it happen if that helps.

Example Data

No response

Additional Context

Vector is running in a pair of Fargate-backed, ECS-managed Docker containers. They currently receive logs successfully from an SQS queue (separate from the one backing the S3 source we are trying to add), so I know Vector is capable of reading SQS messages correctly.

References

No response

irvintim · Sep 15 '22 19:09