Spark Direct Readers and Writers for Druid.
Implements #9780
Description
See #9780 and linked discussions for more context. This PR adds a new module, druid-spark, containing Spark direct readers and writers for Druid. Usage is documented in the module documentation.
As discussed on the dev mailing list, a summary in human language of the UX and testing regimen follows:
UX
The entry point for users is deceptively simple: all interaction is handled through the existing Spark interfaces plus configuration. To be specific, users read Druid data into a Spark dataframe via
Dataset<Row> df = sparkSession
.read()
.format("druid")
.schema(schema)
.options(propertiesMap)
.load();
and write a Spark dataframe to a Druid data source with
df
.write()
.format("druid")
.mode(SaveMode.Overwrite)
.options(propertiesMap)
.save();
The meat of the interaction is through the propertiesMap passed to the reader or writer. These properties, cataloged in the documentation, mostly follow the corresponding Druid properties. If desired, there are typed helpers for setting these options in org.apache.druid.spark.DruidDataFrameReader and org.apache.druid.spark.DruidDataFrameWriter as well; a sketch of an equivalent raw options map follows the helper examples below. Sample usages of these helpers are
import org.apache.druid.spark.DruidDataFrameReader
sparkSession
.read
.brokerHost("localhost")
.brokerPort(8082)
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.dataSource("dataSource")
.druid()
and
import org.apache.druid.spark.DruidDataFrameWriter
import org.apache.druid.spark.model.LocalDeepStorageConfig
import org.apache.spark.sql.SaveMode
val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
df
.write
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.version(1)
.deepStorage(deepStorageConfig)
.mode(SaveMode.Overwrite)
.dataSource("dataSource")
.druid()
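For comparison, the same configuration can be passed as a raw options map with the stock Spark reader API. This is only a sketch: the property keys below are illustrative placeholders, and the authoritative key names are cataloged in the module documentation.
// Placeholder keys for illustration only; consult the module documentation for
// the exact property names. A schema can also be supplied via .schema(...) as in
// the Java example above.
val propertiesMap = Map(
  "metadataDbType" -> "mysql",
  "metadataUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
  "metadataUser" -> "druid",
  "metadataPassword" -> "diurd",
  "dataSource" -> "dataSource"
)
sparkSession
  .read
  .format("druid")
  .options(propertiesMap)
  .load()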
There are a few key areas to be aware of:
First, due to Spark's design, DataSourceWriters cannot repartition the dataframe they are responsible for writing, and they have very little information about the overall partitioning. To compensate, the writer includes a number of partitioners out of the box. These partitioners by necessity have no context for the data they are partitioning, so they will be slower than usage-specific partitioners, but they are suitable for prototyping. If the provided partitioners are not used, there are a few behaviors to be aware of. For "simple" ShardSpecs (LinearShardSpec and NumberedShardSpec), the writer will default to rationalizing the output segments into contiguous and complete blocks, ensuring that loading and overshadowing of the output will be handled atomically by Druid. For more complex shard spec types such as HashBasedNumberedShardSpec and SingleDimensionShardSpec, users will need to provide a partition map to the writer, linking Spark partition ids to the information required to construct the corresponding Druid segments. The included Spark partitioners all provide partition maps and can be used as references for how to implement similar behavior in custom partitioners. A minimal sketch of pre-partitioning a dataframe before writing follows.
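This sketch assumes a __time column and daily segment granularity; both are assumptions for illustration, not requirements of the connector.
import org.apache.spark.sql.functions.{col, date_trunc}
// Assumes a "__time" timestamp column and DAY segment granularity (illustrative only),
// so that each Spark partition maps to a single Druid time chunk.
val bucketed = df
  .withColumn("__timeBucket", date_trunc("day", col("__time")))
  .repartition(col("__timeBucket"))
// With a "simple" shard spec (LinearShardSpec or NumberedShardSpec) the writer will
// rationalize the resulting segments into contiguous, complete blocks; complex shard
// specs still require an explicit partition map.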
Second, because this code executes in a Spark cluster rather than a Druid cluster, it cannot take advantage of Druid's extension support directly. Instead, these connectors utilize a plugin registry system that parallels Druid's extensions. The complex metric types, metadata stores, and deep storage types supported by core Druid extensions are also supported out of the box with this extension, with the exception of Azure deep storage. If users wish to implement their own plugins to handle specific complex metrics types, metadata servers, shard specs, or deep storage implementations, they can register plugins to handle their use cases with the respective plugin registries before loading or saving data and the Spark connectors will use the provided logic for the corresponding tasks.
Testing
Testing is handled via a combination of unit tests and light integration tests. Testing is focused on the core Spark code that handles reading and writing data, although most functionality is covered by at least one unit test. The key classes are DruidDataSourceReader, DruidInputPartitionReader, and DruidInputPartition on the read side and DruidDataSourceWriter, DruidDataWriterFactory, and DruidDataWriter on the write side. The tests are in the corresponding *Suite classes. Additionally, there is a lightweight round-trip test in the DruidDataSourceV2 suite which writes out test data to local segments, updates metadata in an embedded Derby instance, reads the segments back into a Dataframe, and confirms that the rows read in match the rows written out. This test also confirms that the metadata entries created by the writer are correct.
The main gap in testing is the cross-compatibility matrix. I'm not sure how to repeatably test these connectors' interaction with deep storage types other than local and with metadata servers other than Derby. This code has been run at scale against Druid deployments using HDFS for deep storage and PostgreSQL for metadata, and I am aware of production users with S3 and GCP deep storage, but the Azure support is lightly modified from how the corresponding Druid extension handles configuration and is otherwise untested beyond parsing.
This PR has:
- [x] been self-reviewed.
- [x] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [x] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
Key changed/added classes in this PR
spark/*
I've added some more tests, improved some comments, and added support for configuring non-default multi-value handling and indexing for string dimensions.
I also see that instructions for how to update the licenses and notices have been committed, but I'll wait on doing that until this has a little more traction.
Is anyone actively reviewing this? If not, I have a bit of a refactor I'd like to push that improves the organization of the code and hopefully makes it easier to follow.
Please post this update.
Life intervened as always, so I haven't thought about this for a while. I've just pushed the refactor I mentioned above that improves the package and class layout. The commit also bumps the target Spark and Scala patch versions (to 2.4.7 and 2.12.12, respectively) to pull in security fixes, adds more unit tests, and improves the code documentation. Finally, the commit adds two new features: the ability to specify SQL-compliant or Druid default-value null handling (via the reader.useDefaultValueForNull config option) and experimental support for columnar reads. See the extension documentation for more detail.
Awesome. I had to resort to adding an InitializeForTesting call from the NullHandling module to the writer class to ensure that NullHandling was initialized on the executors; otherwise exceptions were thrown. Glad that is properly fixed up now.
Added the writer.useDefaultValueForNull config option as well to control how nulls are handled while writing.
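For example, a sketch of setting the writer option through the generic options interface (the string value and the surrounding propertiesMap are assumptions for illustration):
import org.apache.spark.sql.SaveMode
// "writer.useDefaultValueForNull" is the option named above; the reader-side
// counterpart is "reader.useDefaultValueForNull".
df
  .write
  .format("druid")
  .option("writer.useDefaultValueForNull", "true")
  .options(propertiesMap)
  .mode(SaveMode.Overwrite)
  .save()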
I pushed up a commit to clean up the documentation as you requested @samarthjain. I also added support for building a library jar suitable for adding to a Spark cluster with mvn clean package -pl extensions-core/spark-extensions (edit 7/14/21: use mvn clean package -pl spark). To keep the size reasonable I added dependency exclusions to the pom. I haven't yet made sure that we're excluding every dependency possible, but even the obvious first cut trimmed the size by over 60%.
@JulianJaffePinterest I have tried to create a jar file from your PR branch and copied it to my Spark code. I am using it to read a Druid data source with the following code (I retrieved the segments directly via the DruidMetaClient):
val readDf = sparkSession
  .read
  .format("druid")
  .options(Map("segments" -> segmentsString))
  .load()
but I keep on hitting the error:
icationMaster.runDriver(ApplicationMaster.scala:472)
  at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:308)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:248)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:248)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:248)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:783)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
  at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:782)
  at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:247)
  at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:807)
  at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:213)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
  at LoadData$.main(LoadData.scala:80)
  at LoadData.main(LoadData.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
Caused by: java.lang.ClassNotFoundException: druid.DefaultSource
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
  ... 9 more
Not sure what I am missing here exactly, but I did not see a test case for this in your PR either.
@birTiwana the problem is that you don't have the org.apache.spark.sql.sources.DataSourceRegister resource in your META-INF/services, so the short name resolution isn't working. How are you building the jar?
If you're using mvn clean package -pl extensions-core/spark-extensions, try deleting the line in the pom that sets the ServicesResourceTransformer (<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>). If you don't want to include the resource in your META-INF services, you can also just use the fully qualified name instead: org.apache.druid.spark.v2.DruidDataSourceV2
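For example, a minimal read using the fully qualified class name (reusing the segmentsString from the snippet above) would look like this sketch:
// Bypasses the DataSourceRegister short-name lookup entirely, so the missing
// META-INF/services resource no longer matters.
val readDf = sparkSession
  .read
  .format("org.apache.druid.spark.v2.DruidDataSourceV2")
  .options(Map("segments" -> segmentsString))
  .load()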
Thanks, this helped. I was building the spark-extensions jar using mvn clean package -pl extensions-core/spark-extensions, then copying it to my Scala project's lib folder and packaging the whole project with sbt assembly.
Ah yes, the Scala version for this extension is 2.12.12; it won't work with 2.13.5. What Spark version are you using?
2.4.5
I've had a number of conversations about these connectors and how to use them. Common pain points are partitioning and user-friendliness, so I've added three new partitioners, ergonomic ways to use them, and a semi-typed way to configure the readers and writers. The improved ergonomics do come at the cost of introducing Scala implicits into the project, which I had tried to avoid to ease comprehension for other developers; however, I think the tradeoff here is worth it. See the documentation for more details.
Example usages:
Configuring the reader:
import org.apache.druid.spark.DruidDataFrameReader
sparkSession
.read
.brokerHost("localhost")
.brokerPort(8082)
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.dataSource("dataSource")
.druid()
Configuring the writer:
import org.apache.druid.spark.DruidDataFrameWriter
import org.apache.druid.spark.model.LocalDeepStorageConfig
import org.apache.spark.sql.SaveMode
val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
df
.write
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.version(1)
.deepStorage(deepStorageConfig)
.mode(SaveMode.Overwrite)
.dataSource("dataSource")
.druid()
Using the new partitioners and the ergonomic approach to passing a partition map to the writer:
import org.apache.druid.spark.DruidDataFrame
import org.apache.druid.spark.DruidDataFrameWriter
import org.apache.druid.spark.model.LocalDeepStorageConfig
import org.apache.spark.sql.SaveMode
val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
df
.rangePartitionerAndWrite(tsCol, tsFormat, granularityString, rowsPerPartition, partitionCol)
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.version(1)
.deepStorage(deepStorageConfig)
.mode(SaveMode.Overwrite)
.dataSource("dataSource")
.druid()
@JulianJaffePinterest - thanks for the updates. This looks good to me now. Could you rebase the PR?
@samarthjain Done! I also updated LICENSE and NOTICE for the new dependencies
Is there any hope that this gets merged? From our perspective, the native ingestion is brittle when put in a dynamic cloud like Kubernetes. Having a Spark-based ingestion would be super.
Once @JulianJaffePinterest pushes fixes to take care of Travis failures, I think this is good to be merged upstream.
@JulianJaffePinterest - looks like there are a few more failures - https://travis-ci.com/github/apache/druid/jobs/520783859
@JulianJaffePinterest providing implicits like .rangePartitionerAndWrite looks nice indeed. And it looks familiar from what we did as well ;)
You mentioned .rangePartitionerAndWrite in https://github.com/apache/druid/pull/10920#issuecomment-849417236 but there doesn't seem to be anything about it in the extension doc yet. Would it be good to add? And are there actually more implicits than that (one for each partitioner type)?
@juhoautio-rovio thanks for beta-testing the documentation! If the implicit wrappers were to be added to the documentation, where would you hope to find them? (They are actually mentioned in the extension docs, but obviously not clearly enough given that you couldn't find them. I'd love any suggestions you might have on improving the documentation.)
@JulianJaffePinterest thanks.
I'd love any suggestions you might have on improving the documentation
I would expect to see the most common implicit partitioner method used in the sample code for Writer https://github.com/JulianJaffePinterest/druid/blob/spark_druid_connector/docs/development/extensions-core/spark.md#writer
I suppose that would simply mean including the sample from your comment https://github.com/apache/druid/pull/10920#issuecomment-849417236 above: the one about using the new partitioners and the ergonomic approach to passing a partition map to the writer.
I would also expect a table that lists the provided partitioners + the corresponding implicit methods for using them conveniently.
They are actually mentioned in the extension docs
Do you mean the doc section Partitioning & PartitionMaps?
One partitioner class name is mentioned in this section, HashedNumberedSegmentPartitioner; what about the others?
It also has this remark:
see the provided partitioners for examples
Is it just my mistake to not find where those are listed? If I should be looking at some other doc, could there be at least a link to that?
@samarthjain do you know who maintains the Travis checks on PRs? I'm trying to figure out what's happening with the IntelliJ inspections test (it's the only failing check for most of my recent builds). The errors are
[ERROR] pom.xml:1790 -- Element forkCount is not allowed here #loc
[ERROR] pom.xml:1790 -- Cannot resolve symbol 'forkCount' #loc
[ERROR] pom.xml:1790 -- Cannot resolve symbol 'forkCount' #loc
[ERROR] pom.xml:1791 -- Element reuseForks is not allowed here #loc
[ERROR] pom.xml:1791 -- Element reuseForks is not allowed here #loc
[ERROR] pom.xml:1791 -- Cannot resolve symbol 'reuseForks' #loc
[ERROR] pom.xml:1791 -- Cannot resolve symbol 'reuseForks' #loc
which are args being passed to the surefire plugin when the parallel-test profile is set. I haven't made any changes to that profile (or to the base pom beyond adding the new module), and I've even removed the surefire configuration changes I made in the spark extension, but the error remains. I'm pretty sure it's erroneous, since all the other checks are able to execute Maven commands. Googling hasn't turned up much beyond a suggestion to check the bundled XML schema used by the inspector, so it would be great to talk to someone who knows a little more about what's happening under the hood for this check.
Follow up: I solved this problem by explicitly setting the surefire version in the plugin section where the complaining configuration lived, even though the version was already set in the plugin management section.
@samarthjain whenever you get a moment, it's green.
@samarthjain Can't wait to have this merged.
Hi @JulianJaffePinterest, apologies for not having taken a look earlier, and thank you for your patience! I know there are many people out there who want this, so thank you again for making this PR. I haven't gone through all the changes because this is a giant PR, and I suspect none of the committers would be able to do a careful review of such a giant PR in the near future. Do you think you can split this into smaller PRs? That will actually make the review process faster. I also have two comments about this project.
- What is the current state of these APIs? Are they production-ready, or are they experimental? This should be documented.
- AFAICT, what you added is not a Druid extension but Spark APIs to read from or write to Druid, isn't it? If so, I think it makes more sense to place this code in a new directory under the Druid root such as ${DRUID_ROOT}/spark-apis.
@jihoonson @samarthjain has been reviewing the PR, but yes, I agree it's too large to review effectively. I'm not sure how to break it into smaller pieces, though: splitting it into just readers or just writers still means that the first PR is very large (since it would still need all the shared code). Do you have any suggestions there?
As to your questions,
- These APIs are production ready (and are in fact being used in production at multiple organizations). Some of the cross-matrix options (deep storage types, sql metadata types) are more experimental (e.g. I'm unaware of any production users writing to Azure deep storage).
- That makes sense to me from a code organization perspective. I can make that change
@jihoonson IMHO, for a completely new big feature like this, isn't it better for it to be submitted as one PR? If there is something wrong in the future, git forensics would be much easier.
@samarthjain whenever you get a moment, it's green.
I have been out over the last 2 weeks. Taking a look now.
No worries @samarthjain.
@jihoonson @samarthjain has been reviewing the PR, but yes, I agree it's too large to review effectively. I'm not sure how to break it into smaller pieces, though: splitting it into just readers or just writers still means that the first PR is very large (since it would still need all the shared code). Do you have any suggestions there?
@JulianJaffePinterest I don't fully understand how this change is structured since I'm unfamiliar with Scala, so it is hard for me to suggest something good based on the code structure. One idea is creating a branch for this project and committing changes there until most of the changes are committed. Since you would be creating PRs against a branch, each PR wouldn't have to be atomic, but could contain incomplete changes or features. This can make each PR smaller. When the branch is ready, you can make another PR to merge the branch into master. That PR can be giant, but it should be OK because all of the changes in the branch will already have been reviewed. This could be an option if you think it will help with review. Note that I tagged "Design Review" for this PR, which means this PR requires 3 approvals from committers for its design (user APIs, important interfaces, etc.) and 1 approval for code changes. The same committers can review both design and code changes.
- These APIs are production ready (and are in fact being used in production at multiple organizations). Some of the cross-matrix options (deep storage types, sql metadata types) are more experimental (e.g. I'm unaware of any production users writing to Azure deep storage).
Thanks for the answer. What is most important is not giving users the wrong idea. Can you please document which matrix options are well tested, so that readers can get some sense of it? BTW, I don't see any new tests in .travis.yml. Unless they are running somewhere else, we should add some.
@jihoonson IMHO, for a completely new big feature like this, isn't it better for it to be submitted as one PR? If there is something wrong in the future, git forensics would be much easier.
Every PR is squashed when it is merged into master, so there is only one commit per PR in the master branch. Making a change atomic (that is, one feature added in one PR instead of being spread across multiple PRs) can help with releases, but I'm not sure how else huge PRs make anything easier.