spark-libFM
Bad performance on a large data set
I have a large data set (20 million rows) whose features are (user, topic) pairs encoded as 5,000 one-hot columns, and the label is binary (0/1). Logistic regression's AUC easily reaches 0.84, but FM's AUC is only around 0.5, sometimes even 0.46.
The parameters I used (I have tried both SGD and LBFGS):
```scala
val model = FMWithSGD.train(training, task = 1, numIterations = 100, stepSize = 0.15,
  miniBatchFraction = 1.0, dim = (true, true, 10), regParam = (0, 0, 0), initStd = 0.1)
```
And
```scala
val model = FMWithLBFGS.train(training, task = 1, numIterations = 20,
  numCorrections = 5, dim = (true, true, 20), regParam = (0, 0, 0), initStd = 0.1)
```
Could you point out how I can get better performance with FM?
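For reference, a minimal sketch of how the FM-side AUC could be computed with mllib's `BinaryClassificationMetrics`, assuming `model` and `training` are as above and that `FMModel.predict` returns a continuous score rather than hard 0/1 labels:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Score the training set and pair each score with its label
val scoreAndLabels = model.predict(training.map(_.features))
  .zip(training.map(_.label))

// Area under the ROC curve; ~0.5 means no better than random ranking
val auc = new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()
println(s"FM AUC = $auc")
```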
@geekan I apologize for the late reply. I have found that FM is sensitive to batch size: a large batch size usually results in bad convergence, while a small one hurts parallelism. I tried AdaGrad/AdaDelta/... instead of SGD, but that did not fix this issue. LBFGS does not work well either. I am still trying to fix this issue in another way.
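To make the batch-size sensitivity concrete, here is a sketch of the kind of sweep that exposes it; `training` and `holdout` are assumed `RDD[LabeledPoint]` splits, and the AUC computation follows the same pattern as the snippet above:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Sweep miniBatchFraction and watch convergence quality via held-out AUC
for (frac <- Seq(0.01, 0.1, 0.5, 1.0)) {
  val m = FMWithSGD.train(training, task = 1, numIterations = 100, stepSize = 0.15,
    miniBatchFraction = frac, dim = (true, true, 10), regParam = (0, 0, 0), initStd = 0.1)
  val auc = new BinaryClassificationMetrics(
    m.predict(holdout.map(_.features)).zip(holdout.map(_.label))).areaUnderROC()
  println(s"miniBatchFraction=$frac -> AUC=$auc")
}
```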
Thanks. So how can this problem be solved? My impression is that ZEN's FM converges somewhat better.
OK. Yes, ZEN's convergence is better, but it seems to be much slower; if I remember correctly, its implementation is based on GraphX. The approach I am trying now is to first pre-train a reasonably good model on a small data set, and then refine it on the full data.
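A sketch of that pre-train/refine idea; note that the refine step is hypothetical, since the public `FMWithSGD.train` API does not accept initial weights, so `refineFromModel` below stands in for whatever warm-start entry point ends up being used:

```scala
// Stage 1: pre-train on a small sample of the full data
val small = training.sample(withReplacement = false, fraction = 0.05, seed = 42L)
val preModel = FMWithSGD.train(small, task = 1, numIterations = 100, stepSize = 0.15,
  miniBatchFraction = 1.0, dim = (true, true, 10), regParam = (0, 0, 0), initStd = 0.1)

// Stage 2 (hypothetical API): refine on the full data with a smaller step size,
// starting from preModel's weights instead of a fresh random initStd draw
// val refined = refineFromModel(preModel, training, numIterations = 20, stepSize = 0.01)
```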
So the idea is to train an initial model on a small data set, then iterate on the full data with a smaller step size? Can that completely avoid the convergence problem?
Exactly. I can only confirm whether it works once I have run enough experiments.
Hi RuiFeng, I noticed you said you have tried FM with AdaGrad/AdaDelta. Did you find them useful? How much gain do they give compared with SGD?
Training on just a few samples also gives poor results for me; the predictions cannot separate the 0/1 labels at all:

```scala
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.functions
import com.zenmen.wkread.SparkFunSuite
import org.apache.spark.mllib.regression.FMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.scalatest.FunSuite

import scala.collection.mutable.ArrayBuffer

class FactorizationMachineCtrModelTest extends FunSuite with SparkFunSuite {

  // Extracts P(label = 1) from a probability vector column
  private val getProb1 = functions.udf((v: Vector) => v.toArray(1))

  // Turns a Map column into the sequence of its keys
  private val getKeys: Map[String, Double] => Seq[String] =
    (input: Map[String, Double]) => input.keySet.toSeq
  val keyUdf = functions.udf(getKeys)

  test("testTrain") {
    val data0 = spark.createDataFrame(Seq(
      (Array("1"), 1.0, Map("a" -> 0.1, "b" -> 0.2, "c" -> 0.3), 1),
      (Array("2"), 10.0, Map("d" -> 0.1, "e" -> 0.2, "c" -> 0.3), 0),
      (Array("3"), 20.0, Map("x" -> 0.1, "a" -> 0.2, "b" -> 0.3), 0),
      (Array("4"), 15.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      (Array("5"), 18.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      (Array("6"), 25.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 1),
      (Array("6"), 5.0, Map("a" -> 0.1, "b" -> 0.2, "d" -> 0.3), 0),
      (Array("7"), 30.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0))
    ).toDF("book_id", "pv", "myInputCol0", "label")

    val data = data0.withColumn("myInputCol", keyUdf(functions.col("myInputCol0")))
      .drop("myInputCol0")
    data.show(10)
    data.printSchema()

    val pipelineStage = new ArrayBuffer[PipelineStage]()

    // One-hot encode book_id
    val bookFiter = new CountVectorizer()
      .setInputCol("book_id")
      .setOutputCol("book_id_vec")
      .setMinDF(1)
      .setMinTF(1)
      .setBinary(true)
    pipelineStage += bookFiter

    // Bucketize the continuous pv column into 3 quantile buckets
    val doubleDiscretizer = new QuantileDiscretizer()
      .setInputCol("pv")
      .setOutputCol("pv_bucket")
      .setNumBuckets(3)
    pipelineStage += doubleDiscretizer

    // One-hot encode the bucket indices
    val buckCountVec = new OneHotEncoder()
      .setInputCol("pv_bucket")
      .setOutputCol("pv_bucket_vec")
    pipelineStage += buckCountVec

    // One-hot encode the map keys
    val myFiter = new CountVectorizer()
      .setInputCol("myInputCol")
      .setOutputCol("myInputCol1_vec")
      .setMinDF(1)
      .setMinTF(1)
      .setBinary(true)
    pipelineStage += myFiter

    // Assemble all feature columns into a single vector and normalize it
    val vectorAsCols = Array("pv_bucket_vec", "book_id_vec", "myInputCol1_vec")
    val vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature")
    pipelineStage += vectorAssembler
    val scaler = new Normalizer()
      .setInputCol("vectorFeature")
      .setOutputCol("scaledFeatures")
    pipelineStage += scaler

    val featurePipeline = new Pipeline().setStages(pipelineStage.toArray)
    val pipeLineModel = featurePipeline.fit(data)
    val trainData = pipeLineModel.transform(data)

    // Convert to mllib LabeledPoint for FMWithSGD (getAs[Vector] handles
    // both dense and sparse rows)
    val formatSamples = trainData.rdd.map(row => {
      new LabeledPoint(row.getAs[Int]("label").toDouble, Vectors.fromML(row.getAs[Vector]("scaledFeatures")))
    })

    val fmModel = FMWithSGD
      .train(formatSamples,
        task = 1,
        numIterations = 20,
        stepSize = 0.1,
        miniBatchFraction = 0.5,
        dim = (true, true, 4),
        regParam = (0, 0, 0),
        initStd = 0.1)

    // Inspect predictions against the labels
    val predict = fmModel.predict(formatSamples.map(_.features)).zip(formatSamples.map(_.label))
    predict.take(10).foreach(println)

    // DataFrame-based prediction (needs a wrapper that adds a "probability" column):
    // val predDf = fmModel.predict(trainData)
    // val dataFinal = predDf.select(
    //   Seq(functions.concat_ws(",", functions.col("book_id")),
    //     functions.col("pv"),
    //     functions.concat_ws(",", functions.col("myInputCol")),
    //     getProb1(functions.col("probability")).alias("prob1")): _*)
    // dataFinal.show()
  }
}
```
Any update here?