spark-lucenerdd
Support for Linker with Query type
Describe the solution you'd like
I'd like to use the Lucene query builder to generate my queries instead of a query String. The toString method does not guarantee compatibility and fails in some cases.
For example:
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, FuzzyQuery}

val addressTerms = Seq("harvard", "denver")
val booleanQueryBuilder = new BooleanQuery.Builder()
addressTerms.foreach { t =>
  booleanQueryBuilder.add(new FuzzyQuery(new Term("address", t)), BooleanClause.Occur.SHOULD)
}
val booleanQuery: BooleanQuery = booleanQueryBuilder
  .setMinimumNumberShouldMatch(addressTerms.size / 2)
  .build()
val query = booleanQuery.toString()
Parsing the resulting string back produces the following error:
(address:harvard~2 address:denver~2 )~3': Encountered " <FUZZY_SLOP> "~3 "" at line 1, column 95.
Doing a search-and-replace on the "~" token is not really an option, because the query would no longer behave as expected.
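To make the failure concrete, here is a minimal sketch of the round trip (assuming lucene-queryparser is on the classpath). The classic QueryParser cannot parse the ")~N" suffix that setMinimumNumberShouldMatch adds to the toString() output:

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.queryparser.classic.QueryParser

val parser = new QueryParser("address", new StandardAnalyzer())
// booleanQuery.toString() == "(address:harvard~2 address:denver~2)~1"
parser.parse(booleanQuery.toString()) // throws ParseException: Encountered "<FUZZY_SLOP>"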
Hm, is the generated QueryString the one you describe? The outer ~3 seems wrong to me.
The reason why the library does not support Lucene queries is that Lucene boolean queries are not serializable. Hence, I decided to allow only QueryString.
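A quick way to confirm this (sketch):

import org.apache.lucene.search.{BooleanQuery, TermQuery}

// Lucene Query classes do not implement java.io.Serializable, so Spark
// cannot ship them between the driver and the executors.
classOf[java.io.Serializable].isAssignableFrom(classOf[BooleanQuery]) // false
classOf[java.io.Serializable].isAssignableFrom(classOf[TermQuery])    // false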
Is the generated QueryString the one you describe?

I used the toString method from the Query, but it does not seem to work as expected.
From the Lucene users' mailing list I found this explanation:
ToString has never been guaranteed to be re-parsable, even in Lucene
The reason why the library does not support Lucene queries is that Lucene boolean queries are not serializable. Hence, I decided to allow only QueryString.
Can we build a custom Object and wrap the query similar to what is described here?
Yes, you can definitely build a custom Object and wrap the query as described in your link above. The only issue is that the query object cannot be defined at runtime; it can only be defined at compile time and then submitted to the executors.
For example, I use the "trick" mentioned in the link for Lucene's analyzers that are not serializable, see
https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/scala/org/zouzias/spark/lucenerdd/analyzers/AnalyzerConfigurable.scala
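Applied to a query, the same trick could look roughly like this (a minimal sketch; the class and its shape are illustrative, not part of the library):

import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, FuzzyQuery, Query}

// Serialize only the plain, serializable ingredients; rebuild the
// non-serializable Query lazily on whichever executor touches it.
class FuzzyShouldQuery(field: String, terms: Seq[String]) extends Serializable {
  @transient lazy val query: Query = {
    val builder = new BooleanQuery.Builder()
    terms.foreach(t => builder.add(new FuzzyQuery(new Term(field, t)), BooleanClause.Occur.SHOULD))
    builder.setMinimumNumberShouldMatch(terms.size / 2).build()
  }
}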
What can we do about this?
At the moment, I can simulate min_should_match manually, but ideally I'd like to use the facilities Lucene provides and all the different queries we could run using its builders.
I think you have a valid point here, and I put some effort into this.
I have a prototype (see below) where you can specify a linker as a function Row => Query. Will this work for you? I tested it a bit and do not get a serialization error!
https://github.com/zouzias/spark-lucenerdd/pull/154/files#diff-4a4a384e7770d218cc77733cd7d485a9R547
Thank you a lot for the feedback / suggestion; I think such a feature will greatly improve the API of LuceneRDD.
If you agree on the feature, I can quickly make a snapshot release to test.
Test example usage: https://github.com/zouzias/spark-lucenerdd/pull/154/files#diff-24d0755b325bb4d9735aee65b9ae52c3R83
Test CI: https://travis-ci.org/zouzias/spark-lucenerdd/jobs/504098544#L1532
I believe it looks as expected but it is hard for me to tell from just the diff. I would need to test and see what happens. Please release a snapshot so I can test it.
Thank you a lot for the effort, this will definitely improve the API if implemented successfully.
It is currently available under 0.3.6-SNAPSHOT. Feel free to test.
One of my concerns is that, since we specify the Query now, there might be a mismatch between query-time analysis (tokenization, for example) and index-time analysis.
I believe there will be a mismatch if we don't specify the analyzer with the query. Is there a way we could override it before running the query? For example, how can I set up a custom analyzer?
I am actually not sure how analyzers work at the moment with this project, other than through the config file. Could you please clarify?
I tested the query building and it works as expected. You did an amazing job, thank you!
The only question that remains is the analyzer.
I believe something is not working as expected.
Given my config:
lucenerdd {
  // Name of analyzer as it is under Lucene's package org.apache.lucene.analysis.XX
  analyzer.name = "StandardAnalyzer"

  // Only applies when the analyzer name is "ngram"
  analyzer {
    ngram.mingram = 2
    ngram.maxgram = 5
  }

  // Similarity scoring for Lucene
  similarity.name = "bm25" // anything else will default to Lucene classic similarity

  // Supported linkage methods
  //
  // "collectbroadcast": collects the RDD that contains the queries (to be used only
  // if the query RDD fits in the Spark driver's memory)
  //
  // "cartesian": uses the cartesian product between the partitions of the queries RDD
  // and the partitions of LuceneRDD. Note it duplicates each partition of LuceneRDD
  // n times, where n is the number of partitions of the queries RDD.
  linker.method = "cartesian"

  index {
    // Lucene index storage
    // Use 'disk' to store the index in Java's temp directory
    // Otherwise the index will be stored in memory
    store.mode = "disk"

    stringfields {
      // Analyze text fields or not
      analyzed = true

      // Text fields options as in org.apache.lucene.index.IndexOptions
      //
      // Other options are:
      // "DOCS"
      // "DOCS_AND_FREQS"
      // "DOCS_AND_FREQS_AND_POSITIONS"
      // "DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS"
      // "NONE"
      options = "DOCS"

      // Omit term norms
      terms.omitnorms = false

      // Store term positions
      terms.positions = false

      // Store term vectors (set true, otherwise LuceneRDD.termVectors(fieldName) will fail)
      terms.vectors = true
    }
  }

  // Maximum value on topK queries
  query.topk.maxvalue = 5000

  // Default value of number of returned results
  query.topk.default = 300

  // Default value of number of faceted results
  query.facets.number.default = 10
}
val A = Seq(("L.L.C", "123 Main Street", "US", "1")).toDF("name", "address", "mkt_cd", "map_id")
val B = Seq(("LLC", "123 Main Street", "US", "1")).toDF("name", "address", "mkt_cd", "map_id")
// getString, analyze and getFuzziness are my own helper functions
val linkerQuery: Row => Query = row => {
  val name = getString(row, "name")
  val address = getString(row, "address")
  val booleanQuery = new BooleanQuery.Builder()
  val nameTerms = analyze(name, new StandardAnalyzer)
  val addressTerms = analyze(address, new StandardAnalyzer)

  if (StringUtils.isNotBlank(name) && StringUtils.isNotBlank(address)) {
    val nameQuery = new BooleanQuery.Builder()
    nameTerms.foreach { t =>
      nameQuery.add(new FuzzyQuery(new Term("name", t), getFuzziness(t)), BooleanClause.Occur.SHOULD)
    }
    val addressQuery = new BooleanQuery.Builder()
    addressTerms.foreach { t =>
      addressQuery.add(new FuzzyQuery(new Term("address", t), getFuzziness(t)), BooleanClause.Occur.SHOULD)
    }
    nameQuery.setMinimumNumberShouldMatch(nameTerms.size / 2)
    addressQuery.setMinimumNumberShouldMatch(addressTerms.size / 2)
    booleanQuery.add(nameQuery.build(), BooleanClause.Occur.MUST)
    booleanQuery.add(addressQuery.build(), BooleanClause.Occur.MUST)
  }

  val query: BooleanQuery = booleanQuery.build()
  logInfo(query.toString())
  query
}
19/03/12 13:36:53 INFO Linker: +(name:llc~1) +((address:main~2 address:123~1 address:street~2)~1)
val linkedResults = LuceneRDD.blockEntityLinkage(B, A, linkerQuery, blockingFields, blockingFields, 9999)
// returns 0 results
Normally, using the Standard Analyzer at index time, I expect LLC and L.L.C to match. Is this incorrect?
My assumption is incorrect: the character "." is not among the characters the Standard Analyzer removes.
Query / index time analysis can be tricky.
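A quick way to verify the tokenization (a small sketch using Lucene's TokenStream API):

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import scala.collection.mutable.ArrayBuffer

// Collect the tokens that StandardAnalyzer emits for a piece of text.
def tokens(text: String): Seq[String] = {
  val stream = new StandardAnalyzer().tokenStream("f", text)
  val term = stream.addAttribute(classOf[CharTermAttribute])
  stream.reset()
  val out = ArrayBuffer.empty[String]
  while (stream.incrementToken()) out += term.toString
  stream.end(); stream.close()
  out.toSeq
}

tokens("L.L.C") // Seq("l.l.c") -- the dots survive, so it can never match
tokens("LLC")   // Seq("llc")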
I believe this issue can be considered done unless we find any other bug. #156 addresses the possibility of custom analyzers so it is probably better to move the conversation there.
As far as I know, queries use the analyzer defined by the type, or a custom one at search time. Using a query string or a Query object should not make any difference there (?)
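For context, the classic QueryParser applies the analyzer to the query text at parse time, while a hand-built term-level query bypasses analysis entirely (a small sketch):

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.Term
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.search.TermQuery

// Analyzed at parse time: "Harvard" is lower-cased, matching a
// StandardAnalyzer-built index.
val parsed = new QueryParser("address", new StandardAnalyzer()).parse("Harvard")
// parsed.toString == "address:harvard"

// Not analyzed: this searches for the literal term "Harvard", which such
// an index will never contain.
val manual = new TermQuery(new Term("address", "Harvard"))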
It seems that not all the methods accept a Query. For example, this one does not seem to work with a Query type:
def link[T1: ClassTag](other: RDD[T1],
                       searchQueryGen: T1 => String,
                       topK: Int = { /* compiled code */ },
                       linkerMethod: String = { /* compiled code */ })
  : RDD[(T1, Array[SparkScoreDoc])]
Many other methods do not support Query either. For example:
def dedup[T1: ClassTag](searchQueryGen: T1 => String,
                        topK: Int = DefaultTopK,
                        linkerMethod: String = getLinkerMethod)
  : RDD[(T1, Array[SparkScoreDoc])] = {
  // FIXME: is this asInstanceOf necessary?
  link[T1](this.asInstanceOf[RDD[T1]], searchQueryGen, topK, linkerMethod)
}
I am aware you specified which methods are supported in your release, but I believe we should re-open this issue to track the work needed for the other methods.
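For illustration, a Query-based counterpart might look like this (purely hypothetical; neither this overload nor its name exists in the library, and I am assuming linkByQuery mirrors the shape of link):

// Hypothetical sketch only, inside LuceneRDD -- not part of the library.
def dedupByQuery[T1: ClassTag](searchQueryGen: T1 => Query,
                               topK: Int = DefaultTopK,
                               linkerMethod: String = getLinkerMethod)
  : RDD[(T1, Array[SparkScoreDoc])] =
  linkByQuery[T1](this.asInstanceOf[RDD[T1]], searchQueryGen, topK, linkerMethod)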
Yes only the block linkage methods support Query type for now.
Can I help increase the support? Or do you have it planned? I'd like to test all the different options in this library using a Query rather than a query string.
I tried to enhance it, but I am not sure why it works sometimes and why it is producing a serialization error for me. How were you able to serialize it?
I am currently trying to add it to luceneRDD.linkByQuery, and I am running the test class LuceneRDDRecordLinkageSpec.scala:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0 in stage 1.0 (TID 12) had a not serializable result: org.apache.lucene.search.TermQuery
Serialization stack:
- object not serializable (class: org.apache.lucene.search.TermQuery, value: _1:spa)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (2,_1:spa))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.zouzias.spark.lucenerdd.LuceneRDD.link(LuceneRDD.scala:225)
at org.zouzias.spark.lucenerdd.LuceneRDD.linkByQuery(LuceneRDD.scala:189)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec$$anonfun$1.apply(LuceneRDDRecordLinkageSpec.scala:58)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec$$anonfun$1.apply(LuceneRDDRecordLinkageSpec.scala:43)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec.org$scalatest$BeforeAndAfterEach$$super$runTest(LuceneRDDRecordLinkageSpec.scala:29)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec.runTest(LuceneRDDRecordLinkageSpec.scala:29)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:393)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:370)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:407)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:376)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite$class.run(Suite.scala:1124)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec.org$scalatest$BeforeAndAfterAll$$super$run(LuceneRDDRecordLinkageSpec.scala:29)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.zouzias.spark.lucenerdd.LuceneRDDRecordLinkageSpec.run(LuceneRDDRecordLinkageSpec.scala:29)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1349)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1343)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1012)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
So if linker.method = "collectbroadcast", I see this exception. When linker.method = "cartesian", it works fine.
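The failure mode itself is reproducible outside the library in a few lines (sketch, assuming a SparkContext sc): any action that brings Query objects back to the driver, which is what "collectbroadcast" does with the generated queries, has to serialize them.

import org.apache.lucene.index.Term
import org.apache.lucene.search.TermQuery

// Building queries inside executor tasks is fine...
val queries = sc.parallelize(Seq("spa", "lucene"))
  .map(t => new TermQuery(new Term("_1", t)))

// ...but collecting them serializes the results and fails with
// "had a not serializable result: org.apache.lucene.search.TermQuery"
queries.collect()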
I still don't understand how it is not failing for the tests you wrote.
I added a bit more support in this commit: https://github.com/yeikel/spark-lucenerdd/commit/3e408c7fbd884aaeaae14a9add0c4151dd904017
Could you please review it?