spark-libFM

Bad performance on large data sets

Open geekan opened this issue 9 years ago • 8 comments

I have a large data set (20 million rows) whose features are (user, topic), one-hot encoded into 5000 columns, and whose label is 0 or 1. Logistic regression easily reaches an AUC of 0.84, but FM's AUC is only around 0.5, or perhaps 0.46.

The parameters I used (I tried both SGD and LBFGS):

        val model = FMWithSGD.train(training, task = 1, numIterations = 100, stepSize = 0.15, 
                miniBatchFraction = 1.0, dim = (true, true, 10), regParam = (0, 0, 0), initStd = 0.1)

And

       val model = FMWithLBFGS.train(training, task = 1, numIterations = 20, 
                numCorrections = 5, dim = (true, true, 20), regParam = (0, 0, 0), initStd = 0.1);

Could you point out how I can get better performance with FM?

geekan avatar Nov 27 '15 02:11 geekan
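
For context, both calls above fit a standard second-order factorization machine. Assuming the tuple `dim = (true, true, k)` means (use a global bias, use one-way terms, number of factors k), which is how the spark-libFM README appears to describe it and is not stated in this thread, the raw score for a feature vector x with n columns is:

```latex
\hat{y}(\mathbf{x}) = w_0 + \sum_{i=1}^{n} w_i x_i
  + \sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j,
\qquad \mathbf{v}_i \in \mathbb{R}^{k}
```

With `task = 1` (binary classification) the score is presumably mapped to a probability with the sigmoid, 1 / (1 + exp(-ŷ)). Setting every v_i to zero leaves exactly the linear score that logistic regression uses, so an FM AUC far below the logistic regression baseline suggests an optimization problem rather than a limit of the model itself.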

@geekan I apologize for the late reply. I have found that FM is sensitive to batch size: a large batch size usually results in poor convergence, while a small one hurts parallelism. I have tried using AdaGrad/AdaDelta/... instead of SGD, but that did not fix the issue. LBFGS does not work well either. I am still trying to fix this issue in another way.

zhengruifeng avatar Apr 14 '16 12:04 zhengruifeng

Thanks. Let's switch to Chinese: so how can this problem be solved? My impression is that the FM in ZEN converges somewhat better.

geekan avatar Apr 14 '16 12:04 geekan

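OK. Yes, ZEN converges better, but it seems to be much slower; if I remember correctly, its implementation is based on GraphX. The approach I am trying now is to first pre-train a reasonably good model on a small data set and then refine it on the full data.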

zhengruifeng avatar Apr 14 '16 12:04 zhengruifeng

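So you mean training an initial model on the small data set and then continuing to iterate on the full data with a smaller step size? Would that completely avoid the non-convergence problem?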

geekan avatar Apr 14 '16 14:04 geekan

That's right. I can only tell whether it works once I have run enough experiments.

zhengruifeng avatar Apr 15 '16 01:04 zhengruifeng
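
The two-stage idea discussed above (pre-train on a small sample, then refine on the full data with a smaller step) could look roughly like the sketch below. It is a toy, single-machine FM in plain Scala, not spark-libFM code: `ToyFM`, `WarmStartSketch`, the synthetic (user, topic) data, and every hyperparameter are made up for illustration. As the thread says, whether this helps on real data has not been confirmed.

```scala
import scala.util.Random

// Hypothetical minimal FM for illustration; NOT the spark-libFM implementation.
final class ToyFM(var w0: Double, val w: Array[Double], val v: Array[Array[Double]]) {
  private val k = v.head.length

  // Per-factor sums s_f = sum_i v(i)(f) * x(i), reused by the score and the gradient.
  def factorSums(x: Array[Double]): Array[Double] =
    Array.tabulate(k)(f => x.indices.map(i => v(i)(f) * x(i)).sum)

  // w0 + sum_i w_i x_i + 0.5 * sum_f (s_f^2 - sum_i (v_if x_i)^2)
  def score(x: Array[Double]): Double = {
    val s = factorSums(x)
    val linear = x.indices.map(i => w(i) * x(i)).sum
    val pair = (0 until k).map { f =>
      val sq = x.indices.map(i => math.pow(v(i)(f) * x(i), 2)).sum
      0.5 * (s(f) * s(f) - sq)
    }.sum
    w0 + linear + pair
  }

  def predict(x: Array[Double]): Double = 1.0 / (1.0 + math.exp(-score(x)))
}

object WarmStartSketch {
  type Example = (Double, Array[Double]) // (label in {0, 1}, features)

  def init(n: Int, k: Int, initStd: Double, rng: Random): ToyFM =
    new ToyFM(0.0, Array.fill(n)(0.0), Array.fill(n, k)(rng.nextGaussian() * initStd))

  // Plain per-example SGD on the logistic loss; updates the model in place.
  def sgd(m: ToyFM, data: Seq[Example], epochs: Int, step: Double, rng: Random): Unit =
    for (_ <- 0 until epochs; ex <- rng.shuffle(data)) {
      val (y, x) = ex
      val g = m.predict(x) - y // d(loss)/d(score)
      val s = m.factorSums(x)
      m.w0 -= step * g
      for (i <- x.indices if x(i) != 0.0) {
        m.w(i) -= step * g * x(i)
        for (f <- s.indices)
          m.v(i)(f) -= step * g * x(i) * (s(f) - m.v(i)(f) * x(i))
      }
    }

  def logLoss(m: ToyFM, data: Seq[Example]): Double =
    data.map { case (y, x) =>
      val p = math.min(math.max(m.predict(x), 1e-12), 1.0 - 1e-12)
      -(y * math.log(p) + (1.0 - y) * math.log(1.0 - p))
    }.sum / data.size

  // Returns log loss on the full data: (at init, after pre-training, after refinement).
  def run(seed: Long): (Double, Double, Double) = {
    val rng = new Random(seed)
    // Toy one-hot (user, topic) data: 3 users, 3 topics, label 1 when the indices match.
    val full: Seq[Example] = for (_ <- 0 until 200; u <- 0 until 3; t <- 0 until 3) yield {
      val x = Array.fill(6)(0.0)
      x(u) = 1.0
      x(3 + t) = 1.0
      (if (u == t) 1.0 else 0.0, x)
    }
    val small = rng.shuffle(full).take(90)
    val model = init(n = 6, k = 4, initStd = 0.1, rng = rng)
    val atInit = logLoss(model, full)
    sgd(model, small, epochs = 30, step = 0.1, rng = rng) // stage 1: small sample
    val afterPretrain = logLoss(model, full)
    sgd(model, full, epochs = 5, step = 0.02, rng = rng) // stage 2: full data, smaller step
    (atInit, afterPretrain, logLoss(model, full))
  }

  def main(args: Array[String]): Unit = {
    val (a, b, c) = run(42L)
    println(f"log loss: init=$a%.4f pretrain=$b%.4f refined=$c%.4f")
  }
}
```

The key design point is that stage 2 reuses the same `model` object instead of re-initializing it, so the full-data pass starts from the pre-trained weights. In a distributed setting the equivalent would need a trainer that accepts initial weights; whether spark-libFM exposes one is not stated in this thread.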

Hi RuiFeng, I noticed you said you have tried FM with AdaGrad/AdaDelta. Did you find them useful? How much gain do they give compared with SGD?

VinceShieh avatar Nov 19 '16 03:11 VinceShieh

> That's right. I can only tell whether it works once I have run enough experiments.

The results I get from training on just a few samples are also poor; the model cannot separate 0 from 1 at all:

```scala
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.functions
import com.zenmen.wkread.SparkFunSuite
import org.apache.spark.mllib.regression.FMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.scalatest.FunSuite

import scala.collection.mutable.ArrayBuffer

class FactorizationMachineCtrModelTest extends FunSuite with SparkFunSuite {

  // Extracts the probability of class 1 from a probability vector.
  private val getProb1 = functions.udf((v: Vector) => v.toArray(1))
  // Turns a feature map into the sequence of its keys.
  private val getKeys: Map[String, Double] => Seq[String] = { input: Map[String, Double] =>
    input.keySet.toSeq
  }
  val keyUdf = functions.udf(getKeys)

  test("testTrain") {
    val data0 = spark.createDataFrame(Seq(
      (Array("1"), 1.0, Map("a" -> 0.1, "b" -> 0.2, "c" -> 0.3), 1),
      (Array("2"), 10.0, Map("d" -> 0.1, "e" -> 0.2, "c" -> 0.3), 0),
      (Array("3"), 20.0, Map("x" -> 0.1, "a" -> 0.2, "b" -> 0.3), 0),
      (Array("4"), 15.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      (Array("5"), 18.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      (Array("6"), 25.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 1),
      (Array("6"), 5.0, Map("a" -> 0.1, "b" -> 0.2, "d" -> 0.3), 0),
      (Array("7"), 30.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0))
    ).toDF("book_id", "pv", "myInputCol0", "label")

    val data = data0.withColumn("myInputCol", keyUdf(functions.col("myInputCol0")))
      .drop("myInputCol0")

    data.show(10)
    data.printSchema()

    val pipelineStage = new ArrayBuffer[PipelineStage]()

    // One-hot style encoding of the book id.
    val bookFiter = new CountVectorizer()
      .setInputCol("book_id")
      .setOutputCol("book_id_vec")
      .setMinDF(1)
      .setMinTF(1)
      .setBinary(true)
    pipelineStage += bookFiter

    // Bucket the page views into 3 quantile buckets, then one-hot encode the bucket.
    val doubleDiscretizer = new QuantileDiscretizer()
      .setInputCol("pv")
      .setOutputCol("pv_bucket")
      .setNumBuckets(3)
    pipelineStage += doubleDiscretizer

    val buckCountVec = new OneHotEncoder()
      .setInputCol("pv_bucket")
      .setOutputCol("pv_bucket_vec")
    pipelineStage += buckCountVec

    val myFiter = new CountVectorizer()
      .setInputCol("myInputCol")
      .setOutputCol("myInputCol1_vec")
      .setMinDF(1)
      .setMinTF(1)
      .setBinary(true)
    pipelineStage += myFiter

    val vectorAsCols = Array("pv_bucket_vec", "book_id_vec", "myInputCol1_vec")

    val vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature")
    pipelineStage += vectorAssembler

    val scaler = new Normalizer()
      .setInputCol("vectorFeature")
      .setOutputCol("scaledFeatures")
    pipelineStage += scaler

    val featurePipeline = new Pipeline().setStages(pipelineStage.toArray)
    val pipeLineModel = featurePipeline.fit(data)
    val trainData = pipeLineModel.transform(data)

    // Read the column as the general Vector type: the assembled features may be
    // sparse, in which case a cast to DenseVector would fail at runtime.
    val formatSamples = trainData.rdd.map(row => {
      new LabeledPoint(row.getAs[Int]("label").toDouble, Vectors.fromML(row.getAs[Vector]("scaledFeatures")))
    })

    val fmModel = FMWithSGD
      .train(formatSamples,
        task = 1,
        numIterations = 20,
        stepSize = 0.1,
        miniBatchFraction = 0.5,
        dim = (true, true, 4),
        regParam = (0, 0, 0),
        initStd = 0.1)

    // val predict = fmModel.predict(formatSamples.map(_.features)).zip(formatSamples.map(_.label))
    // predict.take(10).foreach(println)

    // Assumes a predict overload that accepts a DataFrame and adds a
    // "probability" column; that overload is not shown in this post.
    val predDf = fmModel.predict(trainData)
    val dataFinal = predDf.select(
      Seq(functions.concat_ws(",", functions.col("book_id")),
        functions.col("pv"),
        functions.concat_ws(",", functions.col("myInputCol")),
        getProb1(functions.col("probability")).alias("prob1")): _*)
    dataFinal.show()
  }
}
```

marvinxu-free avatar Feb 03 '20 09:02 marvinxu-free

any update here?

jiqiujia avatar May 02 '20 09:05 jiqiujia