
AWS Kinesis Stream read from different account fails after upgrade to flink-connector-kinesis 1.15.2

Open spachari-im opened this issue 1 year ago • 0 comments

I have an AWS Kinesis Analytics application (a Java project on the Apache Flink 1.13 runtime, with flink-connector-kinesis 1.13.2) that consumes a Kinesis stream from a different account. It was set up following this example doc: https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html

After this project was upgraded to run as an AWS Kinesis Analytics application on the Flink 1.15 runtime (with flink-connector-kinesis 1.15.2), the cross-account access code fails. Here is the code for the Kinesis consumer:

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        // Assume a role in the account that owns the stream
        inputProperties.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
                AWSConfigConstants.CredentialProvider.ASSUME_ROLE.toString());
        inputProperties.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN,
                "<valid AWS role in source account>");
        inputProperties.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME, "env1");

        // SimpleStringSchema deserializes each record to a String
        DataStream<String> messages = env.addSource(
                new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
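
For anyone reproducing this without the connector on the classpath, here is a minimal, self-contained sketch of the same assume-role settings using plain `java.util.Properties`. The raw key strings are an assumption: they are what I believe the `ConsumerConfigConstants` / `AWSConfigConstants` fields resolve to, so check them against the connector version in use. The class name, the `build` and `missingKeys` helpers, and the role ARN are hypothetical and only illustrate a pre-flight check that the assume-role keys are all present.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CrossAccountConsumerConfig {

    // Assumed raw keys behind the connector's config constants (verify against your version).
    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    static final String AWS_ROLE_ARN = "aws.credentials.provider.role.arn";
    static final String AWS_ROLE_SESSION_NAME = "aws.credentials.provider.role.sessionName";
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Keys that must be set for an assume-role, cross-account consumer, in a fixed order.
    static final String[] REQUIRED_KEYS = {
        AWS_REGION, AWS_CREDENTIALS_PROVIDER, AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME
    };

    // Builds the consumer properties for reading a stream through an assumed role.
    static Properties build(String region, String roleArn, String sessionName) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(STREAM_INITIAL_POSITION, "LATEST");
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
        props.setProperty(AWS_ROLE_ARN, roleArn);
        props.setProperty(AWS_ROLE_SESSION_NAME, sessionName);
        return props;
    }

    // Returns the required keys that are absent or blank, in REQUIRED_KEYS order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical role ARN, for illustration only.
        Properties props = build("us-east-1", "arn:aws:iam::111111111111:role/example-role", "env1");
        System.out.println(missingKeys(props)); // prints []
    }
}
```

This only checks that the settings are present. It does not verify that the assumed role is actually permitted to call the Kinesis APIs the connector uses.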

This code fails with the following message:

org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: InsertionOrders-Production
	at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
	at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
	at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:430)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
	Suppressed: java.lang.NullPointerException
		at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
		at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
		at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
		at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124)
		at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
		at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
		at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
		at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
		at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
		at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:935)
		at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
		at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:935)
		at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
		at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
		at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException: User: arn:aws:sts::<SOURCE ACCOUNT>:assumed-role/kinesis-application-role/kiam-kiam is not authorized to perform: kinesis:DescribeStreamSummary on resource: arn:aws:kinesis:us-east-1:<SOURCE ACCOUNT>:stream/<Kinesis stream> because no identity-based policy allows the kinesis:DescribeStreamSummary action (Service: Kinesis, Status Code: 400, Request ID: d00c9528-959f-8c53-8cc3-29bf8d26b49e, Extended Request ID: gxGUGHNtCG3Zrg5XigZDt4eM8XrqXb2noSyU/zM/LkDTaLa8w4xZpjqkbmdusoHAPgI0zwTlYmmdyLHsqlsWrMr6g5PAwIi2)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
	at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
	... 9 more
Caused by: org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException: User: arn:aws:sts::<SOURCE ACCOUNT>:assumed-role/kinesis-application-role/kiam-kiam is not authorized to perform: kinesis:DescribeStreamSummary on resource: arn:aws:kinesis:us-east-1:<SOURCE ACCOUNT>:stream/<Kinesis stream> because no identity-based policy allows the kinesis:DescribeStreamSummary action (Service: Kinesis, Status Code: 400, Request ID: d00c9528-959f-8c53-8cc3-29bf8d26b49e, Extended Request ID: gxGUGHNtCG3Zrg5XigZDt4eM8XrqXb2noSyU/zM/LkDTaLa8w4xZpjqkbmdusoHAPgI0zwTlYmmdyLHsqlsWrMr6g5PAwIi2)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:95)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:55)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.unmarshall(AwsJsonProtocolErrorUnmarshaller.java:87)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:61)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:40)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:89)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:132)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$DataCountingPublisher$1.onComplete(ResponseHandler.java:513)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:250)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:75)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:371)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.handlerRemoved(HandlerPublisher.java:435)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.callHandlerRemoved(AbstractChannelHandlerContext.java:946)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:637)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:477)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:423)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:361)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:223)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:87)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49)
	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42)
	at org.apache.flink.kinesis.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:901)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel.fireChildRead(AbstractHttp2StreamChannel.java:555)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2MultiplexHandler.channelRead(Http2MultiplexHandler.java:180)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameCodec.onHttp2Frame(Http2FrameCodec.java:707)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onDataRead(Http2FrameCodec.java:646)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
	at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
	at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Thread.java:829)

spachari-im, May 06 '23 02:05