scio
Support Neo4jIO
Support the experimental org.apache.beam.sdk.io.neo4j.Neo4jIO introduced in Beam 2.38. This is inspired by JdbcIO.
Request for comment: Beam only provides `readAll`, hence the usage of `sc.parallelize(Seq(())).applyTransform` to simulate a read.
- I don't know if this is the proper way to do it.
- Maybe @mattcasters knows why a simple `read` is not provided in Beam.
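For reference, the workaround mentioned above could be sketched roughly like this. This is not runnable on its own: it assumes a `ScioContext` named `sc`, a user-defined `Person` case class, a `personMapper` row mapper, and a `driverConfiguration` holding the connection settings; the builder methods are from the experimental Beam 2.38 `Neo4jIO` API.

```scala
import org.apache.beam.sdk.io.neo4j.Neo4jIO

// Sketch: Beam only exposes readAll, so we feed it a single dummy element
// to trigger exactly one execution of the Cypher query.
val people: SCollection[Person] =
  sc.parallelize(Seq(())) // one element, discarded by the read
    .applyTransform(
      Neo4jIO
        .readAll[Unit, Person]()
        .withDriverConfiguration(driverConfiguration) // assumed to be defined
        .withCypher("MATCH (p:Person) RETURN p.name AS name, p.age AS age")
        .withRowMapper(personMapper) // assumed user-defined RowMapper[Person]
    )
```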
Codecov Report
Merging #4488 (083129c) into main (36935c1) will decrease coverage by 0.30%. The diff coverage is 40.00%.
:exclamation: Current head 083129c differs from pull request most recent head 2054563. Consider uploading reports for the commit 2054563 to get more accurate results.
```diff
@@            Coverage Diff             @@
##             main    #4488      +/-   ##
==========================================
- Coverage   60.48%   60.18%   -0.31%
==========================================
  Files         275      279       +4
  Lines        9882    10086     +204
  Branches      438      840     +402
==========================================
+ Hits         5977     6070      +93
- Misses       3905     4016     +111
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...rc/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala | 29.41% <29.41%> (ø) | |
| ...in/scala/com/spotify/scio/neo4j/Neo4jOptions.scala | 100.00% <100.00%> (ø) | |
| .../spotify/scio/neo4j/syntax/SCollectionSyntax.scala | 100.00% <100.00%> (ø) | |
| .../spotify/scio/neo4j/syntax/ScioContextSyntax.scala | 100.00% <100.00%> (ø) | |
| ...om/spotify/scio/elasticsearch/CoderInstances.scala | 44.11% <0.00%> (-5.89%) | :arrow_down: |
| ...om/spotify/scio/elasticsearch/CoderInstances.scala | 42.42% <0.00%> (-5.86%) | :arrow_down: |
| ...com/spotify/scio/bigquery/types/TypeProvider.scala | 47.22% <0.00%> (-2.78%) | :arrow_down: |
| ...la/com/spotify/scio/bigquery/client/BigQuery.scala | 22.44% <0.00%> (-2.56%) | :arrow_down: |
| ...rc/main/scala/com/spotify/scio/util/ScioUtil.scala | 59.25% <0.00%> (-2.28%) | :arrow_down: |
| ...n/scala/com/spotify/scio/extra/annoy/package.scala | 80.00% <0.00%> (-2.06%) | :arrow_down: |
| ... and 43 more | | |
> Maybe @mattcasters knows why a simple `read` is not provided in Beam.
Frankly I couldn't come up with a good way to parallelise a Cypher query in a generic way. If you would read just nodes or relationships you could split up the natural internal ID address space. For a generic cypher query it's not that trivial. The "solution" then is to provide sets of parameters to the query which is then executed multiple times in parallel. With a bit of luck you can get to the same embarrassingly parallel solution.
> Frankly I couldn't come up with a good way to parallelise a Cypher query in a generic way.
Thanks for your answer!
I may be mistaken, but that indeed prevents using Splittable DoFn to get parallel IO; however, it is still possible to have a naïve `read` that simply runs on a single worker. I think that's what JdbcIO does: it only includes an `outputParallelization` parameter to shuffle the output.
With `readAll`, if we have only one Cypher query to execute, we need to provide a one-discarded-element `PCollection`, which is kind of ugly.
To say things differently, this is not about the parallelisation feature, but about the availability of `read` in the API, for convenience.
@turb for parallel reading in JDBC, we have `jdbcShardedSelect`. It takes a `shardColumn` and a `numShards` with:
- `shardColumn` being uniform and linear, where we can know the min and max values
- `numShards` to split the `shardColumn` space above into x buckets
We can potentially do something like this:

```scala
sc.parallelize(Seq(Range(min, shard1), Range(shard1, shard2), Range(shard2, max))) // to be computed ?
  .applyTransform(
    beam.Neo4jIO
      .readAll[Range, T]()
      ...
      .withCypher(shardedCypher) // ensures we have $min $max filter
      .withParametersFunction((r: Range) => Map("min" -> r.min, "max" -> r.max))
  )
```
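The shard boundaries marked "to be computed ?" above could be derived from the min and max of the shard column. A minimal, self-contained sketch in plain Scala (no Beam involved; `shardRanges` is a hypothetical helper name, not part of any API):

```scala
// Hypothetical helper: split [min, max) into numShards contiguous (lo, hi) ranges.
// Integer arithmetic distributes any remainder across the shards, so the
// ranges are contiguous and together cover the whole span exactly once.
def shardRanges(min: Long, max: Long, numShards: Int): Seq[(Long, Long)] = {
  require(numShards > 0 && max >= min)
  val span = max - min
  (0 until numShards).map { i =>
    val lo = min + span * i / numShards
    val hi = min + span * (i + 1) / numShards
    (lo, hi)
  }
}
```

Each `(lo, hi)` pair would then feed the `withParametersFunction` above as the `$min`/`$max` parameters of the sharded Cypher query.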
Maybe we can also create a very basic use case in `scio-example` if you know one.
Added one in fixup 7669f0abc01562933efd32a9108935667b74b6f4
> for parallel reading in jdbc, we have the `jdbcShardedSelect`
I suppose @mattcasters has an opinion on it.
However, since the Beam impl is still experimental, is it a good idea to build this kind of pattern on top of it yet?
IMHO it's more for further direction.
I'd prefer incremental development, where we can start with the proposed read and write and continue the discussion in another issue/PR.
I'd be happy to consider any and all changes to Neo4jIO. It was really hard to get it across the finish line because of all the new rules and regulations in effect in the Beam project which do not apply to existing code like JdbcIO. Because of this, the two IOs are not the same. Neo4jIO was a bit of a challenge since there was no other clean IO code available to reference. Months were spent on it. However, now that the code exists we can expand functionality. A Neo4j shard selector would indeed be something cool to consider. Feel free to create an issue in the Beam project and I'd be happy to investigate.
I took some liberty and pushed some integration tests for Neo4j on your branch.
That's great! Many thanks.
I have locally updated the Neo4j Docker image pulled by Testcontainers from 3.5.0 to 4.4.10, as 3.5.0 has no Apple Silicon version, hence runs forever and times out on my laptop. Should I share this change?
Thanks a lot for fixing the issues @turb
I think the PR is good to go now. I'll open an issue in magnolify to support `RowMapper` automatic derivation, as the `neotype` core package comes with a lot of extras.
Great! Should we squash the fixups, or will this be managed by the merger?
we'll do a squash merge for the whole PR