Model must be trained before starting prediction
My code is as follows, mostly the same as the code given in the Usage section of README.md. I see there is an if-check in LatentMatrixFactorization.scala that tests whether the model is empty. Currently I don't know how to run the prediction, since it seems hard to make training and prediction run sequentially.
val ratings: DStream[Rating[Long]] = inputFile.map { line =>
  val fields = line.split(",")
  Rating(fields(0).toLong, fields(1).toLong, fields(2).toFloat)
} // Your input stream of Ratings

val algorithm = new StreamingLatentMatrixFactorization(streamingMFParams)
algorithm.trainOn(ratings)

val pInputFile = ssc.socketTextStream("10.2.3.7", 10088)

// stream of (user, product) pairs to predict on
val testStream: DStream[(Long, Long)] = pInputFile.map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1).toLong)
}

val predictions: DStream[Rating[Long]] = algorithm.predictOn(testStream)
predictions.print()

ssc.start()
ssc.awaitTermination()
@chapter09 yes, I have the same issue.
there is a race condition where the model needs to be trained before we can perform our first prediction, so the predict path fails and the job dies right away.
and unfortunately, there doesn't seem to be any Model.save() or load() functionality, so for now we need the training and predicting to live in the same job.
@brkyvz are we missing something? can you advise on the best way to productionize and share the model between the training path and the prediction path?
are you thinking we should use something like Tachyon (trying to avoid this for now)?
or perhaps call native java model.serialize() to share the model between the 2 different jobs/runtimes? i've seen some folks using native java serialization when faced with a model that doesn't currently support save()/load().
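here's a rough sketch of the native java serialization hack i'm referring to - just assuming the trained model object is Serializable, and treating the model reference itself as a placeholder since the library doesn't expose save()/load() yet:

import java.io._

// minimal sketch: plain java serialization of the trained model.
// assumes the model object is Serializable; "model" is a placeholder here.
object ModelIO {
  def saveModel(model: Serializable, path: String): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(model) finally out.close()
  }

  def loadModel[T](path: String): T = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}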
note: we can address the long-running OOM concern separately per the discussion here: https://github.com/brkyvz/streaming-matrix-factorization/issues/1
for now, i just want a short-lived demo of incremental matrix factorization.
happy to submit a pull request, but would first like to discuss the design you had in mind.
thanks!!
also, does the prediction path require pulling in all of spark? or can we deploy it on a simple tomcat server, for example?
glancing at the code, this doesn't look possible as the predict() method uses RDDs, SparkContext, Broadcast variables, etc.
is the idea to put the prediction path behind a Spark streaming app that puts the prediction back on a kafka queue (for example) that the prediction-caller listens on?
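something like this minimal sketch is what i have in mind, reusing the predictions DStream from the snippet above - the broker address and topic name are just placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// minimal sketch: push each predicted rating onto a kafka topic that the
// prediction-caller listens on. broker address and topic name are placeholders.
predictions.foreachRDD { rdd =>
  rdd.foreachPartition { ratings =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    ratings.foreach(r => producer.send(new ProducerRecord("predictions", r.toString)))
    producer.close()
  }
}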
i realize that the prediction layer is a large effort that is actively being worked on within the Spark ML community, so i assume this is the best we can do for now?
A lack of save / load here is what might make me stick to vanilla MLlib.
@fosskers : save/load is absolutely critical and will be built very soon. again, i need this for a conference at the end of next month, so it will be done before then, for sure.
in the meantime, we should be able to use Kryo - or even Java Serialization - to save/load the model. kinda hacky, but should unblock us.
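a minimal sketch of the Kryo route, using Spark's built-in KryoSerializer - the model object is just a placeholder for however we end up exposing the trained model:

import java.nio.ByteBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// minimal sketch: round-trip the trained model through Spark's Kryo serializer.
// what exactly gets passed in as the model is a placeholder for now.
object KryoModelIO {
  private val ser = new KryoSerializer(new SparkConf()).newInstance()

  def toBytes[T: ClassTag](model: T): Array[Byte] = {
    val buf: ByteBuffer = ser.serialize(model)
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    bytes
  }

  def fromBytes[T: ClassTag](bytes: Array[Byte]): T =
    ser.deserialize[T](ByteBuffer.wrap(bytes))
}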
we're looking to do the exact same thing, it seems. let's keep in touch on this.
Yes please, I'm looking to use this for production. That said, @brkyvz said in the other thread that he doesn't consider the streaming functionality production-ready?
@fosskers yup, that's where we come in. :)
Any news on this?
taking this offline with Colin. will update soon.
@fosskers
here are links to a more production-ready version of this repo per our recent convo...
refactored and updated code from this repo: https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/ml/incremental
Use of the above code inside a Spark Streaming job pulling from Kafka:
https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/ml/TrainMFIncremental.scala
might want to close this and watch future development at the new repo @ https://github.com/fluxcapacitor/pipeline
Thanks @cfregly , I'll pass this on to my old coworkers.