Issue with Lambda event source mapping creation using a Kinesis stream consumer ARN
Is there an existing issue for this?
- [X] I have searched the existing issues
Current Behavior
When creating a Lambda event source mapping with a Kinesis stream consumer ARN as the event source (rather than the stream ARN itself), the mapping is created successfully, but LocalStack then logs the following errors:
2024-04-04 22:23:24 2024-04-04T20:23:24.516 INFO --- [ asgi_gw_1] localstack.request.aws : AWS lambda.CreateEventSourceMapping => 202
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : [warn] kinesis.mock.cache.Cache 2024-04-04T20:23:24.533874Z x-amz-id-2=m5uvuv4lQl8JAEl+KXLF05bRc2q5gr/+2B0PngUqNHl0yWnDva897kT7tSompNF2bZGLfN10B19SFFmPGI0HQoXB8KY8lG2Q, contentType=application/x-amz-json-1.1, x-amzn-RequestId=e5584ca1-46be-40dd-b6ef-4eabcb9308ca, action=DescribeStream, contextId=7383bb33-4ece-438c-9a80-4e9d38aa705d, streamName=kinesis-consumer-esm-lambda:1712261124 Describing the stream was unuccessful
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : kinesis.mock.InvalidArgumentException: Stream Name 'kinesis-consumer-esm-lambda:1712261124' contains invalid characters
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at vC(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:711:73)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at {anonymous}()(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:3268:152)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at w.h(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2932:115)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at t7(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:4371:311)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at e.ml(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:4399:328)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at GN.ml(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:1366:197)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at {anonymous}()(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:3570:53)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at u.qa(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2931:395)
2024-04-04 22:23:24 2024-04-04T20:23:24.534 INFO --- [functhread13] l.s.k.kinesis_mock_server : at Immediate._onImmediate(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2931:60)
2024-04-04 22:23:24 2024-04-04T20:23:24.535 INFO --- [functhread13] l.s.k.kinesis_mock_server : at process.processImmediate(node:internal/timers:476:21)
2024-04-04 22:23:24 2024-04-04T20:23:24.540 ERROR --- [functhread29] l.s.l.e.stream_event_sourc : Cannot describe target stream arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products/consumer/kinesis-consumer-esm-lambda:1712261124 of event source mapping 36009136-4364-4e3d-92d6-f844789059fd: An error occurred (InvalidArgumentException) when calling the DescribeStream operation: Stream Name 'kinesis-consumer-esm-lambda:1712261124' contains invalid characters
Upon inspecting the code, it appears that the issue may stem from the following part: https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py#L373-L381
It calls _get_stream_description, which sends DescribeStream to the Kinesis mock with the stream consumer ARN instead of a stream ARN.
The call fails because the consumer ARN ends with a :<timestamp> suffix, and : is not a valid character in a stream name.
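A minimal Python sketch of the failure, assuming the mock takes the last path segment of the ARN it receives as the stream name (which matches the stream name shown in the log above). The pattern [a-zA-Z0-9_.-]+ is the Kinesis StreamName pattern from the AWS API reference; name_from_arn is a hypothetical helper, not LocalStack or kinesis-mock code.

```python
import re

# Kinesis StreamName pattern: letters, digits, underscore, period, hyphen only.
STREAM_NAME_PATTERN = re.compile(r"[a-zA-Z0-9_.-]+")


def name_from_arn(arn: str) -> str:
    """Hypothetical helper: treat the last path segment of an ARN as the stream name."""
    return arn.split("/")[-1]


consumer_arn = (
    "arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products"
    "/consumer/kinesis-consumer-esm-lambda:1712261124"
)

name = name_from_arn(consumer_arn)
print(name)  # kinesis-consumer-esm-lambda:1712261124
print(bool(STREAM_NAME_PATTERN.fullmatch(name)))  # False, because ':' is not allowed
```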
Expected Behavior
If the target stream ARN for the event source mapping represents a Kinesis stream consumer ARN rather than a Kinesis stream ARN, the appropriate operation to execute is DescribeStreamConsumer, not DescribeStream.
DescribeStreamConsumer only needs the ConsumerARN, which is validated against the following pattern: ^(arn):aws.*:kinesis:.*:\d{12}:.*stream\/[a-zA-Z0-9_.-]+\/consumer\/[a-zA-Z0-9_.-]+:[0-9]+
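As a sketch of how a caller might tell the two ARN kinds apart, the pattern quoted above can be applied directly with Python's re module. is_consumer_arn is a hypothetical helper name; the pattern has no trailing $, so re.match anchors it only at the start, exactly as written.

```python
import re

# ConsumerARN pattern as quoted in the issue; re.match anchors it only at the start.
CONSUMER_ARN_PATTERN = re.compile(
    r"^(arn):aws.*:kinesis:.*:\d{12}:.*stream\/[a-zA-Z0-9_.-]+\/consumer\/[a-zA-Z0-9_.-]+:[0-9]+"
)


def is_consumer_arn(arn: str) -> bool:
    """Return True if the ARN looks like a Kinesis stream consumer ARN."""
    return CONSUMER_ARN_PATTERN.match(arn) is not None


stream_arn = "arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products"
consumer_arn = stream_arn + "/consumer/kinesis-consumer-esm-lambda:1712261124"
print(is_consumer_arn(stream_arn))    # False
print(is_consumer_arn(consumer_arn))  # True
```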
Because CreateEventSourceMapping has no separate parameter indicating whether the provided EventSourceARN is a Kinesis stream ARN or a Kinesis stream consumer ARN, the listener could try both DescribeStream and DescribeStreamConsumer.
If either operation succeeds, the status of the target can be inferred from its response.
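A minimal sketch of the try-both approach, assuming a boto3-style Kinesis client (describe_stream accepting StreamARN, describe_stream_consumer accepting ConsumerARN). The function describe_event_source_status and the FakeKinesis stand-in are hypothetical and exist only to make the sketch runnable without LocalStack; a real boto3 client would raise botocore's ClientError rather than ValueError.

```python
from typing import Any, Tuple


def describe_event_source_status(client: Any, event_source_arn: str) -> Tuple[str, str]:
    """Try DescribeStream, then DescribeStreamConsumer; return (kind, status) of the first success."""
    errors = []
    try:
        response = client.describe_stream(StreamARN=event_source_arn)
        return "stream", response["StreamDescription"]["StreamStatus"]
    except Exception as e:  # e.g. InvalidArgumentException when given a consumer ARN
        errors.append(e)
    try:
        response = client.describe_stream_consumer(ConsumerARN=event_source_arn)
        return "consumer", response["ConsumerDescription"]["ConsumerStatus"]
    except Exception as e:  # e.g. ResourceNotFoundException
        errors.append(e)
    raise RuntimeError(f"Cannot describe {event_source_arn}: {errors}")


class FakeKinesis:
    """Hypothetical stand-in for a Kinesis client that knows one stream and one consumer."""

    def __init__(self, stream_arn: str, consumer_arn: str) -> None:
        self.stream_arn = stream_arn
        self.consumer_arn = consumer_arn

    def describe_stream(self, StreamARN: str) -> dict:
        if StreamARN != self.stream_arn:
            raise ValueError("InvalidArgumentException")
        return {"StreamDescription": {"StreamStatus": "ACTIVE"}}

    def describe_stream_consumer(self, ConsumerARN: str) -> dict:
        if ConsumerARN != self.consumer_arn:
            raise ValueError("ResourceNotFoundException")
        return {"ConsumerDescription": {"ConsumerStatus": "ACTIVE", "StreamARN": self.stream_arn}}


STREAM_ARN = "arn:aws:kinesis:us-east-1:000000000000:stream/test-kinesis-stream"
# Made-up creation timestamp, for illustration only.
CONSUMER_ARN = STREAM_ARN + "/consumer/test-kinesis-stream-consumer:1712261124"
client = FakeKinesis(STREAM_ARN, CONSUMER_ARN)
print(describe_event_source_status(client, STREAM_ARN))    # ('stream', 'ACTIVE')
print(describe_event_source_status(client, CONSUMER_ARN))  # ('consumer', 'ACTIVE')
```

Trying DescribeStream first keeps the common case (a plain stream ARN) to a single call; the consumer lookup only runs when that fails.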
How are you starting LocalStack?
With a docker-compose file
Steps To Reproduce
How am I starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)
docker run localstack/localstack:3.3
Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)
- Create a Lambda function, named for example test-lambda-function
- Create a Kinesis stream
awslocal kinesis \
create-stream \
--shard-count 1 \
--stream-name test-kinesis-stream
- Register a Kinesis stream consumer
AWS_REGION=us-east-1
LOCALSTACK_KINESIS_STREAM_ARN_BASE=arn:aws:kinesis:$AWS_REGION:000000000000:stream
awslocal kinesis \
register-stream-consumer \
--consumer-name test-kinesis-stream-consumer \
--stream-arn "${LOCALSTACK_KINESIS_STREAM_ARN_BASE}/test-kinesis-stream"
- Create a Lambda event source mapping
AWS_REGION=us-east-1
LOCALSTACK_KINESIS_STREAM_CONSUMER_ARN= ... # Get stream consumer ARN from step 3
awslocal lambda \
create-event-source-mapping \
--function-name test-lambda-function \
--event-source-arn $LOCALSTACK_KINESIS_STREAM_CONSUMER_ARN \
--starting-position TRIM_HORIZON
- Open the LocalStack container logs and notice the errors above, even though the event source mapping was created successfully.
Environment
- OS: macOS Sonoma 14.4.1
- LocalStack: 3.3
Anything else?
- Using AWS Lambda with Amazon Kinesis Stream Consumer
- describe-stream-consumer CLI command
- describe-stream CLI command
I can see there is an existing function, find_stream_for_consumer (from localstack.services.kinesis.provider import find_stream_for_consumer), which returns the stream_name. Maybe we can modify it to return the stream_arn as well:
def find_stream_for_consumer(consumer_arn):
    account_id = extract_account_id_from_arn(consumer_arn)
    region_name = extract_region_from_arn(consumer_arn)
    kinesis = connect_to(aws_access_key_id=account_id, region_name=region_name).kinesis
    for stream_name in kinesis.list_streams()["StreamNames"]:
        stream_arn = arns.kinesis_stream_arn(stream_name, account_id, region_name)
        for cons in kinesis.list_stream_consumers(StreamARN=stream_arn)["Consumers"]:
            if cons["ConsumerARN"] == consumer_arn:
                return stream_name, stream_arn  # changed: also return stream_arn
    raise Exception("Unable to find stream for stream consumer %s" % consumer_arn)
Then use this function in _monitor_stream_event_sources to get the stream_arn corresponding to the consumer ARN:
def _monitor_stream_event_sources(self, *args):
    # additional code not directly related to the issue at hand...
    try:
        if len(stream_arn.split("/")[-1].split(":")) > 1:  # added
            consumer_arn = stream_arn  # added
            _, stream_arn = find_stream_for_consumer(consumer_arn)  # added
        stream_description = self._get_stream_description(stream_client, stream_arn)
    except Exception as e:
        LOG.error(
            "Cannot describe target stream %s of event source mapping %s: %s",
            stream_arn,
            mapping_uuid,
            e,
        )
        continue
    # additional code not directly related to the issue at hand...
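Since a consumer ARN has the form <stream ARN>/consumer/<consumer name>:<creation timestamp>, the parent stream ARN could also be recovered without listing every stream and its consumers. A minimal sketch, assuming well-formed ARNs; stream_arn_from_consumer_arn is a hypothetical helper, not an existing LocalStack function. DescribeStreamConsumer also returns the parent StreamARN in its ConsumerDescription, which would be the API-based alternative.

```python
def stream_arn_from_consumer_arn(consumer_arn: str) -> str:
    """Hypothetical helper: strip the '/consumer/<name>:<timestamp>' suffix from a consumer ARN."""
    # rpartition splits on the LAST '/consumer/', so a stream that is itself named
    # "consumer" is still handled (consumer names cannot contain '/').
    stream_arn, sep, consumer_part = consumer_arn.rpartition("/consumer/")
    if not sep or ":" not in consumer_part:
        raise ValueError(f"Not a Kinesis stream consumer ARN: {consumer_arn}")
    return stream_arn


consumer_arn = (
    "arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products"
    "/consumer/kinesis-consumer-esm-lambda:1712261124"
)
print(stream_arn_from_consumer_arn(consumer_arn))
# arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products
```

This avoids one ListStreamConsumers call per stream, at the cost of relying on the ARN format rather than on the service.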
Let me know your thoughts @viren-nadkarni @mohllal Would be glad to pursue a PR for this solution if it makes sense.
Thank you @AbdullahMakhdoom for your investigation!
Upon examining the code, I initially believed your suggestion would solve the problem of describing the target stream with a stream consumer ARN, since it reassigns stream_arn to a valid stream ARN looked up from the consumer ARN.
However, on closer inspection, I've found that the current implementation of the _monitor_stream_event_sources method still reads from the stream shards using the standard shard iterator approach (GetShardIterator/GetRecords), rather than subscribing to the shards through the stream consumer (SubscribeToShard), as shown here:
https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py#L373-L418
It seems I initially misunderstood the problem, thinking it was only a bug in describing the event source mapping's target stream with an ARN that belongs to a stream consumer rather than a stream itself. It is actually more complex than that: it involves different consumption strategies and how each is implemented.
I just noticed that the Kinesis mock server used by LocalStack doesn't support the SubscribeToShard operation.
As a workaround, LocalStack emulates this operation using GetRecords. This differs from AWS, where SubscribeToShard pushes records from the shard to the consumer over an HTTP/2 connection.
https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/kinesis/provider.py#L84-L143
Given the above implementation, on LocalStack there is no behavioral difference between consuming through a dedicated fan-out stream consumer and consuming with the standard polling used by a Lambda event source mapping.
This holds true even if we were able to modify the implementation of the StreamEventSourceListener class to accommodate the proxied subscribe_to_shard method.
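To make the difference concrete, a GetRecords-based emulation of SubscribeToShard amounts to a client-side polling loop that reshapes each GetRecords batch into something resembling a SubscribeToShardEvent. This is a simplified sketch, not LocalStack's provider code; poll_shard_events and FakeShardClient are hypothetical, and the client is assumed to be boto3-style (get_shard_iterator, get_records).

```python
import time
from typing import Any, Dict, Iterator, List, Optional


def poll_shard_events(client: Any, stream_arn: str, shard_id: str,
                      poll_interval: float = 0.0, max_polls: int = 10) -> Iterator[Dict[str, Any]]:
    """Pull-based stand-in for SubscribeToShard: poll GetRecords and yield event-shaped dicts."""
    iterator: Optional[str] = client.get_shard_iterator(
        StreamARN=stream_arn, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
    )["ShardIterator"]
    for _ in range(max_polls):
        if iterator is None:  # a closed shard that has been fully read returns no next iterator
            return
        response = client.get_records(ShardIterator=iterator)
        records = response["Records"]
        # Reshape the batch like a SubscribeToShardEvent (simplified: None when the batch is empty).
        yield {
            "Records": records,
            "ContinuationSequenceNumber": records[-1]["SequenceNumber"] if records else None,
            "MillisBehindLatest": response.get("MillisBehindLatest", 0),
        }
        iterator = response.get("NextShardIterator")
        time.sleep(poll_interval)  # AWS pushes over HTTP/2 instead of this client-side wait


class FakeShardClient:
    """Hypothetical stand-in client that serves a fixed record list in batches of two."""

    def __init__(self, records: List[Dict[str, Any]]) -> None:
        self.records = records

    def get_shard_iterator(self, **kwargs: Any) -> Dict[str, str]:
        return {"ShardIterator": "0"}  # the iterator is just the next index, encoded as a string

    def get_records(self, ShardIterator: str) -> Dict[str, Any]:
        start = int(ShardIterator)
        batch = self.records[start:start + 2]
        return {"Records": batch, "NextShardIterator": str(start + len(batch)), "MillisBehindLatest": 0}


records = [{"SequenceNumber": str(i), "Data": f"record-{i}".encode()} for i in range(1, 4)]
client = FakeShardClient(records)
events = list(poll_shard_events(client, "arn:aws:kinesis:us-east-1:000000000000:stream/test-kinesis-stream",
                                "shardId-000000000000", max_polls=3))
print([e["ContinuationSequenceNumber"] for e in events])  # ['2', '3', None]
```

From the consumer's point of view the output looks like a stream of subscription events, but the data still arrives by polling, which is why a dedicated consumer behaves no differently from a standard one in this setup.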
Thank you very much for sharing this issue with a lot of context and detailed discussion 💯 🙏
We are actively working on improving our Lambda Event Source Mapping implementation 📈, also based on your valuable feedback.
LocalStack 3.7.0 shipped today with a new implementation available using LAMBDA_EVENT_SOURCE_MAPPING=v2: https://blog.localstack.cloud/2024-08-29-localstack-release-v-3-7-0/#new-lambda-event-source-mapping-implementation
Please note that we have not yet added test coverage for this scenario, so stay tuned for updates 🚀
What is the status of this issue? I may be getting the same error; I don't know if it's the exact same use case, but the log says the stream name contains invalid characters. Since this issue is still open, I assume it's not entirely unexpected that I'm seeing it?
I'm not sure yet whether this error actually affects us functionally, but at a minimum, because it is logged very frequently, it makes it difficult to find any other logs we're looking for.
Edit: I'm now using the new release from this week and haven't seen this again.
Thank you for asking and sharing your feedback @mason-e
LocalStack 3.8.0 comes with our new default implementation for Lambda Event Source Mapping, including a complete re-write of the StreamPoller for Kinesis and DynamoDB.
@mohllal @AbdullahMakhdoom Does LocalStack 3.8.1 also resolve your issues?