streaming-matrix-factorization icon indicating copy to clipboard operation
streaming-matrix-factorization copied to clipboard

Model must be trained before starting prediction

Open chapter09 opened this issue 9 years ago • 10 comments

My code is as follows, mostly just like the code given in the Usage of README.md. I find there is a if to justify whether the model is empty in LatentMatrixFactorization.scala. Currently I don't know to run this prediction since it seems hard to make the training and prediction in sequential.

val ratings: DStream[Rating[Long]] =
  inputFile.map(line => Rating(line.split(",")(0).toInt,
                               line.split(",")(1).toInt,
                               line.split(",")(2).toFloat)) // Your input stream of Ratings

val algorithm = new StreamingLatentMatrixFactorization(streamingMFParams)
algorithm.trainOn(ratings)

val pInputFile = ssc.socketTextStream("10.2.3.7", 10088)

val testStream: DStream[(Long, Long)] =  // stream of (user, product) pairs to predict on
 pInputFile.map(line => (line.split(",")(0).toInt, line.split(",")(1).toInt))

val predictions: DStream[Rating[Long]] = algorithm.predictOn(testStream)
predictions.print()
ssc.start()
ssc.awaitTermination()

chapter09 avatar Jan 13 '16 06:01 chapter09

@chapter09 yes, I have the same issue.

there is a race condition where the model needs to be trained before we can perform our first prediction, so the predict path fails and the job dies right away.

and unfortunately, there doesn't seem to be a Model.save() or load() functionality, so, for now, we need the training and predicting to live in the same job.

@brkyvz are we missing something? can you advise on the best way to productionize and share the model between the training path and the prediction path?

are you thinking we should use something like Tachyon (trying to avoid this for now)?

or perhaps call native java model.serialize() to share the model between the 2 different jobs/runtimes? i've seen some folks using native java serialization when faced with a model that doesn't currently support save()/load().

note: we can address the long-running OOM concern separately per the discussion here: https://github.com/brkyvz/streaming-matrix-factorization/issues/1

for now, i just want a short-lived demo of incremental matrix factorization.

happy to submit a pull request, but would first like to discuss the design you had in mind.

thanks!!

cfregly avatar Jan 16 '16 22:01 cfregly

also, does the prediction path require pulling in all of spark? or can we deploy it on a simple tomcat server, for example?

glancing at the code, this doesn't look possible as the predict() method uses RDDs, SparkContext, Broadcast variables, etc.

is the idea to put the prediction path behind a Spark streaming app that puts the prediction back on a kafka queue (for example) that the prediction-caller listens on?

i realize that the prediction layer is a large effort that is actively being worked on within the Spark ML community, so i assume this is the best we can do for now?

cfregly avatar Jan 16 '16 22:01 cfregly

A lack of save / load here is what might make me stick to vanilla MLlib.

fosskers avatar Mar 11 '16 00:03 fosskers

@fosskers : save/load is absolutely critical and will be built very soon. again, i need this for an conference at the end of next month, so it will be done before then, for sure.

in the meantime, we should be able to use Kryo - or even Java Serialization - to save/load the model. kinda hacky, but should unblock us.

we're looking to do the exact same thing, it seems. let's keep in touch on this.

cfregly avatar Mar 11 '16 00:03 cfregly

Yes please, I'm looking to use this for production. That said, @brkyvz said in the other thread that he doesn't consider the streaming functionality production-ready?

fosskers avatar Mar 11 '16 00:03 fosskers

@fosskers yup, that's where we come in. :)

cfregly avatar Mar 11 '16 00:03 cfregly

Any news on this?

fosskers avatar Apr 06 '16 16:04 fosskers

taking this offline with Colin. will update soon.

cfregly avatar Apr 06 '16 18:04 cfregly

@fosskers

here's are links to a more-production ready version of this repo per our recent convo...

refactored and updated code from this repo: https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/ml/incremental

Use of the above code inside a Spark Streaming job pulling from Kafka:

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/ml/TrainMFIncremental.scala

might want to close this and watch future development at the new repo @ https://github.com/fluxcapacitor/pipeline

cfregly avatar May 24 '16 04:05 cfregly

Thanks @cfregly , I'll pass this on to my old coworkers.

fosskers avatar May 24 '16 22:05 fosskers