[FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
What is the purpose of the change
Implements FLIP-238
Brief change log
Adds a new FLIP-27-based source for data generation.
Verifying this change
Added integration and unit tests for verifying the data generation process.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  - Change from private to package private access for NumberSequenceSource.CheckpointSerializer
  - Adds SourceReaderContext.currentParallelism() for calculating the required per-subtask data generation rates
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
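The currentParallelism() addition listed above is described as feeding a per-subtask rate calculation. A minimal sketch of that arithmetic, assuming the overall target rate is simply divided evenly across subtasks. SubtaskRateSketch and perSubtaskRate are hypothetical names used for illustration and are not part of this PR:

```java
/** Hypothetical helper, not Flink code: splits a source-wide target rate evenly across subtasks. */
final class SubtaskRateSketch {

    /**
     * @param totalRecordsPerSecond desired emission rate for the whole source
     * @param parallelism number of parallel subtasks, e.g. the value that
     *     SourceReaderContext.currentParallelism() would report (assumption)
     * @return the rate each subtask should aim for
     */
    static double perSubtaskRate(double totalRecordsPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return totalRecordsPerSecond / parallelism;
    }

    public static void main(String[] args) {
        // 100 records/s spread over 4 subtasks.
        System.out.println(perSubtaskRate(100.0, 4)); // prints "25.0"
    }
}
```

Without the parallelism, each reader would have no way to know its share of the overall rate, which is presumably why the context method is needed.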
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 3169231ceb90e2242e88e7a276190beda45f7e67 UNKNOWN
- a39f4934524b3dfacc4be299b4a3ff4d3396b549 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
@zentol I addressed your comments, could you please take a look?
@zentol the latest three commits address the main open discussion threads:
- DataGeneratorSource is moved into a separate connector module
- RateLimiterStrategy is added
- SourceReaderFactory-based constructor is sealed
PTAL. Thanks.
@zentol, thanks for the quick feedback. I addressed your comments.
Tried it out in some internal projects and it worked like a charm. :heart:
Perfect, glad to hear! :clap:
Should we deprecate the existing DataGeneratorSource in flink-streaming-java?
Good question. It seems to be mainly used for Row-based data generation, so some work might be needed if we also want to provide utilities for our new generator for this use case out of the box. It also claims to be "stateful" (DataGeneratorSource.java#L35); however, the actual generators seem to all default to no-op DataGenerator.java#L45, so it should be no issue to use the generator function instead. The next goal is to deprecate SourceFunction, and the old DataGeneratorSource would fall into the "to transitively deprecate" bucket anyhow. We can either do it here or in the next step; I have no preference.
What about documentation?
:+1: I'll work on that.
I guess the FLIP also needs an update according to some interface changes that took place.
I would deprecate it now so we have some incentive to migrate existing usages (in particular internal ones!) to the new source.
> the actual generators seem to all default to no-op DataGenerator.java#L45, so it should be no issue to use the generator function instead
Looks like the DataGenTableSource may use the stateful SequenceGenerator (DataGenTableSourceFactory#createContainer)?
> Looks like the DataGenTableSource may use the stateful SequenceGenerator (DataGenTableSourceFactory#createContainer)?
Oh, indeed, missed that one. However, it looks like the stateful property of the generated sequences is already covered by the underlying NumberSequenceSource that we use internally, so it should be a matter of simply using a deterministic GeneratorFunction to achieve the same results (i.e., mapping Long to Integer, Short, Byte, etc.).
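A minimal sketch of the idea in the comment above, assuming the only state that needs to survive a restore is the position in the underlying number sequence. A plain java.util.function.Function stands in for the generator function here, and DeterministicMappingSketch and generate are hypothetical names, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration, not Flink code. */
final class DeterministicMappingSketch {

    /**
     * Emits mapper(i) for every index i in [from, to]. The loop stands in for the
     * number sequence, the mapper for a deterministic generator function.
     */
    static <T> List<T> generate(long from, long to, Function<Long, T> mapper) {
        List<T> out = new ArrayList<>();
        for (long i = from; i <= to; i++) {
            out.add(mapper.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Deterministic mapping from the Long index to a narrower type.
        Function<Long, Short> toShort = index -> index.shortValue();

        List<Short> fullRun = generate(0, 9, toShort);
        // Simulated restore: only the index (5) is remembered, the mapper holds no state.
        List<Short> resumed = generate(5, 9, toShort);

        System.out.println(fullRun.subList(5, 10).equals(resumed)); // prints "true"
    }
}
```

Because the mapper depends only on its input, resuming from a remembered index reproduces the tail of the original sequence without the mapper itself being checkpointed.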
> I would deprecate it now so we have some incentive to migrate existing usages (in particular internal ones!) to the new source.
Marked as deprecated, but I am not sure if a simple "Use [fullpath]DataGeneratorSource instead." is enough here.
@zentol I added the docs. Not 100% sure if the last bit about per-checkpoint generation is relevant enough for the general public. I am open to dropping it if you think it is not.
> Marked as deprecated, but I am not sure if a simple "Use [fullpath]DataGeneratorSource instead." is enough here.
You could extend it a bit to state that stateful generator functions should instead be re-implemented to be deterministic based on the index.
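One way to read "deterministic based on the index" is sketched below, under the assumption that a generator which previously kept a running Random instance can instead derive each value from the record index alone. IndexBasedGeneratorSketch, StatefulRandomInts, and valueFor are hypothetical names, and seeding a fresh Random per index is just one illustrative choice, not what the PR prescribes:

```java
import java.util.Random;

/** Hypothetical illustration, not Flink code. */
final class IndexBasedGeneratorSketch {

    private static final long BASE_SEED = 42L;

    /** Stateful style: each result depends on how many values were drawn before. */
    static final class StatefulRandomInts {
        private final Random random = new Random(BASE_SEED);

        int next() {
            return random.nextInt(1000);
        }
    }

    /**
     * Index-based style: the result depends only on the index, so there is no
     * generator state that would have to be snapshotted and restored.
     */
    static int valueFor(long index) {
        return new Random(BASE_SEED + index).nextInt(1000);
    }

    public static void main(String[] args) {
        // Replaying index 7 after a restore yields the same value as the first attempt.
        System.out.println(valueFor(7L) == valueFor(7L)); // prints "true"
    }
}
```

The stateful variant would need its internal position restored to reproduce a value, whereas the index-based variant only needs the index, which the underlying sequence already tracks.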
@zentol I modified the docs according to your proposals, PTAL.
We're running into a bug in the Japicmp plugin; I'll try to resolve that upstream. https://github.com/siom79/japicmp/issues/341
FYI: my fix has been merged; we're just waiting for the next japicmp release.
Oct 18 11:05:28 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 42.188 s <<< FAILURE! - in org.apache.flink.architecture.rules.ApiAnnotationRules
Oct 18 11:05:28 [ERROR] ApiAnnotationRules.PUBLIC_API_METHODS_USE_ONLY_PUBLIC_API_TYPES Time elapsed: 0.282 s <<< ERROR!
Oct 18 11:05:28 com.tngtech.archunit.library.freeze.StoreUpdateFailedException: Updating frozen violations is disabled (enable by configuration freeze.store.default.allowStoreUpdate=true)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.ViolationStoreFactory$TextFileBasedViolationStore.save(ViolationStoreFactory.java:125)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule$ViolationStoreLineBreakAdapter.save(FreezingArchRule.java:279)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStore(FreezingArchRule.java:147)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStoreAndReturnNewViolations(FreezingArchRule.java:139)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.evaluate(FreezingArchRule.java:120)
Oct 18 11:05:28 at com.tngtech.archunit.lang.ArchRule$Assertions.check(ArchRule.java:80)
Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.check(FreezingArchRule.java:96)
:weary: What is it this time...
Rebased the PR; it now incorporates #21372 to resolve the japicmp issues. The final CI issue was an outdated exclusion in the archunit store.