amazon-kinesis-connector-flink
amazon-kinesis-connector-flink copied to clipboard
AWS Kinesis Stream read from different account fails after upgrade to flink-connector-kinesis 1.15.2
I have a AWS Kinesis Analytics application Java project (Runtime Apache Flink 1.13, with flink-connector-kinesis 1.13.2) that consumes a Kinesis Stream from a different account. It was setup using this example doc - https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html
When this project was upgraded to run on AWS Kinesis Analytics application with Flink Runtime 1.15 (with flink-connector-kinesis 1.15.2), the cross account access code fails. Here is the code for the Kinesis Consumer.
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION,"use-east-1");
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
inputProperties.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
AWSConfigConstants.CredentialProvider.ASSUME_ROLE.toString());
inputProperties.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN,
"<valid AWS role in source account>");
inputProperties.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME, "env1");
DataStream<Stream> messages =env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
This code fails with the following message:
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: InsertionOrders-Production
at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:430)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
Suppressed: java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException: User: arn:aws:sts::<SOURCE ACCOUNT>:assumed-role/kinesis-application-role/kiam-kiam is not authorized to perform: kinesis:DescribeStreamSummary on resource: arn:aws:kinesis:us-east-1:<SOURCE ACCOUNT>:stream/<Kinesis stream> because no identity-based policy allows the kinesis:DescribeStreamSummary action (Service: Kinesis, Status Code: 400, Request ID: d00c9528-959f-8c53-8cc3-29bf8d26b49e, Extended Request ID: gxGUGHNtCG3Zrg5XigZDt4eM8XrqXb2noSyU/zM/LkDTaLa8w4xZpjqkbmdusoHAPgI0zwTlYmmdyLHsqlsWrMr6g5PAwIi2)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
... 9 more
Caused by: org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException: User: arn:aws:sts::<SOURCE ACCOUNT>:assumed-role/kinesis-application-role/kiam-kiam is not authorized to perform: kinesis:DescribeStreamSummary on resource: arn:aws:kinesis:us-east-1:<SOURCE ACCOUNT>:stream/<Kinesis stream> because no identity-based policy allows the kinesis:DescribeStreamSummary action (Service: Kinesis, Status Code: 400, Request ID: d00c9528-959f-8c53-8cc3-29bf8d26b49e, Extended Request ID: gxGUGHNtCG3Zrg5XigZDt4eM8XrqXb2noSyU/zM/LkDTaLa8w4xZpjqkbmdusoHAPgI0zwTlYmmdyLHsqlsWrMr6g5PAwIi2)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:95)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:55)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.unmarshall(AwsJsonProtocolErrorUnmarshaller.java:87)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:61)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:40)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:89)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:132)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$DataCountingPublisher$1.onComplete(ResponseHandler.java:513)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:250)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:75)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:371)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.handlerRemoved(HandlerPublisher.java:435)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.callHandlerRemoved(AbstractChannelHandlerContext.java:946)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:637)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:477)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:423)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:361)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:223)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:87)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49)
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42)
at org.apache.flink.kinesis.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:901)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.AbstractHttp2StreamChannel.fireChildRead(AbstractHttp2StreamChannel.java:555)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2MultiplexHandler.channelRead(Http2MultiplexHandler.java:180)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameCodec.onHttp2Frame(Http2FrameCodec.java:707)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onDataRead(Http2FrameCodec.java:646)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)