Improve Flink runner support
Here’s an incomplete list of tasks. We can break them down further or create new issues to track as we go.
- [x] Make sure Flink runner runs all Beam examples BEAM-3924
- [x] Implement Flink-specific `ScioContext` and `ScioResult`
- [x] Work out a dependency strategy and fix existing conflicts
- [x] Make sure Flink runner runs all `scio-examples`
- [ ] `FlinkRunnerResult` doesn't support non-blocking `run()` and `cancel()` BEAM-593
From @jto:

A better solution is to rename our versions of the conflicting packages via shading: https://github.com/sbt/sbt-assembly#shading

```
++ 2.11.12
set libraryDependencies += "org.apache.beam" %% "beam-runners-flink" % "2.4.0"
set assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.objenesis.**" -> "org.objenesis_shaded.@1").inAll,
  ShadeRule.rename("com.esotericsoftware.kryo.**" -> "com.esotericsoftware.kryo_shaded.@1").inAll,
  ShadeRule.rename("com.twitter.chill.**" -> "com.twitter.chill_shaded.@1").inAll
)
assembly
```
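For reference, the same shade rules can live in `build.sbt` instead of being set in the sbt console. A minimal sketch, assuming an sbt-assembly setup (the runner artifact name and versions here are illustrative, not prescriptive):

```scala
// build.sbt (sketch): shade the conflicting serialization packages so the
// Kryo/Chill/Objenesis bundled in the job jar cannot clash with Flink's copies.
libraryDependencies += "org.apache.beam" % "beam-runners-flink-1.7" % "2.12.0"

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.objenesis.**" -> "org.objenesis_shaded.@1").inAll,
  ShadeRule.rename("com.esotericsoftware.kryo.**" -> "com.esotericsoftware.kryo_shaded.@1").inAll,
  ShadeRule.rename("com.twitter.chill.**" -> "com.twitter.chill_shaded.@1").inAll
)
```

Note that shading only rewrites classes inside the assembled jar; anything Flink itself loads from its own `lib/` directory is unaffected, which is why this helps with client-side conflicts but not with Flink's internal Kryo.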
@Igosuki @bkirwi FYI, would love to collaborate on this.
Very interested in this as well. I've been attempting to use scio with a local Flink cluster and am running into the kryo compatibility issues listed in https://github.com/spotify/scio/issues/1128 . The gist is we're looking to be able to run pipelines locally outside of Google Cloud Dataflow but much faster/more efficiently than the DirectRunner.
`DirectRunner` is intended for local testing and does a lot more checks, e.g. mutability and serialization. You should NOT run production pipelines with it. That said, you can turn those checks off to make it run much faster, but if you don't plan to run in a distributed system, it's probably better to write simple Java/Scala code without such a heavyweight framework.
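For instance, the mutability and encodability checks can be switched off through Beam's `DirectOptions`. A minimal sketch (verify the option names against your Beam version; the setters below are assumptions based on Beam's DirectRunner options):

```scala
// Sketch: disable DirectRunner's extra validation for faster local runs.
import org.apache.beam.runners.direct.{DirectOptions, DirectRunner}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import com.spotify.scio.ScioContext

val opts = PipelineOptionsFactory.create().as(classOf[DirectOptions])
opts.setRunner(classOf[DirectRunner])
opts.setEnforceImmutability(false) // skip the mutation-detection pass
opts.setEnforceEncodability(false) // skip the encode/decode round-trip check
val sc = ScioContext(opts)
```

The same flags can also be passed on the command line as `--enforceImmutability=false --enforceEncodability=false`.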
There are steps in between "running in production" and "testing". We have environments where google cloud dataflow is not a good option for us. Further, this is but one example of wanting to use Flink or Spark (we typically dislike tying ourselves to a single vendor; it somewhat flies in the face of using Beam in the first place if we can only use one runner in production).
Awesome. Feel free to submit issues or PRs.
I'm happy to help however I can. Was mostly trying to revive this open ticket and see if anyone is working on it.
For context, I've been attempting this combination of tools:
- scio 0.8.0-alpha1
- beam 2.12.0
- flink 1.7.2
- scala 2.12
I've been attempting the flink/beam/scala2_12 workaround documented here: https://issues.apache.org/jira/browse/BEAM-7544?focusedCommentId=16863149&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16863149 and have been testing against a flink cluster running locally (well, in docker, but local enough).
The issue seems to be kryo/objenesis incompatibilities between the Flink server itself and what's bundled with Scio/Beam (per https://github.com/spotify/scio/issues/1128 ).
Next up, I excluded kryo from the flink dependencies, but it did not solve the problem, as Flink itself still natively uses an ancient Kryo version (see https://issues.apache.org/jira/browse/FLINK-3154 ).
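The exclusion attempt looks roughly like this in sbt (artifact name, group IDs, and versions are illustrative; as noted above, it doesn't help because the Flink cluster still loads its own Kryo regardless of what the job jar excludes):

```scala
// build.sbt (sketch): excluding kryo from the client-side Flink dependency.
// This only affects the submitted jar, not the Kryo shipped in the Flink runtime.
libraryDependencies += ("org.apache.flink" %% "flink-streaming-scala" % "1.7.2")
  .exclude("com.esotericsoftware.kryo", "kryo")
```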
I'm interested in the Shade/package rename approach you mentioned above (https://github.com/spotify/scio/issues/1085#issuecomment-376653743) as it may be a viable path forward. I'm otherwise having no issues submitting the jobs to a remote Flink cluster.
Related issues:
- https://github.com/spotify/scio/issues/2171
- https://github.com/spotify/scio/issues/2642
From our recent experiments (porting a real production Scio 0.9 job to Flink), the biggest problem is the different, incompatible versions of Kryo used by Scio and Flink. Downgrading Kryo in Scio and setting classloading to parent-first in Flink seems to work, but is not an ideal solution.
Indeed, the problem is "different instances of Kryo within the same JVM, all using distinct major versions".
I was unable to figure out whether Flink's private use of Kryo can be entirely dissociated from Scio's (meaning that each serializes/deserializes objects through orthogonal channels) or whether we really must have a single Kryo instance where objects from all three of Flink, Scio, and user code are intertwined.
Given Flink's very careful shading of just about everything under the sun except Kryo, I'm afraid it's the latter; but I lacked the bandwidth to complete my attempt at shading Kryo within a private Flink build and seeing what happens in a dev build of Scio (unfortunately, my lack of bandwidth is permanent for the foreseeable future; my apologies for that).
Yes, conflicting classes end up being loaded by the same classloader; normally you should be able to rely on the version provided by the environment. If you have an urgent business need to make this work, simply fork Scio and change Kryo to what you want.
No, the problem is the Flink-supplied Kryo. To go along the line you describe, one would need to first make a private build of Flink using that Kryo, then a private build of Beam, then a private build of Scio.
Since my reason to use Scio/Flink was to get out of years of being trapped in a private build of scalding+Tez in the first place, this doesn't really fly for now.
(No urgency for me, just putting this on the record in case it helps anyone)
@Igosuki is right, changing only Scio's deps works. We recently did some experimentation internally and "fixing" the deps conflicts (chill needs to be downgraded) is enough. You also need to force Flink to use parent-first classloading.
True, using parent-first classloading indeed.
(Still, checking whether Flink requires a shared Kryo, or could be content with shading its own for its own purposes, would go a long way. I lack the time and appropriate context for this at the moment, unfortunately.)
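The parent-first setting mentioned above is a standard Flink configuration key; a minimal `flink-conf.yaml` fragment:

```yaml
# flink-conf.yaml: resolve classes from Flink's parent classloader first,
# so the (downgraded) Kryo/Chill classes in the job jar don't clash with
# duplicates loaded by the default child-first user-code classloader.
classloader.resolve-order: parent-first
```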
The last requirement

> `FlinkRunnerResult` doesn't support non-blocking `run()` and `cancel()` BEAM-593

is fixed since Beam 2.50 / Scio 0.13.3. Can we close this issue, @jto?
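With that fix in place, the non-blocking pattern looks roughly like this (a sketch against Scio's public API; `MyJob` and the transform are hypothetical, and the exact shape of the result types should be checked against Scio 0.13.x):

```scala
// Sketch: non-blocking execution with Scio on the Flink runner.
import com.spotify.scio._

object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(Seq(1, 2, 3)).map(_ * 2)
    val ctx = sc.run()               // returns without blocking
    // Cancellation goes through the underlying Beam PipelineResult.
    val result = ctx.waitUntilDone() // block only when you choose to
  }
}
```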
yes