[Feature Request] Add batching and concurrency options
Many of the AWS clients support sending multiple events within a single request, such as SQS and EventBridge, but there are typically quota limits on how many. Additionally, the client is usually configured with a maximum number of concurrent requests, typically defined by the HTTP agent (which tends to be 50).
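For context, that concurrency cap typically comes from the SDK's HTTP handler configuration rather than the client API itself. A rough sketch of where it is set, with illustrative values only:

```typescript
import { EventBridgeClient } from "@aws-sdk/client-eventbridge"
import { NodeHttpHandler } from "@smithy/node-http-handler"
import { Agent } from "node:https"

// Illustrative only: the agent's maxSockets (50 by default in the SDK's handler)
// is what effectively caps the number of concurrent in-flight requests.
const client = new EventBridgeClient({
  requestHandler: new NodeHttpHandler({
    httpsAgent: new Agent({ maxSockets: 50 }),
  }),
})
```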
I have often found myself repeating the same process: taking some iterable of events, chunking it according to the service limit per request, and then handling the requests either serially or with some degree of concurrency.
It would be useful if the generated Effect clients had additional options to control this, though I appreciate this might then affect how responses are handled (both success and failure). I have tended to rely on errors being returned to the invoker so that it takes advantage of the inherent retries and DLQ handling where configured, rather than attempting to implement retry logic at the client level.
I'm not sure if it's possible for the code gen to know what the service limits are, and which commands they would apply to. There are additional service limits, such as payload size, to also consider, although calculating those can, I think, be service specific and perhaps much harder to implement generically.
For example:
```typescript
EventBridge.putEvents(
  { entries },
  {}, // handler options
  { batchSize: 10, concurrency: 10 }
)
```
That is a good suggestion; I had this in mind to implement for the DynamoDB service (for `batchGetItem` and `batchWriteItem`),
but I suggest either merging the options with the second argument:
```typescript
EventBridge.putEvents(
  { entries },
  { batchSize: 10, concurrency: 10 } // handler options
)
```
or adding some higher-order function like `withConcurrency`, similar to `withRequestBatching` and `withConcurrency` from Effect.
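For illustration, those existing Effect combinators are applied as wrappers over an effect, so a similar shape could work for the generated clients (a sketch only, not an existing effect-aws API):

```typescript
import { Effect } from "effect"

// Placeholder standing in for a call to a generated client
const program = Effect.log("put events")

const tuned = program.pipe(
  // enable/disable request batching for everything inside this effect
  Effect.withRequestBatching(true),
  // set the concurrency used by operations configured with `concurrency: "inherit"`
  Effect.withConcurrency(10),
)
```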
> I'm not sure if it's possible for the code gen to know what the service limits are, and which commands they would apply to.
That is most probably the case, but I'm not against customizing clients; it is already the case for client-s3, where I added presigned URL overloads as a custom addition: https://github.com/floydspace/effect-aws/blob/eabaa4a1e81c64a0d3b316ba09081f05a4a644aa/packages/client-s3/src/S3Service.ts#L850-L862
I took a look at the Request/RequestResolver interfaces in the Batching docs.
Here I was taking the approach that requests could be made for single entries, and the resolver would submit the batch in a single putEvents call, the idea being that the caller would only have to iterate over the input entries once, and the batching logic should work on those single entries. This might be an approach for all commands that accept arrays of entries, but there may be some complication over whether the target resource (SQS queue etc.) is part of the entry or the request. In my initial use case of EventBridge, the EventBus is an entry detail, so it is easy to handle.
However, I don't seem to be able to get the correct return type, as Request.complete/completeEffect have void returns:
```typescript
class PutEventError {
  readonly _tag = 'PutEventError'
}

interface PutEvent extends Request.Request<PutEventsResultEntry, PutEventError> {
  readonly _tag: 'PutEvent'
  readonly entry: PutEventsRequestEntry
}

const PutEvent = Request.tagged<PutEvent>('PutEvent')

const PutEventResolver = RequestResolver.makeBatched((requests: ReadonlyArray<PutEvent>) =>
  EventBridge.putEvents({ Entries: requests?.map(request => request.entry) }).pipe(
    Effect.map(response => response.Entries ?? []),
    Effect.andThen(results =>
      Effect.forEach(requests, (request, index) =>
        Request.completeEffect(request, Effect.succeed(results[index]!))))
  )
)
```
gives:
```
TS2345: Argument of type
  (requests: ReadonlyArray<PutEvent>) => Effect.Effect<void[], InternalError | SdkError, EventBridgeService>
is not assignable to parameter of type
  (requests: [PutEvent, ...PutEvent[]]) => Effect<void, never, EventBridgeService>
    Type Effect<void[], InternalError | SdkError, EventBridgeService>
    is not assignable to type Effect<void, never, EventBridgeService>
      Type void[] is not assignable to type void
```
Updated the code in the previous comment to include the request types.
You complete your request when it succeeds, but you also need to complete it in case of errors.
According to the TS error, the expected output error is `never`, which means you need `Effect.orDie`.
However, I see you actually use the `PutEventError` error in your request interface, so you need to remap from `InternalError | SdkError` to `PutEventError`:
```typescript
  ...
  Effect.catchAll((error) =>
    Effect.forEach(requests, Request.completeEffect(Effect.fail(new PutEventError()))),
  ),
```
Thanks, somehow I wasn't thinking that was important to the outcome!
Here is an outline of Batching code for an EventBridge PutEvent (singular):
```typescript
class PutEventError {
  readonly _tag = 'PutEventError'
}

interface PutEventRequest extends Request.Request<PutEventsResultEntry, PutEventError> {
  readonly _tag: 'PutEventRequest'
  readonly entry: PutEventsRequestEntry
}

const PutEventRequest = Request.tagged<PutEventRequest>('PutEventRequest')

const PutEventResolver = RequestResolver.makeBatched((requests: ReadonlyArray<PutEventRequest>) =>
  EventBridge.putEvents({ Entries: requests.map(request => request.entry) }).pipe(
    Effect.map(response => response.Entries ?? []),
    Effect.andThen(results =>
      Effect.forEach(requests, (request, index) =>
        Request.completeEffect(request, Effect.succeed(results[index]!)),
        // discard so the resolver returns void rather than void[]
        { discard: true })
    ),
    Effect.catchAll((_error) =>
      Effect.forEach(requests, Request.completeEffect(Effect.fail(new PutEventError())), { discard: true }),
    ),
  )
).pipe(
  RequestResolver.batchN(10),
  RequestResolver.contextFromServices(EventBridge)
)

const PutEvent = (entry: PutEventsRequestEntry) =>
  Effect.request(PutEventRequest({ entry }), PutEventResolver)
```
And here is example usage, with my previous chunking code commented out:
```typescript
const decodeServiceHandler = (event: unknown, _context: Context) => pipe(
  Schema.decodeUnknown(EventBridgeEventWithUplink)(event),
  Effect.andThen(uplink => decodeUplink(uplink.detail)),
  Effect.andThen(Effect.forEach(({ type, ...payload }) => Effect.gen(function* () {
    const config = yield* DecoderConfig
    return Common.PutEventsRequestEntry.make({
      Source: config.source,
      DetailType: type ?? config.detailType,
      Detail: JSON.stringify(payload),
      EventBusName: config.eventBusArn,
    })
  }))),
  Effect.andThen(entries => Effect.forEach(entries, PutEvent, { batching: true })),
  // Effect.andThen(payloads => Chunk.fromIterable(payloads).pipe(Chunk.chunksOf(10))),
  // Effect.andThen(Effect.forEach(entries => EventBridge.putEvents({ Entries: Chunk.toArray(entries) }), { concurrency: 10 })),
  Effect.as(undefined),
)
```
Observations:
- Even when specifying `{ concurrency: 2 }` on the `forEach` with `PutEvent`, it looks like the requests are done in series, and I don't see an option on the `RequestResolver` or `makeBatched` to do them in parallel (if desired, which I would imagine would be the default case).
- It's not clear if batching allows for a batching window (I can't see one), for example, to make a downstream request when the upstream producer is slow. I can imagine the case of an SQS queue consumer (using client-side polling in Lambda, not an event trigger) where you might get fewer than a full batch of requests within a specified window, and want to pass the accumulated events downstream whilst continuing to wait. That might be beyond the scope; I don't have an immediate need for concurrency on top of batching, but it seems like a useful optimisation.
Creating the Request/RequestResolver is quite a lot of boilerplate on top of the existing effect-aws code, and given that it might be hard to determine which services/commands could benefit from batching, it might be easier to create a utility, perhaps as a static on the service, that allows the developer to make the Request query and associated code on demand.
Something like:
```typescript
const PutEvent = EventBridge.makeBatching(PutEventsCommand, {
  max: 10,
  window: Duration.milliseconds(100),
  concurrency: 10
})
```
I also think that, if possible, the query effect (`PutEvent` in this case) should enable batching by default, so perhaps this encapsulates the `forEach` and becomes `PutEvents` (plural):
```typescript
const PutEvents = (entries: ReadonlyArray<PutEventsRequestEntry>) =>
  Effect.forEach(entries, PutEvent, { batching: true })
```
Maybe that's an option to the utility function, as to whether it presents a singular or plural interface.
As previously suggested, batching could be added as an option to the existing service methods, but I think I like the idea of keeping those as an 'effect'ual version of the SDK, and supporting batching on an ad hoc basis.
I think something would need to be done about accumulating results, or maybe it's sufficient to just return an array of the regular command output.
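To make that idea concrete, here is a rough sketch of the kind of boilerplate such a utility could encapsulate. Everything in it is hypothetical: `makeBatchedPut`, its options, and the `send` callback are made-up names rather than effect-aws API, and providing the client context (e.g. via `RequestResolver.contextFromServices`, as used earlier) is left out for brevity:

```typescript
import { Effect, Request, RequestResolver } from "effect"

interface BatchItemError {
  readonly _tag: "BatchItemError"
  readonly cause: unknown
}

interface BatchItemRequest<In, Out> extends Request.Request<Out, BatchItemError> {
  readonly _tag: "BatchItemRequest"
  readonly item: In
}

// Wraps the Request/RequestResolver boilerplate around a "send one batch" effect.
// The returned (plural) interface batches the items and yields per-item results in order.
const makeBatchedPut = <In, Out, E>(options: {
  readonly max: number
  readonly send: (items: ReadonlyArray<In>) => Effect.Effect<ReadonlyArray<Out>, E>
}) => {
  const makeRequest = Request.tagged<BatchItemRequest<In, Out>>("BatchItemRequest")

  const resolver = RequestResolver.makeBatched((requests: ReadonlyArray<BatchItemRequest<In, Out>>) =>
    options.send(requests.map((request) => request.item)).pipe(
      Effect.andThen((results) =>
        Effect.forEach(requests, (request, index) => Request.succeed(request, results[index]!), { discard: true })
      ),
      // complete every request on failure too, so callers are never left hanging
      Effect.catchAll((cause) =>
        Effect.forEach(requests, (request) => Request.fail(request, { _tag: "BatchItemError", cause }), { discard: true })
      )
    )
  ).pipe(RequestResolver.batchN(options.max))

  return (items: ReadonlyArray<In>) =>
    Effect.forEach(items, (item) => Effect.request(makeRequest({ item }), resolver), { batching: true })
}
```

Whether the helper exposes the per-item query or this wrapping `forEach` is then just the singular/plural choice mentioned above.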
Thinking out loud here - I went back to see how Batching compared to the chunking alternative:
I define the features as:
- Respecting Service Limits - Automatic splitting of batch requests to respect service limits
- Concurrency Control - Allowing for concurrent batch requests to optimise throughput
- Distributed producers - Support for producers from different fibers (concurrent processing of upstream events)
- Optimal aggregation - Automatic aggregation of items into the maximum allowed items per batch request, whilst allowing for early flush in the event of a slow upstream
| Feature | Batching | Chunking |
|---|---|---|
| Respecting service limits | Yes | Yes |
| Concurrency control | No | Yes |
| Distributed producers | Yes | No |
| Optimal aggregation | Yes | No |
| Batching Window | No | No |
For completeness, here is an implementation of PutEvents that uses Chunks to meet the service limit whilst allowing concurrency:
```typescript
const ChunkedPutEvents =
  ({ batchSize, concurrency }: { batchSize: number, concurrency: Concurrency }) =>
  (entries: ReadonlyArray<PutEventsRequestEntry>) => {
    const chunks = Chunk.chunksOf(batchSize)(Chunk.fromIterable(entries))
    return Effect.forEach(chunks, chunk => EventBridge.putEvents({ Entries: Chunk.toArray(chunk) }), { concurrency })
  }
```
I'm not familiar enough with Effect or the Batching API to fully understand whether it's capable of providing the concurrency support. I don't see a way to configure concurrency, so I'm assuming there is none. I am also assuming that it will aggregate separate requests until a batch is full, but without the possibility of a batching window to flush downstream when the upstream is slow.
I'm somewhat familiar with Mailbox, which might provide the necessary support. It seems possible to have an API similar to makeBatched which creates the mailbox and a consumer (the 'request resolver'), and then passes the mailbox to a request wrapper so that it can insert items into the mailbox (the request effect).
The mailbox can be converted to a Stream with a Stream.groupedWithin to honor the batch size and window. I think it would then need fibers to be created for each batch and a way to collect the results, whilst respecting the concurrency configuration.
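A very rough sketch of that shape, using a `Queue` as the buffer (a `Mailbox` could play the same role); `makeWindowedBatcher` and `sendBatch` are made-up names, and collecting per-item results (the hard part) is not handled here:

```typescript
import { Chunk, Duration, Effect, Queue, Stream } from "effect"

const makeWindowedBatcher = <A, E, R>(options: {
  readonly batchSize: number
  readonly window: Duration.DurationInput
  readonly concurrency: number
  readonly sendBatch: (items: ReadonlyArray<A>) => Effect.Effect<void, E, R>
}) =>
  Effect.gen(function* () {
    const queue = yield* Queue.unbounded<A>()

    // Consumer: flush on batch size OR window expiry, sending batches concurrently
    const consumer = Stream.fromQueue(queue).pipe(
      Stream.groupedWithin(options.batchSize, options.window),
      Stream.mapEffect(
        (batch) => options.sendBatch(Chunk.toReadonlyArray(batch)),
        { concurrency: options.concurrency }
      ),
      Stream.runDrain
    )

    const fiber = yield* Effect.forkScoped(consumer)

    // Producers (possibly on different fibers) just offer items into the queue;
    // completing individual requests with their results is not handled in this sketch
    return { offer: (item: A) => Queue.offer(queue, item), fiber }
  })
```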
I worked up batching for DynamoDB Get, using BatchGetCommand under the hood. This is less straightforward, as requests need to be grouped by table and then the responses matched back up to the requests. This is far from complete:
- Doesn't handle `UnprocessedItems`, which can occur when the request/response exceeds the service limits
- Should ensure that all requests are completed in the batch, even if the AWS response doesn't contain a match (e.g. not found)
- Not clear on how to customise the batching size
- Not clear on how to make batch requests concurrent
- Not clear on how to get batching to flush a partial batch in the case of slow upstream requests
- Probably lots of edge cases
The main takeaway here was that there is plenty of nuance in how individual AWS clients handle batching, which might make adding broad support for batching to effect-aws more effort.
```typescript
import { KeysAndAttributes } from "@aws-sdk/client-dynamodb"
import { BatchGetCommandInput, NativeAttributeValue } from "@aws-sdk/lib-dynamodb"
import { DynamoDBDocument } from "@effect-aws/lib-dynamodb"
import { Data, Effect, Equal, Request, RequestResolver } from "effect"

type Item = Record<string, NativeAttributeValue>

type KeyAndAttributes = Omit<KeysAndAttributes, "Keys"> & {
  Key: Item
}

export class GetItemError extends Data.TaggedError('GetItemError')<{
  Key: Item
  cause: Error
}> {
}

interface GetItemRequest extends Request.Request<Item, GetItemError>, KeyAndAttributes {
  readonly _tag: 'GetItemRequest'
  readonly TableArn: string
}

export const GetItemRequest = Request.tagged<GetItemRequest>('GetItemRequest')

const GetItemResolver = RequestResolver.makeBatched((requests: ReadonlyArray<GetItemRequest>) =>
  Effect.gen(function* () {
    // Group the single-item requests by table so one BatchGet call can cover them all
    const groupRequestsByTable = () => {
      type TableArn = string
      const requestsPerTableMap = new Map<TableArn, Set<GetItemRequest>>()
      for (const request of requests) {
        const setOfRequests = requestsPerTableMap.get(request.TableArn) ?? new Set()
        setOfRequests.add(request)
        requestsPerTableMap.set(request.TableArn, setOfRequests)
      }
      return requestsPerTableMap
    }

    // Build the BatchGetCommand input from the grouped requests
    const makeBatchGetItemCommand = (requestsPerTableMap: ReturnType<typeof groupRequestsByTable>) => {
      const input: BatchGetCommandInput = { RequestItems: {} }
      for (const [tableArn, tableRequests] of requestsPerTableMap) {
        input.RequestItems ??= {}
        input.RequestItems[tableArn] ??= { Keys: [] }
        for (const request of tableRequests) {
          input.RequestItems[tableArn].Keys!.push(request.Key)
        }
      }
      return input
    }

    const requestsPerTableMap = groupRequestsByTable()
    const input = makeBatchGetItemCommand(requestsPerTableMap)

    // Match a returned item back to a request by comparing every key attribute
    const itemMatchesKey = (item: Item, key: Item) => {
      for (const [k, v] of Object.entries(key)) {
        if (!Equal.equals(item[k], v)) {
          return false
        }
      }
      return true
    }

    yield* DynamoDBDocument.batchGet(input).pipe(
      Effect.andThen(response =>
        Effect.forEach(Object.entries(response.Responses ?? {}), ([tableArn, items]) =>
          Effect.forEach(items, item =>
            Effect.filter(requests, request => Effect.succeed(request.TableArn === tableArn && itemMatchesKey(item, request.Key))).pipe(
              Effect.andThen(Effect.forEach(request =>
                Request.completeEffect(request, Effect.succeed(item)))),
            ),
          ),
        ),
      ),
      // Complete every request with an error if the underlying BatchGet fails
      Effect.catchAll(cause =>
        Effect.forEach(requests, request =>
          Request.completeEffect(request, new GetItemError({
            cause,
            Key: request.Key,
          })),
        ),
      ),
    )
  })
).pipe(
  RequestResolver.batchN(100),
  RequestResolver.contextFromServices(DynamoDBDocument),
)

export const GetItem = (request: GetItemRequest) => Effect.request(request, GetItemResolver)
```
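For completeness, a hypothetical usage of the above (the table name and key shape are placeholders, and the `DynamoDBDocument` layer still needs to be provided); with `batching: true` the individual gets should be collapsed into `BatchGetCommand` calls by the resolver:

```typescript
// Hypothetical usage: look up several items in one logical pass
const getUsers = (ids: ReadonlyArray<string>) =>
  Effect.forEach(
    ids,
    (id) => GetItem(GetItemRequest({ TableArn: "users-table", Key: { id } })),
    { batching: true }
  )
```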