
Support Neo4jIO

Open turb opened this issue 3 years ago • 12 comments

Support the experimental org.apache.beam.sdk.io.neo4j.Neo4jIO introduced in Beam 2.38. This is inspired by JdbcIO.

Request for comment: Beam only provides readAll, hence the usage of sc.parallelize(Seq(())).applyTransform to simulate a read (a sketch of this workaround follows the list below).

  1. I don't know if it is the proper way to do this.
  2. Maybe @mattcasters knows why a simple read is not provided in Beam.
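
For reference, a minimal sketch of that workaround; the connection URI, credentials, and Person case class are placeholders, not taken from this PR:

```scala
import com.spotify.scio.ScioContext
import org.apache.beam.sdk.io.neo4j.Neo4jIO

case class Person(name: String)

def readPeople(sc: ScioContext) =
  sc
    // single dummy element whose only purpose is to trigger one execution of the query
    .parallelize(Seq(()))
    .applyTransform(
      Neo4jIO
        .readAll[Unit, Person]()
        .withDriverConfiguration(
          Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "secret")
        )
        .withCypher("MATCH (p:Person) RETURN p.name AS name")
        .withRowMapper(record => Person(record.get("name").asString()))
        // depending on the Beam version, .withCoder(...) may also be required
    )
```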

turb avatar Aug 05 '22 21:08 turb

Codecov Report

Merging #4488 (083129c) into main (36935c1) will decrease coverage by 0.30%. The diff coverage is 40.00%.

:exclamation: Current head 083129c differs from pull request most recent head 2054563. Consider uploading reports for the commit 2054563 to get more accurate results

@@            Coverage Diff             @@
##             main    #4488      +/-   ##
==========================================
- Coverage   60.48%   60.18%   -0.31%     
==========================================
  Files         275      279       +4     
  Lines        9882    10086     +204     
  Branches      438      840     +402     
==========================================
+ Hits         5977     6070      +93     
- Misses       3905     4016     +111     
| Impacted Files | Coverage Δ |
|---|---|
| ...rc/main/scala/com/spotify/scio/neo4j/Neo4jIO.scala | 29.41% <29.41%> (ø) |
| ...in/scala/com/spotify/scio/neo4j/Neo4jOptions.scala | 100.00% <100.00%> (ø) |
| .../spotify/scio/neo4j/syntax/SCollectionSyntax.scala | 100.00% <100.00%> (ø) |
| .../spotify/scio/neo4j/syntax/ScioContextSyntax.scala | 100.00% <100.00%> (ø) |
| ...om/spotify/scio/elasticsearch/CoderInstances.scala | 44.11% <0.00%> (-5.89%) :arrow_down: |
| ...om/spotify/scio/elasticsearch/CoderInstances.scala | 42.42% <0.00%> (-5.86%) :arrow_down: |
| ...com/spotify/scio/bigquery/types/TypeProvider.scala | 47.22% <0.00%> (-2.78%) :arrow_down: |
| ...la/com/spotify/scio/bigquery/client/BigQuery.scala | 22.44% <0.00%> (-2.56%) :arrow_down: |
| ...rc/main/scala/com/spotify/scio/util/ScioUtil.scala | 59.25% <0.00%> (-2.28%) :arrow_down: |
| ...n/scala/com/spotify/scio/extra/annoy/package.scala | 80.00% <0.00%> (-2.06%) :arrow_down: |
... and 43 more


codecov[bot] avatar Aug 05 '22 21:08 codecov[bot]

Maybe @mattcasters knows why a simple read is not provided in Beam.

Frankly I couldn't come up with a good way to parallelise a Cypher query in a generic way. If you would read just nodes or relationships you could split up the natural internal ID address space. For a generic cypher query it's not that trivial. The "solution" then is to provide sets of parameters to the query which is then executed multiple times in parallel. With a bit of luck you can get to the same embarrassingly parallel solution.
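
A minimal sketch of this parameter-sets approach, assuming an illustrative list of country codes, placeholder connection details, and a Person case class:

```scala
import scala.jdk.CollectionConverters._
import com.spotify.scio.ScioContext
import org.apache.beam.sdk.io.neo4j.Neo4jIO

case class Person(name: String)

def readByCountry(sc: ScioContext) =
  sc
    // each element is one parameter set; the same Cypher query runs once per element,
    // which is where the parallelism comes from
    .parallelize(Seq("SE", "DE", "FR"))
    .applyTransform(
      Neo4jIO
        .readAll[String, Person]()
        .withDriverConfiguration(
          Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "secret")
        )
        .withCypher("MATCH (p:Person { country: $country }) RETURN p.name AS name")
        .withParametersFunction((country: String) => Map[String, AnyRef]("country" -> country).asJava)
        .withRowMapper(record => Person(record.get("name").asString()))
    )
```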

mattcasters avatar Aug 05 '22 22:08 mattcasters

Frankly I couldn't come up with a good way to parallelise a Cypher query in a generic way.

Thanks for your answer!

I may be mistaken, but that indeed prevents using a Splittable DoFn to get parallel IO; however, it is still possible to have a naïve read that simply runs on a single worker. I think that's what JdbcIO does: it only adds an outputParallelization parameter to shuffle the output.
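
For illustration, roughly what that looks like on the JdbcIO side; the driver class, URL, and query are placeholders:

```scala
import org.apache.beam.sdk.io.jdbc.JdbcIO

// A plain JdbcIO read: the query runs once, and withOutputParallelization only
// controls whether the output is reshuffled afterwards.
val namesRead: JdbcIO.Read[String] =
  JdbcIO
    .read[String]()
    .withDataSourceConfiguration(
      JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost/db")
    )
    .withQuery("SELECT name FROM person")
    .withRowMapper(rs => rs.getString("name"))
    .withOutputParallelization(false)
```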

With readAll, if we have only one Cypher query to execute, we need to provide a PCollection with a single, discarded element, which is kind of ugly.

To put it differently, this is not about the parallelisation feature, but about having a plain read available in the API, for convenience.

turb avatar Aug 05 '22 22:08 turb

@turb for parallel reading in JDBC, we have jdbcShardedSelect: it takes a shardColumn and a numShards, with

  • shardColumn being uniform and linear, so that we can know its min and max values
  • numShards to split the shardColumn space above into that many buckets

we could potentially do something like this:

sc.parallelize(Seq(Range(min, shard1), Range(shard1, shard2), Range(shard2, max))) // to be computed ?
   .applyTransform(
      beam.Neo4jIO
        .readAll[Range, T]()
        ...
        .withCypher(shardedCypher) // ensures we have $min $max filter
        .withParametersFunction((r: Range) => Map("min" -> r.min, "max" -> r.max))
   )
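
A minimal sketch of one way to compute those shard bounds, assuming a known min and max of the id space and a target numShards (all names hypothetical):

```scala
// Split [min, max) into numShards contiguous (lo, hi) bounds that could feed
// withParametersFunction above.
// e.g. shardBounds(0L, 1000000L, 4) == Seq((0, 250000), (250000, 500000), (500000, 750000), (750000, 1000000))
def shardBounds(min: Long, max: Long, numShards: Int): Seq[(Long, Long)] = {
  val width = math.max(1L, math.ceil((max - min).toDouble / numShards).toLong)
  (min until max by width).map(lo => (lo, math.min(lo + width, max)))
}
```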

RustedBones avatar Aug 11 '22 09:08 RustedBones

Maybe we can also create a very basic use-case in scio-example if you know one

Added one in fixup 7669f0abc01562933efd32a9108935667b74b6f4

turb avatar Aug 11 '22 10:08 turb

for parallel reading in jdbc, we have the jdbcShardedSelect

I suppose @mattcasters has an opinion on it.

However, since the Beam impl is still experimental, is it a good idea to build this kind of pattern on top of it yet?

turb avatar Aug 11 '22 10:08 turb

IMHO it's more of a direction for later. I'd prefer incremental development: we can start with the proposed read and write, and continue the discussion in another issue/PR.

RustedBones avatar Aug 11 '22 10:08 RustedBones

I'd be happy to consider any and all changes to Neo4jIO. It was really hard to get it across the finish line because of all the new rules and regulations in effect in the Beam project which do not apply to existing code like JdbcIO. Because of this, the two IOs are not the same. Neo4jIO was a bit of a challenge since there was no other clean IO code available to use as a reference. Months were spent on it. However, now that the code exists we can expand its functionality. A Neo4j shard selector would indeed be something cool to consider. Feel free to create an issue in the Beam project and I'd be happy to investigate.

mattcasters avatar Aug 11 '22 12:08 mattcasters

I took some liberty and pushed on your branch some integration tests for neo4j.

That's great! Many thanks.

I have locally updated the Neo4j Docker image pulled by Testcontainers from 3.5.0 to 4.4.10, as the former has no Apple Silicon version and therefore runs forever and times out on my laptop. Should I share this change?

turb avatar Aug 19 '22 18:08 turb

Thanks a lot for fixing the issues @turb. I think the PR is good to go now. I'll open an issue in magnolify to support automatic RowMapper derivation, as the neotype core package comes with a lot of extras.

RustedBones avatar Aug 23 '22 08:08 RustedBones

Thanks a lot for fixing the issues @turb. I think the PR is good to go now. I'll open an issue in magnolify to support automatic RowMapper derivation, as the neotype core package comes with a lot of extras.

Great! Should we squash the fixups, or will this be managed by the merger?

turb avatar Aug 23 '22 08:08 turb

we'll do a squash merge for the whole PR

RustedBones avatar Aug 23 '22 08:08 RustedBones