
Potential race condition when sending batches to http output (that doesn't use multipart)

Open wirtsi opened this issue 2 years ago • 5 comments

Hey Ash,

I have a weird issue that only occurs sometimes. What I do is:

  • Read data from an API through the input
  • Massage the data
  • Split the larger objects into smaller messages via unarchive
  • Send the objects to an http output. This endpoint doesn't support multipart, so batch_as_multipart is off

The config looks somewhat like this:

pipeline:
  processors:
    - bloblang: |
        root.items = this.items.map_each(item -> this.zooQuantity.filter(zoo -> zoo.key == item.key).index(0).assign(item))
      label: merge_zoo_inventory
    - bloblang: |
        root = this.items.filter(item -> item.zooStock != item.availability.availableQuantity)
      label: filter_unchanged
    - unarchive:
        format: json_array
      label: split_messages
    - bloblang: |
        root.id = this.availability.id
        root.version = this.availability.version
        root.actions = [{"action": "changeQuantity", "quantity": this.zooStock}]
      label: assemble_payload
output:
  http_client:
    url: https://api.europe-west1.gcp.commercetools.com/zooplus-dev/inventory/${! json("id") }
    verb: POST
    batch_as_multipart: false
    batching:
      period: 1s
    headers:
      Content-Type: application/json
    timeout: 5s
    max_in_flight: 1
    oauth2:
      enabled: true
      client_key: '<key>'
      client_secret: '<secret>'
      token_url: 'https://auth.europe-west1.gcp.commercetools.com/oauth/token?grant_type=client_credentials'
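
For reference, each split message ends up as a small standalone JSON document; the one below is the payload from the first INFO log line further down, unescaped. With batch_as_multipart off, the http_client output sends each message of a 1s batch as its own POST rather than one multipart request per batch:

    {
      "actions": [{"action": "changeQuantity", "quantity": 521}],
      "id": "8a621fa6-7702-4c1f-91e3-13e0adb85bc0",
      "version": 20
    }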

Most of the time I get output like this:

{"@timestamp":"2022-03-09T17:18:06+01:00","@service":"benthos","component":"benthos.pipeline.processor.9","level":"INFO","message":"{\"actions\":[{\"action\":\"changeQuantity\",\"quantity\":521}],\"id\":\"8a621fa6-7702-4c1f-91e3-13e0adb85bc0\",\"version\":20}"}
{"@timestamp":"2022-03-09T17:18:06+01:00","@service":"benthos","component":"ct_product_projections","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2022-03-09T17:18:06+01:00","@service":"benthos","component":"ct_product_projections","level":"DEBUG","message":"Pending acks resolved."}
{"@timestamp":"2022-03-09T17:18:06+01:00","@service":"benthos","component":"benthos","level":"INFO","message":"Pipeline has terminated. Shutting down the service."}

But every 2-3 runs I get:

{"@timestamp":"2022-03-09T17:18:11+01:00","@service":"benthos","component":"benthos.pipeline.processor.9","level":"INFO","message":"{\"actions\":[{\"action\":\"changeQuantity\",\"quantity\":521}],\"id\":\"8a621fa6-7702-4c1f-91e3-13e0adb85bc0\",\"version\":20}"}
{"@timestamp":"2022-03-09T17:18:11+01:00","@service":"benthos","component":"ct_product_projections","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2022-03-09T17:18:11+01:00","@service":"benthos","component":"ct_product_projections","level":"DEBUG","message":"Pending acks resolved."}
{"@timestamp":"2022-03-09T17:18:11+01:00","@service":"benthos","component":"benthos","level":"INFO","message":"Pipeline has terminated. Shutting down the service."}
{"@timestamp":"2022-03-09T17:18:11+01:00","@service":"benthos","component":"ct_inventory","level":"ERROR","message":"Failed to send message to http_client: https://api.europe-west1.gcp.commercetools.com/zooplus-dev/inventory/10093b6f-47aa-4a58-9cbd-b2be174bdfbd: type was closed"}

So to me this looks like the original message gets acknowledged as soon as the first broken-down message is acked by the output, and the http output never gets to send the remaining requests. Another idea might be that the input also uses oauth2 (it reads from CommerceTools) and the oauth client is somehow shared and torn down too soon?

Any ideas on how to work around this?

Bests & thanks :-)

wirtsi · Mar 09 '22 16:03

Hey @wirtsi, which version of benthos are you using?

Jeffail · Mar 09 '22 16:03

Ah, yes... that should help: Version: 3.65.0

wirtsi · Mar 09 '22 16:03

I did a quick triage of the code base and I suspect this might be a problem with graceful termination tearing down the output too aggressively. One thing worth trying is to keep batch_as_multipart as true and break the batch out with a split processor before hitting the output:

output:
  processors:
    - split: {}
  http_client:
    batch_as_multipart: true
    ...
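
Merged into the original output section, that would look roughly like this (a sketch; the split processor breaks each batch back into single-message batches before the output writes them, so each message still goes out as its own request):

output:
  processors:
    - split: {}
  http_client:
    url: https://api.europe-west1.gcp.commercetools.com/zooplus-dev/inventory/${! json("id") }
    verb: POST
    batch_as_multipart: true
    batching:
      period: 1s
    headers:
      Content-Type: application/json
    timeout: 5s
    max_in_flight: 1
    oauth2:
      enabled: true
      client_key: '<key>'
      client_secret: '<secret>'
      token_url: 'https://auth.europe-west1.gcp.commercetools.com/oauth/token?grant_type=client_credentials'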

If you can still reproduce the error with that setup then let me know; otherwise I think I know where the problem is.

Jeffail · Mar 09 '22 16:03

@Jeffail just tried this ... but no luck. It feels like the error occurs less frequently, but the outcome wasn't consistent before either. Sometimes 10 calls work in a row, sometimes 10 fail.

Thanks again!

wirtsi · Mar 09 '22 17:03

Hey @wirtsi, I'm not having any luck trying to reproduce this. I don't think it's likely to be related to the oauth2 settings, but I can't rule it out either. Is this still something you're able to reproduce often? If so, there are a couple of approaches we can take to diagnose this. If you can reduce the example down to something I can run locally, that'd be ideal; otherwise I can look into adding some custom logging and diagnostics to a build for you to try out.
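
For example, a reduced config along these lines might be enough to trigger it, assuming a throwaway endpoint on localhost to POST against (the generate input, its mapping, and the URL here are stand-ins, not part of the original setup):

input:
  generate:             # stand-in for the real API input
    count: 1            # emit once, then close the stream
    interval: 1s
    mapping: |
      root = [
        {"id": "a", "version": 1, "actions": []},
        {"id": "b", "version": 1, "actions": []}
      ]

pipeline:
  processors:
    - unarchive:        # same split as in the original config
        format: json_array

output:
  http_client:
    url: http://localhost:8080/inventory/${! json("id") }
    verb: POST
    batch_as_multipart: false
    batching:
      period: 1s
    max_in_flight: 1

The generate input shuts the stream down after the single array message, which should exercise the same shutdown path that appears to race with the batched output.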

Jeffail · May 24 '22 17:05