
FeatureBuilder.fromDataFrame should automatically infer advanced types

Open tovbinm opened this issue 5 years ago • 9 comments

Problem: Currently FeatureBuilder.fromDataFrame infers only a set of primitive TransmogrifAI types mapped directly from the dataframe schema, such as Text, Real, etc. More advanced types, such as PickList, Email, Phone, etc., are not inferred, because inferring them requires some knowledge of the actual data in the dataframe.

Solution: Estimate the value distribution for each feature and deduce a more appropriate column type. Potentially reuse logic from CSVSchemaUtils.infer, SmartTextVectorizer and RawFeatureFilter.
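As a rough illustration of the idea (plain Scala with invented names, not TransmogrifAI code): sample each text column and pick a richer type based on simple pattern and cardinality checks, similar in spirit to what CSVSchemaUtils.infer does for primitives.

```scala
// Hypothetical sketch, not TransmogrifAI code: the name inferAdvancedType and
// the thresholds below are invented for illustration only.
object TypeInferenceSketch {
  sealed trait InferredType
  case object Email extends InferredType
  case object Phone extends InferredType
  case object PickList extends InferredType
  case object Text extends InferredType

  // Very rough per-type checks; real logic would sample the dataframe column
  // and would need far more robust validators than these regexes.
  private val emailRe = "^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$".r
  private val phoneRe = "^\\+?[0-9 ()-]{7,}$".r

  def inferAdvancedType(sample: Seq[String], maxCardinality: Int = 50): InferredType = {
    val nonEmpty = sample.filter(_.nonEmpty)
    if (nonEmpty.isEmpty) Text
    else if (nonEmpty.forall(v => emailRe.findFirstIn(v).isDefined)) Email
    else if (nonEmpty.forall(v => phoneRe.findFirstIn(v).isDefined)) Phone
    else if (nonEmpty.distinct.size <= maxCardinality) PickList // low cardinality
    else Text                                                   // fallback
  }
}
```

For example, a column sampled as Seq("a@b.com", "c@d.org") would come back as Email, while a low-cardinality column like Seq("red", "green", "red") would come back as PickList.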

Alternatives: N/A

tovbinm avatar Aug 17 '18 04:08 tovbinm

This need not be automatic. Could we pass in a schema with more advanced types?
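A hypothetical shape for such an override mechanism (the names TypeOverrides and typeFor are invented for illustration; this is not an existing TransmogrifAI API): the caller maps column names to the desired feature types, and inference falls back to the schema-derived type for everything else.

```scala
// Invented sketch, not TransmogrifAI API: explicit per-column type overrides
// consulted before falling back to the schema-inferred type.
final case class TypeOverrides(byColumn: Map[String, String]) {
  def typeFor(column: String, inferred: String): String =
    byColumn.getOrElse(column, inferred)
}
```

With this shape, TypeOverrides(Map("user_email" -> "Email")) would force the hypothetical user_email column to Email while leaving all other columns to automatic inference.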

snabar avatar Sep 14 '18 14:09 snabar

Just to say that this feature is very important for lowering the barrier to AutoML usage.

albertodema avatar Nov 11 '18 16:11 albertodema

@snabar Do you want to specify particular feature types dynamically? You can try converting to the type you want by indexing into the Row, but there are problems when saving the model. My test sample code:

def main(args: Array[String]): Unit = {
        if (args == null || args.length != 5){
                throw new Exception("five parameters are required. Usage: <database> <table> <label> <id> <pickListIndex>")
        }
        val Array(database,table,label,id,pickListIndex) = args // pickListIndex: the index of the Row column to convert to a PickList feature
        println(s"args = ${args.mkString(",")}")

        val conf = new SparkConf()
        conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        implicit val session = SparkSession.builder().appName(s"${this.getClass.getSimpleName}")
                .config(conf).enableHiveSupport().master("local[*]")
                .getOrCreate()
        val frame = session.sql(s"select * from ${database}.${table}").withColumn(label,col(label).cast(DoubleType))
        frame.show(false)

        val (response,feature) = FeatureBuilder.fromDataFrame[RealNN](frame,label)
        println(s"response = ${response}")
        
        println("============== OP Features ==============")
        val realOpFeatures = ArrayBuffer[Feature[_ <: FeatureType]]()
        for (i <- 0 until(feature.size)){
                if (feature.apply(i).name != id){
                        if (i == pickListIndex.toInt){
                                val pickListFeature = FeatureBuilder.PickList[Row](feature.apply(i).name).extract(_.get(pickListIndex.toInt).toString.toPickList).asPredictor
                                realOpFeatures.append(pickListFeature)
                        }else{
                                realOpFeatures.append(feature.apply(i))
                        }
                }
        }
        realOpFeatures.foreach(println(_))
        //Automated feature engineering
        val transmogrifyFeature = realOpFeatures.transmogrify()
        //Optionally check the features with a sanity checker
        val checkedFeature = response.sanityCheck(transmogrifyFeature,removeBadFeatures = true)
        //Define the model we want to use
        val prediction = BinaryClassificationModelSelector.withTrainValidationSplit(
            modelTypesToUse = Seq(OpLogisticRegression)
        ).setInput(response, checkedFeature).getOutput()
        //model evaluator
        val evaluator = Evaluators.BinaryClassification().setLabelCol(label).setPredictionCol(prediction)
        val workflow = new OpWorkflow().setInputDataset(frame,(row: Row)=>row.get(0).toString).setResultFeatures(prediction)

        val model = workflow.train()

        println(s"Model Summary:\n ${model.summaryPretty()}")
        val (scores, metrics) = model.scoreAndEvaluate(evaluator = evaluator)
        println(s"Metrics:\n ${metrics}")
        model.save("E:\\models\\model_01",true)
        session.stop()
    }

The following exception is thrown when saving the model:

Exception in thread "main" java.lang.RuntimeException: Failed to write out stage 'FeatureGeneratorStage_00000000000d'
	at com.salesforce.op.stages.OpPipelineStageWriter.writeToJson(OpPipelineStageWriter.scala:81)
	at com.salesforce.op.OpWorkflowModelWriter$$anonfun$3.apply(OpWorkflowModelWriter.scala:131)
	at com.salesforce.op.OpWorkflowModelWriter$$anonfun$3.apply(OpWorkflowModelWriter.scala:131)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at com.salesforce.op.OpWorkflowModelWriter.stagesJArray(OpWorkflowModelWriter.scala:131)
	at com.salesforce.op.OpWorkflowModelWriter.stagesJArray(OpWorkflowModelWriter.scala:108)
	at com.salesforce.op.OpWorkflowModelWriter.toJson(OpWorkflowModelWriter.scala:83)
	at com.salesforce.op.OpWorkflowModelWriter.toJsonString(OpWorkflowModelWriter.scala:68)
	at com.salesforce.op.OpWorkflowModelWriter.saveImpl(OpWorkflowModelWriter.scala:58)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:103)
	at com.salesforce.op.OpWorkflowModelWriter$.save(OpWorkflowModelWriter.scala:193)
	at com.salesforce.op.OpWorkflowModel.save(OpWorkflowModel.scala:221)
	at TestApp$.main(TestApp.scala:71)
	at TestApp.main(TestApp.scala)
Caused by: java.lang.RuntimeException: Argument 'extractFn' [TestApp$$anonfun$main$1$$anonfun$2] cannot be serialized. Make sure TestApp$$anonfun$main$1$$anonfun$2 has either no-args ctor or is an object, and does not have any external dependencies, e.g. use any out of scope variables.
	at com.salesforce.op.stages.OpPipelineStageSerializationFuns$class.serializeArgument(OpPipelineStageReaderWriter.scala:234)
	at com.salesforce.op.stages.DefaultValueReaderWriter.serializeArgument(DefaultValueReaderWriter.scala:48)
	at com.salesforce.op.stages.DefaultValueReaderWriter$$anonfun$write$1.apply(DefaultValueReaderWriter.scala:70)
	at com.salesforce.op.stages.DefaultValueReaderWriter$$anonfun$write$1.apply(DefaultValueReaderWriter.scala:69)
	at scala.util.Try$.apply(Try.scala:192)
	at com.salesforce.op.stages.DefaultValueReaderWriter.write(DefaultValueReaderWriter.scala:69)
	at com.salesforce.op.stages.FeatureGeneratorStageReaderWriter.write(FeatureGeneratorStage.scala:189)
	at com.salesforce.op.stages.FeatureGeneratorStageReaderWriter.write(FeatureGeneratorStage.scala:129)
	at com.salesforce.op.stages.OpPipelineStageWriter.writeToJson(OpPipelineStageWriter.scala:80)
	... 18 more
Caused by: java.lang.RuntimeException: Failed to create an instance of class 'TestApp$$anonfun$main$1$$anonfun$2'. Class has to either have a no-args ctor or be an object.
	at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:106)
	at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:87)
	at com.salesforce.op.stages.OpPipelineStageSerializationFuns$class.serializeArgument(OpPipelineStageReaderWriter.scala:231)
	... 26 more
Caused by: java.lang.NoSuchFieldException: MODULE$
	at java.lang.Class.getField(Class.java:1703)
	at com.salesforce.op.utils.reflection.ReflectionUtils$.newInstance(ReflectionUtils.scala:102)
	... 28 more

@tovbinm What is wrong with my code, and why is this exception thrown?

shenzgang avatar Sep 27 '19 09:09 shenzgang

It happens due to the way we serialize stages with our models. We do not allow serializing lambda expressions such as .extract(_.get(pickListIndex.toInt).toString.toPickList); instead one has to define the extract function as an explicit concrete class.
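The constraint can be demonstrated with plain Scala reflection (a self-contained sketch, independent of TransmogrifAI; the names ReinstantiationSketch and recover are invented): an object is recoverable through its static MODULE$ field, while a lambda that captures a local variable has neither MODULE$ nor a usable no-args constructor.

```scala
// Sketch of the re-instantiation rule the model writer enforces: a value is
// recoverable at load time only if it is a Scala `object` (exposed through a
// static MODULE$ field) or a class with a no-args constructor.
object ReinstantiationSketch {
  // OK: a top-level singleton object, recoverable via MODULE$.
  object ToUpper extends (String => String) {
    def apply(s: String): String = s.toUpperCase
  }

  def recover(clazz: Class[_]): AnyRef =
    try clazz.getField("MODULE$").get(null) // Scala object singleton
    catch {
      case _: NoSuchFieldException =>
        // Fall back to a no-args constructor; a capturing lambda has neither,
        // because its captured values become constructor arguments.
        clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    }
}
```

recover(ReinstantiationSketch.ToUpper.getClass) succeeds, but for val n = 3; val f: String => String = _.take(n), recover(f.getClass) throws, which mirrors the "has either no-args ctor or is an object" message above.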

I recommend rewriting your code as follows:

val (response,features) = FeatureBuilder.fromDataFrame[RealNN](frame, label)
println(s"response = ${response}")

val realOpFeatures = features.zipWithIndex.map {
    case (feature, _) if feature.name == id => feature
    case (feature, i) if feature.isSubtypeOf[Text] =>
        new ToPickListTransformer(i).setInput(feature.asInstanceOf[FeatureLike[Text]]).getOutput()
    case (feature, _) => feature
}

where ToPickListTransformer is a top-level concrete class defined as follows:

class ToPickListTransformer(pickListIndex: Int, uid: String = UID[ToPickListTransformer]) extends
  UnaryTransformer[Text, PickList](operationName = "toPickList", uid = uid) {
  def transformFn: Text => PickList = (v: Text) => v.value.toPickList
}

tovbinm avatar Sep 27 '19 14:09 tovbinm

@tovbinm There may be some problems here. uid: String = UID[PickListTransformer] gives the error: Cannot resolve symbol PickListTransformer. And

val realOpFeatures = features.zipWithIndex.map {
    case (feature, i) if i == id => feature
    case (feature, i) => feature.map(new ToPickListTransformer(i)) //compile error
}

fails with: Type mismatch, expected: FeatureType => NotInferedB, actual: ToPickListTransformer

shenzgang avatar Oct 17 '19 04:10 shenzgang

see corrected version above. thanks for pointing out the error.

tovbinm avatar Oct 17 '19 04:10 tovbinm

@tovbinm Thank you for your reply! But there may still be some problems:

In the ToPickListTransformer class:

Error:(15, 9) type arguments [org.apache.spark.sql.Row,com.salesforce.op.features.types.PickList] do not conform to class UnaryTransformer's type parameter bounds [I <: com.salesforce.op.features.types.FeatureType,O <: com.salesforce.op.features.types.FeatureType]
        UnaryTransformer[Row, PickList](operationName = "toPickList", uid = uid) {

and in the mapping:

case (feature, i) => feature.map(f => new ToPickListTransformer(i).setInput(f).getOutput()) //compile error!

shenzgang avatar Oct 17 '19 07:10 shenzgang

I corrected the above snippet. But it would be helpful to know what the underlying schema of your dataset is. Can you please paste the output of frame.schema.printTreeString()?

tovbinm avatar Oct 18 '19 20:10 tovbinm

Trying to think if there is something to reuse from frameless :thinking:

https://typelevel.org/frameless/TypedEncoder.html https://github.com/typelevel/frameless/tree/master/dataset/src/test/scala/frameless

SemanticBeeng avatar Sep 11 '20 12:09 SemanticBeeng