flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-21966][python]Support Kinesis connector in Python DataStream API

Open xinbinhuang opened this issue 3 years ago • 7 comments

What is the purpose of the change

Add support for kinesis connector for the Python datastream API

Brief change log

  • Add FlinkKinesisProducer
    • [ ] Add common setters/getters
  • Add FlinkKinesisConsumer
    • [ ] Add common setters/getters

Verifying this change

  • [ ] Add tests coverage in test_connectors.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): ( no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

xinbinhuang avatar Apr 06 '21 06:04 xinbinhuang

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 389307fe59892fbef32b99c70a78681d36fb5dd4 (Thu Sep 23 17:59:52 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

flinkbot avatar Apr 06 '21 06:04 flinkbot

CI report:

  • 389307fe59892fbef32b99c70a78681d36fb5dd4 Azure: FAILURE
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Apr 06 '21 06:04 flinkbot

We have implemented something similar and started receiving classloader exceptions as below, however the job continues to work fine and stream data from Kinesis:

Job has been submitted with JobID 31975670b3660829abf7e69c3a13c2c6
Exception in thread "Thread-11" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
        at java.lang.ClassLoader.getResource(ClassLoader.java:1084)
        at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2779)
        at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3035)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2994)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2967)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2847)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1199)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1811)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1788)
        at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
        at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
        at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

We also tried the implementation from this PR and got the same error as above.

Happy to contribute back here and help to debug if someone can point us in the right direction. We are using:

  • Flink 1.13.1
  • apache-flink==1.13.1 (for pyflink)
  • YARN on AWS EMR
  • flink-sql-connector-kinesis_2.11-1.13.1.jar
  • flink run -t yarn-per-job --detached -py /path/to/main.py --pyFiles /path/to/dependencies.zip -pyexec /usr/bin/python3.7 --parallelism 4 to submit the job

DavidFricker avatar Jul 06 '21 15:07 DavidFricker

I can help if needed as well!

kbohinski avatar Jul 08 '21 15:07 kbohinski

Thanks @xinbinhuang . This would be a great addition

aimran-adroll avatar Jan 26 '22 19:01 aimran-adroll

What's the status of this PR? I can help to review if it's updated.

dianfu avatar May 25 '22 03:05 dianfu

@xinbinhuang this would be very nice to have. Let me know if I can help.

SubNader avatar Sep 07 '22 07:09 SubNader

Closed as it should already been supported.

dianfu avatar Apr 24 '23 05:04 dianfu