amazon-kinesis-connector-flink

AmazonKinesisException when authenticating Kinesis through AWS SSO (profile)

Open mberchon opened this issue 3 years ago • 0 comments

Hello,

We are experiencing issues when using Flink with AWS SSO authentication to consume a Kinesis stream. We have no problem when using the KCL, the KPL, or the native KDS SDK library directly through SSO, and no problem either when Flink is configured with basic authentication.

Some details about our context:

  • Flink 1.13.3,
  • AWS SDK 2.17.71,
  • Java 11,
  • macOS,
  • SSO is used by developers to run integration tests from their own workstations (basic credentials are not compliant with our organization's policy).

Authenticating using basic credentials (setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) works properly. Authenticating with SSO raises the exception below.

Caused by: software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is invalid (Service: AmazonKinesis; Status Code: 400; Error Code: UnrecognizedClientException; Request ID: e2873f40-b9d8-7649-b885-c3d801979544; Proxy: null)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at software.amazon.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
	at software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
	at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.listShards(KinesisProxy.java:430)
	at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:380)
	at software.amazon.kinesis.connectors.flink.proxy.KinesisProxy.getShardList(KinesisProxy.java:273)
	at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:855)
	at software.amazon.kinesis.connectors.flink.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:292)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

Here is a git repo for reproduction

The following code is used when setting up SSO:

@Test
public void test() throws Exception {
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setParallelism(1);

    // Consumer configuration: region plus the AUTO credentials provider.
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.AUTO.toString());

    // Read the "flink-test" stream as plain strings.
    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
            "flink-test",
            new SimpleStringSchema(),
            kinesisConsumerConfig));

    kinesis.print();

    see.execute();
}
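
For comparison, here is a rough sketch of how the working basic-credentials setup and the failing AUTO setup might differ at the property level. It uses only `java.util.Properties`. The literal keys (`aws.region`, `aws.credentials.provider`, `aws.credentials.provider.basic.accesskeyid`, `aws.credentials.provider.basic.secretkey`) are an assumption about what the connector's `AWSConfigConstants` fields resolve to and should be checked against the connector version in use. It also assumes the basic credentials are passed through connector properties rather than environment variables. `CredentialsConfigSketch`, `basicConfig` and `autoConfig` are hypothetical names used only for illustration.

```java
import java.util.Properties;

public class CredentialsConfigSketch {

    // Assumed literal values behind the connector's config constants;
    // verify them against AWSConfigConstants in your connector version.
    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
    static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";

    /** Hypothetical helper: static access key and secret key (the setup that works). */
    public static Properties basicConfig(String region, String accessKeyId, String secretKey) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "BASIC");
        props.setProperty(AWS_ACCESS_KEY_ID, accessKeyId);
        props.setProperty(AWS_SECRET_ACCESS_KEY, secretKey);
        return props;
    }

    /** Hypothetical helper: AUTO provider, no explicit keys (the setup that fails with SSO). */
    public static Properties autoConfig(String region) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; never hard-code real credentials.
        Properties basic = basicConfig("us-east-1", "PLACEHOLDER_KEY_ID", "PLACEHOLDER_SECRET");
        Properties auto = autoConfig("us-east-1");
        System.out.println("BASIC setup keys: " + basic.stringPropertyNames());
        System.out.println("AUTO setup keys:  " + auto.stringPropertyNames());
    }
}
```

In the AUTO case no key material is placed in the properties at all, so credential resolution is left entirely to the connector's shaded AWS SDK.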

The following debug logs are attached (credentials purged):

mberchon, Nov 08 '21 11:11