
[FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

Open afedulov opened this issue 3 years ago • 2 comments

What is the purpose of the change

Implements FLIP-238

Brief change log

Adds a new FLIP-27-based source for data generation.

Verifying this change

Added integration and unit tests for verifying the data generation process.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no):
    • Change from private to package private access for NumberSequenceSource.CheckpointSerializer
    • Adds SourceReaderContext.currentParallelism() for calculating the data generation rate required of each subtask
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)
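
One way a reader could use the subtask count from SourceReaderContext.currentParallelism() is to split a job-wide target rate evenly across subtasks. The following is a minimal sketch under that assumption. `RateSplit` and `perSubtaskRate` are hypothetical names, not Flink API, and the calculation this PR actually performs is not shown in this thread.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class RateSplit {
    private RateSplit() {}

    /**
     * Splits a job-wide records-per-second target evenly across subtasks.
     * Assumption: every subtask should emit the same share of the total.
     */
    static double perSubtaskRate(double totalRecordsPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        return totalRecordsPerSecond / parallelism;
    }

    public static void main(String[] args) {
        // In a real reader the second argument would come from the reader context.
        System.out.println(perSubtaskRate(100.0, 4)); // prints 25.0
    }
}
```

An even split is the simplest choice; a skewed split would need more information than the parallelism alone.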

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

afedulov avatar Sep 05 '22 15:09 afedulov

CI report:

  • 3169231ceb90e2242e88e7a276190beda45f7e67 UNKNOWN
  • a39f4934524b3dfacc4be299b4a3ff4d3396b549 Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Sep 05 '22 15:09 flinkbot

@zentol I addressed your comments, could you please take a look?

afedulov avatar Sep 22 '22 13:09 afedulov

@zentol the latest three commits address the main open discussion threads:

  • DataGeneratorSource is moved into a separate connector module
  • RateLimiterStrategy is added
  • SourceReaderFactory-based constructor is sealed

PTAL. Thanks.

afedulov avatar Oct 10 '22 13:10 afedulov

@zentol, thanks for the quick feedback. I addressed your comments.

afedulov avatar Oct 11 '22 12:10 afedulov

Tried it out in some internal projects and it worked like a charm. :heart:

Perfect, glad to hear! :clap:

Should we deprecate the existing DataGeneratorSource in flink-streaming-java?

Good question. It seems to be mainly used for Row-based data generation, so some work might be needed if we also want our new generator to provide out-of-the-box utilities for this use case. It also claims to be "stateful" (DataGeneratorSource.java#L35); however, the actual generators seem to all default to no-op (DataGenerator.java#L45), so it should be no issue to use the generator function instead. The next goal is to deprecate SourceFunction, and the old DataGeneratorSource would fall into the "to transitively deprecate" bucket anyhow. We can do it either here or in the next step; I have no preference.

What about documentation?

:+1: I'll work on that.

afedulov avatar Oct 11 '22 14:10 afedulov

I guess the FLIP also needs an update to reflect some interface changes that took place.

afedulov avatar Oct 11 '22 14:10 afedulov

I would deprecate it now so we have some incentive to migrate existing usages (in particular internal ones!) to the new source.

the actual generators seem to all default to no-op DataGenerator.java#L45, so it should be no issue to use the generator function instead

Looks like the DataGenTableSource may use the stateful SequenceGenerator (DataGenTableSourceFactory#createContainer)?

zentol avatar Oct 12 '22 10:10 zentol

Looks like the DataGenTableSource may use the stateful SequenceGenerator (DataGenTableSourceFactory#createContainer)?

Oh, indeed, missed that one. However, it looks like the stateful property of the generated sequences is already covered by the underlying NumberSequenceSource that we use internally, so it should be a matter of simply using a deterministic GeneratorFunction to achieve the same results (i.e., mapping Long to Integer, Short, Byte, etc.).
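
The mapping from a Long sequence value to the narrower numeric types could look roughly like the sketch below. `IndexMappers` and its methods are hypothetical names, not Flink API. Failing on overflow is an assumption made here; the thread does not say whether out-of-range values should fail or wrap around.

```java
/** Hypothetical mappers for illustration only; not part of Flink. */
final class IndexMappers {
    private IndexMappers() {}

    /** Narrows to int; Math.toIntExact throws ArithmeticException on overflow. */
    static int toInt(long index) {
        return Math.toIntExact(index);
    }

    /** Narrows to short, failing instead of silently wrapping around. */
    static short toShort(long index) {
        if (index < Short.MIN_VALUE || index > Short.MAX_VALUE) {
            throw new ArithmeticException("short overflow: " + index);
        }
        return (short) index;
    }

    /** Narrows to byte, failing instead of silently wrapping around. */
    static byte toByte(long index) {
        if (index < Byte.MIN_VALUE || index > Byte.MAX_VALUE) {
            throw new ArithmeticException("byte overflow: " + index);
        }
        return (byte) index;
    }

    public static void main(String[] args) {
        // Each mapper depends only on its input, so replaying the same
        // sequence value after recovery yields the same output.
        System.out.println(toInt(42L) + " " + toShort(7L) + " " + toByte(-1L));
    }
}
```

Because each mapper is a pure function of the sequence value, no generator state beyond the position of the underlying sequence needs to be restored.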

I would deprecate it now so we have some incentive to migrate existing usages (in particular internal ones!) to the new source.

Marked as deprecated, but I am not sure if a simple "Use [fullpath]DataGeneratorSource instead." is enough here.

afedulov avatar Oct 13 '22 13:10 afedulov

@zentol I added the docs. Not 100% sure if the last bit about per-checkpoint generation is relevant enough for the general public. I am open to dropping it if you think it is not.

afedulov avatar Oct 13 '22 13:10 afedulov

Marked as deprecated, but I am not sure if a simple "Use [fullpath]DataGeneratorSource instead." is enough here.

You could extend it a bit to state that stateful generator functions should instead be re-implemented to be deterministic based on the index.
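
To illustrate the kind of rewrite meant here, the sketch below contrasts a generator that keeps mutable state with an equivalent one computed purely from the index. All names (`DeterministicGeneratorSketch`, `StatefulEvens`, `evenAt`) are hypothetical and chosen for illustration; this is not code from the PR.

```java
/** Hypothetical example for illustration only; not part of Flink. */
final class DeterministicGeneratorSketch {
    private DeterministicGeneratorSketch() {}

    /** Stateful style: the next value depends on a field that would need checkpointing. */
    static final class StatefulEvens {
        private long next = 0;

        long generate() {
            long value = next;
            next += 2;
            return value;
        }
    }

    /** Index-based style: the output is a pure function of the element index. */
    static long evenAt(long index) {
        return 2 * index;
    }

    public static void main(String[] args) {
        StatefulEvens stateful = new StatefulEvens();
        for (long i = 0; i < 5; i++) {
            // Both columns print the same value on every line.
            System.out.println(stateful.generate() + " " + evenAt(i));
        }
    }
}
```

With the index-based form, only the position in the index sequence has to survive a restart; the generator itself carries nothing to restore.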

zentol avatar Oct 13 '22 13:10 zentol

@zentol I modified the docs according to your proposals, PTAL.

afedulov avatar Oct 14 '22 11:10 afedulov

We're running into a bug in the Japicmp plugin; I'll try to resolve that upstream. https://github.com/siom79/japicmp/issues/341

zentol avatar Oct 15 '22 10:10 zentol

FYI: my fix has been merged; we're just waiting for the next japicmp release.

zentol avatar Oct 21 '22 13:10 zentol

Oct 18 11:05:28 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 42.188 s <<< FAILURE! - in org.apache.flink.architecture.rules.ApiAnnotationRules
Oct 18 11:05:28 [ERROR] ApiAnnotationRules.PUBLIC_API_METHODS_USE_ONLY_PUBLIC_API_TYPES  Time elapsed: 0.282 s  <<< ERROR!
Oct 18 11:05:28 com.tngtech.archunit.library.freeze.StoreUpdateFailedException: Updating frozen violations is disabled (enable by configuration freeze.store.default.allowStoreUpdate=true)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.ViolationStoreFactory$TextFileBasedViolationStore.save(ViolationStoreFactory.java:125)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.FreezingArchRule$ViolationStoreLineBreakAdapter.save(FreezingArchRule.java:279)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStore(FreezingArchRule.java:147)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStoreAndReturnNewViolations(FreezingArchRule.java:139)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.FreezingArchRule.evaluate(FreezingArchRule.java:120)
Oct 18 11:05:28 	at com.tngtech.archunit.lang.ArchRule$Assertions.check(ArchRule.java:80)
Oct 18 11:05:28 	at com.tngtech.archunit.library.freeze.FreezingArchRule.check(FreezingArchRule.java:96)

:weary: What is it this time...

zentol avatar Oct 21 '22 13:10 zentol

Rebased the PR; it now incorporates #21372 to resolve the japicmp issues. The final CI issue was an outdated exclusion in the ArchUnit store.

zentol avatar Nov 23 '22 11:11 zentol