
Issue with Lambda event source mapping creation using a Kinesis stream consumer ARN

Open mohllal opened this issue 2 years ago • 6 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Current Behavior

While attempting to create a Lambda event source mapping with a Kinesis stream consumer ARN as the event source (not with the stream directly), I encountered errors despite the successful creation of the event source mapping.

2024-04-04 22:23:24 2024-04-04T20:23:24.516  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS lambda.CreateEventSourceMapping => 202
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : [warn] kinesis.mock.cache.Cache 2024-04-04T20:23:24.533874Z x-amz-id-2=m5uvuv4lQl8JAEl+KXLF05bRc2q5gr/+2B0PngUqNHl0yWnDva897kT7tSompNF2bZGLfN10B19SFFmPGI0HQoXB8KY8lG2Q, contentType=application/x-amz-json-1.1, x-amzn-RequestId=e5584ca1-46be-40dd-b6ef-4eabcb9308ca, action=DescribeStream, contextId=7383bb33-4ece-438c-9a80-4e9d38aa705d, streamName=kinesis-consumer-esm-lambda:1712261124 Describing the stream was unuccessful
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : kinesis.mock.InvalidArgumentException: Stream Name 'kinesis-consumer-esm-lambda:1712261124' contains invalid characters
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at vC(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:711:73)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at {anonymous}()(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:3268:152)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at w.h(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2932:115)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at t7(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:4371:311)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at e.ml(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:4399:328)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at GN.ml(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:1366:197)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at {anonymous}()(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:3570:53)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at u.qa(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2931:395)
2024-04-04 22:23:24 2024-04-04T20:23:24.534  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at Immediate._onImmediate(/var/lib/localstack/lib/kinesis-local/0.4.6/node_modules/kinesis-local/main.js:2931:60)
2024-04-04 22:23:24 2024-04-04T20:23:24.535  INFO --- [functhread13] l.s.k.kinesis_mock_server  : at process.processImmediate(node:internal/timers:476:21)
2024-04-04 22:23:24 2024-04-04T20:23:24.540 ERROR --- [functhread29] l.s.l.e.stream_event_sourc : Cannot describe target stream arn:aws:kinesis:us-east-1:000000000000:stream/kinesis.inventory.products/consumer/kinesis-consumer-esm-lambda:1712261124 of event source mapping 36009136-4364-4e3d-92d6-f844789059fd: An error occurred (InvalidArgumentException) when calling the DescribeStream operation: Stream Name 'kinesis-consumer-esm-lambda:1712261124' contains invalid characters

Upon inspecting the code, it appears that the issue may stem from the following part: https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py#L373-L381

It calls _get_stream_description, which invokes the mocked DescribeStream operation with a stream consumer ARN rather than a stream ARN.

The call fails because a stream consumer ARN ends with a :timestamp suffix, and the : character is not valid in a stream name.

Expected Behavior

If the target stream ARN for the event source mapping represents a Kinesis stream consumer ARN rather than a Kinesis stream ARN, the appropriate operation to execute is DescribeStreamConsumer, not DescribeStream.

Passing only the ConsumerARN is sufficient for DescribeStreamConsumer, because its validation regex is: ^(arn):aws.*:kinesis:.*:\d{12}:.*stream\/[a-zA-Z0-9_.-]+\/consumer\/[a-zA-Z0-9_.-]+:[0-9]+

Since the CreateEventSourceMapping operation has no distinct parameter to indicate whether the provided EventSourceARN is a Kinesis stream ARN or a Kinesis stream consumer ARN, LocalStack could try both DescribeStream and DescribeStreamConsumer.

If either operation returns a successful response, the target stream status can be inferred from there.
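
A minimal sketch of this idea, assuming a boto3-style Kinesis client. Instead of trying both operations, it uses the regex quoted above to decide which one to call. The helper names is_consumer_arn and describe_event_source_status are hypothetical and are not part of LocalStack.

```python
import re

# Consumer ARN pattern as quoted in this issue.
CONSUMER_ARN_PATTERN = re.compile(
    r"^(arn):aws.*:kinesis:.*:\d{12}:.*stream\/[a-zA-Z0-9_.-]+\/consumer\/[a-zA-Z0-9_.-]+:[0-9]+"
)


def is_consumer_arn(event_source_arn: str) -> bool:
    """Return True if the ARN looks like a Kinesis stream consumer ARN."""
    return CONSUMER_ARN_PATTERN.match(event_source_arn) is not None


def describe_event_source_status(kinesis_client, event_source_arn: str) -> str:
    """Hypothetical helper: return the status of the stream or consumer.

    kinesis_client is assumed to expose the boto3 Kinesis client methods
    describe_stream and describe_stream_consumer.
    """
    if is_consumer_arn(event_source_arn):
        response = kinesis_client.describe_stream_consumer(ConsumerARN=event_source_arn)
        return response["ConsumerDescription"]["ConsumerStatus"]
    response = kinesis_client.describe_stream(StreamARN=event_source_arn)
    return response["StreamDescription"]["StreamStatus"]
```

With the consumer ARN from the logs above, is_consumer_arn returns True, so DescribeStream is never called with the invalid name.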

How are you starting LocalStack?

With a docker-compose file

Steps To Reproduce

How am I starting localstack (e.g., bin/localstack command, arguments, or docker-compose.yml)

docker run localstack/localstack:3.3

Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)

  1. Create a Lambda function named for example test-lambda-function
  2. Create a Kinesis stream
awslocal kinesis \
  create-stream \
  --shard-count 1 \
  --stream-name test-kinesis-stream
  3. Register a Kinesis stream consumer
AWS_REGION=us-east-1
LOCALSTACK_KINESIS_STREAM_ARN_BASE=arn:aws:kinesis:$AWS_REGION:000000000000:stream

awslocal kinesis \
  register-stream-consumer \
  --consumer-name test-kinesis-stream-consumer \
  --stream-arn "${LOCALSTACK_KINESIS_STREAM_ARN_BASE}/test-kinesis-stream"
  4. Create a Lambda event source mapping
AWS_REGION=us-east-1
LOCALSTACK_KINESIS_STREAM_CONSUMER_ARN= ... # Get stream consumer ARN from step 3

awslocal lambda \
  create-event-source-mapping \
  --function-name test-lambda-function \
  --event-source-arn "$LOCALSTACK_KINESIS_STREAM_CONSUMER_ARN" \
  --starting-position TRIM_HORIZON
  5. Open the LocalStack container logs and notice the mentioned errors despite the successful creation of the event source mapping.

Environment

- OS: macOS Sonoma 14.4.1
- LocalStack: 3.3

Anything else?

mohllal avatar Apr 04 '24 21:04 mohllal

Welcome to LocalStack! Thanks for reporting your first issue and our team will be working towards fixing the issue for you or reach out for more background information. We recommend joining our Slack Community for real-time help and drop a message to LocalStack Pro Support if you are a Pro user! If you are willing to contribute towards fixing this issue, please have a look at our contributing guidelines and our contributing guide.

localstack-bot avatar Apr 04 '24 21:04 localstack-bot

I can see there is a function, find_stream_for_consumer, in localstack.services.kinesis.provider, which returns stream_name. Maybe we can modify the function to return stream_arn as well.

def find_stream_for_consumer(consumer_arn):
    account_id = extract_account_id_from_arn(consumer_arn)
    region_name = extract_region_from_arn(consumer_arn)
    kinesis = connect_to(aws_access_key_id=account_id, region_name=region_name).kinesis
    for stream_name in kinesis.list_streams()["StreamNames"]:
        stream_arn = arns.kinesis_stream_arn(stream_name, account_id, region_name)
        for cons in kinesis.list_stream_consumers(StreamARN=stream_arn)["Consumers"]:
            if cons["ConsumerARN"] == consumer_arn:
[+]                return stream_name, stream_arn
    raise Exception("Unable to find stream for stream consumer %s" % consumer_arn)

Then use this function in _monitor_stream_event_sources to get the stream_arn corresponding to the consumer ARN.

def _monitor_stream_event_sources(self, *args):
    # additional code not directly related to the issue at hand...
    try:
[+]        if len(stream_arn.split("/")[-1].split(":")) > 1:
[+]            consumer_arn = stream_arn
[+]            _, stream_arn = find_stream_for_consumer(consumer_arn)
        stream_description = self._get_stream_description(stream_client, stream_arn)
    except Exception as e:
        LOG.error(
            "Cannot describe target stream %s of event source mapping %s: %s",
            stream_arn,
            mapping_uuid,
            e,
        )
        continue
    # additional code not directly related to the issue at hand...
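
As a possible alternative to looping over all streams, the stream ARN could be derived from the consumer ARN by plain string handling. This is only a sketch. It assumes that a consumer ARN always has the form <stream ARN>/consumer/<consumer name>:<timestamp>, which matches the ARN in the logs above. The helper name split_consumer_arn is hypothetical.

```python
from typing import Optional, Tuple


def split_consumer_arn(arn: str) -> Tuple[str, Optional[str]]:
    """Hypothetical helper: split an event source ARN into its parts.

    Returns (stream_arn, consumer_name) for a stream consumer ARN and
    (arn, None) for a plain stream ARN.
    """
    stream_arn, separator, consumer_part = arn.partition("/consumer/")
    if not separator:
        # No "/consumer/" segment, so this is treated as a plain stream ARN.
        return arn, None
    # Drop the trailing ":<timestamp>" suffix from the consumer segment.
    consumer_name = consumer_part.rsplit(":", 1)[0]
    return stream_arn, consumer_name
```

This avoids the extra ListStreams and ListStreamConsumers calls, but unlike find_stream_for_consumer it does not verify that the consumer actually exists.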

Let me know your thoughts @viren-nadkarni @mohllal Would be glad to pursue a PR for this solution if it makes sense.

AbdullahMakhdoom avatar Apr 06 '24 17:04 AbdullahMakhdoom

Thank you @AbdullahMakhdoom for your investigation!

Upon further examination of the code, I initially believed that your suggestion would solve the issue concerning describing the target stream using a stream consumer ARN, as you re-assign a valid stream_arn post retrieval from the stream consumer ARN.

However, upon closer inspection, I've discovered that the current implementation of the _monitor_stream_event_sources method still retrieves data from the stream shards using the standard shard iterator method, rather than subscribing to shards through the stream consumer, as shown here:

https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py#L373-L418

It seems I misunderstood the problem initially, thinking it was just a bug in describing the event source mapping's target stream when the ARN belongs to a stream consumer rather than to the stream itself. However, it's actually more complex than that, involving differences in consuming strategies and their implementation.

mohllal avatar Apr 07 '24 00:04 mohllal

I just noticed that the Kinesis mock server used by LocalStack doesn't support the SubscribeToShard operation.

As a workaround, LocalStack proxies this functionality using the GetRecords operation. However, this approach differs from AWS, which relies on an HTTP/2 server push mechanism between the shard and the consumer. https://github.com/localstack/localstack/blob/a3b9adeb05e6bd3f0c4b5d8ab106d1f52cae1ee6/localstack/services/kinesis/provider.py#L84-L143

Based on the above implementation, there's no behaviour difference between utilizing a dedicated fan-out stream consumer and a consumer as a Lambda event source mapping source on LocalStack.

This holds true even if we were able to modify the implementation of the StreamEventSourceListener class to accommodate the proxied subscribe_to_shard method.
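
To illustrate the polling model, here is a rough sketch of reading a shard with GetShardIterator and GetRecords through a boto3-style client. It is not LocalStack's actual code. The function name poll_shard and its max_polls and interval_seconds parameters are assumptions made for the example.

```python
import time
from typing import Any, Dict, Iterator, List


def poll_shard(
    kinesis_client,
    stream_arn: str,
    shard_id: str,
    max_polls: int,
    interval_seconds: float = 1.0,
) -> Iterator[List[Dict[str, Any]]]:
    """Hypothetical helper: yield non-empty record batches by polling a shard."""
    iterator = kinesis_client.get_shard_iterator(
        StreamARN=stream_arn,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    for _ in range(max_polls):
        if iterator is None:
            # The shard has been closed and fully read.
            break
        response = kinesis_client.get_records(ShardIterator=iterator)
        if response["Records"]:
            yield response["Records"]
        # Each response carries the iterator to use for the next poll.
        iterator = response.get("NextShardIterator")
        time.sleep(interval_seconds)
```

Whether the caller is a fan-out consumer or an event source mapping, records are pulled on a timer in this model instead of being pushed by the service.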

mohllal avatar Apr 07 '24 01:04 mohllal

Thank you very much for sharing this issue with a lot of context and detailed discussion 💯 🙏

We are actively working on improving our Lambda Event Source Mapping implementation 📈, also based on your valuable feedback. LocalStack 3.7.0 shipped today with a new implementation available using LAMBDA_EVENT_SOURCE_MAPPING=v2: https://blog.localstack.cloud/2024-08-29-localstack-release-v-3-7-0/#new-lambda-event-source-mapping-implementation

Please notice that we have not yet added test coverage for this scenario, so stay tuned for updates 🚀

joe4dev avatar Aug 29 '24 20:08 joe4dev

What is the status on this issue? I get possibly the same error, I don't know if it's the same exact use case but it is saying the stream name contains invalid characters. I assume since this is still open, it's not entirely unexpected for me to be seeing it?

I'm not sure yet if this error actually affects us functionally, but at a minimum since it is logging very frequently, it makes it difficult to parse through for any other logs we want to find.

Edit: I'm using the new release from this week and now I haven't seen this again.

mason-e avatar Oct 01 '24 16:10 mason-e

Thank you for asking and sharing your feedback @mason-e

Edit: I'm using the new release from this week and now I haven't seen this again.

LocalStack 3.8.0 comes with our new default implementation for Lambda Event Source Mapping, including a complete re-write of the StreamPoller for Kinesis and DynamoDB.

@mohllal @AbdullahMakhdoom Does LocalStack 3.8.1 also resolve your issues?

joe4dev avatar Oct 24 '24 15:10 joe4dev

Hello 👋! It looks like this issue hasn’t been active in longer than two weeks. We encourage you to check if this is still an issue in the latest release. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or upvote with a reaction on the initial post to prevent automatic closure. If the issue is already closed, please feel free to open a new one.

localstack-bot avatar Nov 07 '24 16:11 localstack-bot