
pyspark "object has no attribute '_to_java'"

Open guzzijones opened this issue 5 years ago • 4 comments

I have a custom transformer that I created following this example: https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

Calling model.save works and my pipeline saves fine, but when I try to serialize it with MLeap I get "object has no attribute '_to_java'".

Here is the transformer:

import re  # used by the regexes in _transform

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from selectolax.parser import HTMLParser
class CleanHtml(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):
    #stopwords = Param(Params._dummy(), "stopwords", "stopwords",
    #                 typeConverter=TypeConverters.toListString)


    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CleanHtml, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        #todo(aj) hashes function
        def clean_hashes(raw):
            clean_nonwords = re.compile(r'\S*[^a-zA-Z\s\-\"\']\S*')
            cleantext = re.sub(clean_nonwords,'',raw)
            return cleantext
            
        def f(raw):
            
            tree = HTMLParser(raw)

            if tree.body is None:
                return raw

            for tag in tree.css('script'):
                tag.decompose()
            for tag in tree.css('style'):
                tag.decompose()

            text = tree.body.text(separator='\n')
            text = re.sub(r'\n\s*', "\n",text)
            text = clean_hashes(text)
            print("text: " + text)
            return text.strip().strip("\n")

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

Here is my pipeline:

import re
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#apply extract body remove html
#https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
from pyspark.ml.classification import LogisticRegression

cleanhtml = CleanHtml(inputCol="text", outputCol="clean_text")
tokenizer = Tokenizer(inputCol="clean_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec') #TF
idf = IDF(inputCol="c_vec", outputCol="tf_idf") #IDF Scaler
lr = LogisticRegression(regParam=0.01,maxIter=20,featuresCol='features', labelCol='target_int')
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
clean_up = VectorAssembler(inputCols=['tf_idf'],outputCol='features')
pipeline = Pipeline(stages=[cleanhtml,tokenizer,stopremove,count_vec,idf,clean_up,lr])
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.001])\
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="target_int",metricName='f1'),
                          numFolds=2)
mdl = crossval.fit(joined)

Any help to get the transformer to serialize is appreciated.
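
For reference, the serialization call that triggers the error looks roughly like this; I'm using the standard mleap-pyspark SimpleSparkSerializer import, and the bundle path is just a placeholder:

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# mdl is the CrossValidatorModel fitted above; MLeap serializes its best PipelineModel
best_pipeline = mdl.bestModel
best_pipeline.serializeToBundle("jar:file:/tmp/pipeline.zip",
                                best_pipeline.transform(joined))

As far as I can tell the failure comes from the custom CleanHtml stage being pure Python: unlike the built-in stages, it has no _to_java method for MLeap to hand the stage over to the JVM serializer.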

guzzijones avatar Feb 07 '20 17:02 guzzijones

I just tried this transformer (below), but I get the dreaded "'JavaPackage' object is not callable" error when I run the pipeline code. How do I find out which jar package I am missing now?

%pyspark
import re  # used by the regexes in _transform

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from selectolax.parser import HTMLParser

from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaModel, JavaEstimator, JavaTransformer
from pyspark.ml.util import _jvm

class CleanHtml(
        JavaTransformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        JavaMLReadable, JavaMLWritable):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CleanHtml, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

        # These JVM classes would have to exist on the Spark classpath for this to work
        map_model = self._new_java_obj("ml.combust.mleap.core.feature.CleanHtmlModel")
        self._java_obj = self._new_java_obj("org.apache.spark.ml.mleap.feature.CleanHtml", self.uid, map_model)
        self.setInputCol(inputCol)
        self.setOutputCol(outputCol)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        #todo(aj) hashes function
        def clean_hashes(raw):
            clean_nonwords = re.compile(r'\S*[^a-zA-Z\s\-\"\']\S*')
            cleantext = re.sub(clean_nonwords,'',raw)
            return cleantext
            
        def f(raw):
            
            tree = HTMLParser(raw)

            if tree.body is None:
                return raw

            for tag in tree.css('script'):
                tag.decompose()
            for tag in tree.css('style'):
                tag.decompose()

            text = tree.body.text(separator='\n')
            text = re.sub(r'\n\s*', "\n",text)
            text = clean_hashes(text)
            print("text: " + text)
            return text.strip().strip("\n")

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

guzzijones avatar Feb 07 '20 18:02 guzzijones

Here is the stacktrace:

Fail to execute line 9: cleanhtml = CleanHtml(inputCol="text", outputCol="clean_text")
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-3059625262437356812.py", line 375, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 110, in wrapper
    return func(self, **kwargs)
  File "<stdin>", line 30, in __init__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 67, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable
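
For what it's worth, one way to check whether the JVM class can even be loaded from the driver (my own guess at a diagnostic, not something from the MLeap docs) is to ask py4j to load it directly; the "'JavaPackage' object is not callable" error generally means py4j could not resolve the class on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark.sparkContext._jvm

# Class.forName raises ClassNotFoundException if the class is not on the driver classpath
try:
    jvm.java.lang.Class.forName("org.apache.spark.ml.mleap.feature.CleanHtml")
    print("class found on the driver classpath")
except Exception as e:
    print("class not found:", e)

I suspect that in my case no jar would help, since the CleanHtmlModel / CleanHtml JVM classes referenced in my __init__ above don't exist anywhere yet; the Scala side of the transformer would have to be written and packaged first.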

guzzijones avatar Feb 07 '20 19:02 guzzijones

@guzzijones I have been working on this for a while. Unfortunately it's much more difficult to get this working in pyspark, because you need to either:

  1. Implement a Java/Scala transformer class that your Python object can reference as its Java object (like this), or

  2. Add a method called _to_java to your class and implement it yourself (I think this is probably the better path; however, I do not have a good way of doing it yet. A custom example is here, and a rough sketch of what I mean is after this list.)

Any help would be greatly appreciated if anyone has done this!
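
For option 2, the shape I have in mind is roughly this (an untested sketch; com.example.ml.CleanHtml is a placeholder for a Scala transformer that would still need to be written, packaged, and put on the classpath, along with an MLeap bundle op for it):

from pyspark.ml.wrapper import JavaTransformer

class CleanHtmlBridge:
    """Mixin that adds _to_java to the pure-Python CleanHtml transformer."""

    def _to_java(self):
        # Build the JVM counterpart and copy the Python params onto it.
        # The class name is a placeholder and must exist on the JVM classpath.
        java_obj = JavaTransformer._new_java_obj(
            "com.example.ml.CleanHtml", self.uid)
        java_obj.setInputCol(self.getInputCol())
        java_obj.setOutputCol(self.getOutputCol())
        return java_obj

Even then, MLeap can only serialize the stage if a bundle op is registered for that Scala class, which is why I suspect options 1 and 2 end up needing roughly the same JVM-side code.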

Ben-Epstein avatar Mar 16 '20 19:03 Ben-Epstein

@guzzijones @Ben-Epstein We now have two custom transformers available in pyspark, based on the work from @juhoautio and @lucagiovagnoli; the code changes are here: https://github.com/combust/mleap/pull/666. Perhaps you can take a look at how these two are implemented? Thank you!
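
At a high level they follow the pattern already discussed above: a thin pyspark wrapper around an existing Scala transformer, so that _to_java comes for free from JavaTransformer and JavaMLWritable. A simplified sketch of that pattern (not the exact code from the PR; the transformer name and Java class path are placeholders):

from pyspark import keyword_only
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaTransformer

class MyMleapTransformer(JavaTransformer, HasInputCol, HasOutputCol,
                         JavaMLReadable, JavaMLWritable):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyMleapTransformer, self).__init__()
        # Placeholder class name: the real wrappers point at a Scala transformer
        # shipped with MLeap and registered with a bundle op.
        self._java_obj = self._new_java_obj(
            "org.apache.spark.ml.mleap.feature.MyTransformer", self.uid)
        kwargs = self._input_kwargs
        self._set(**kwargs)

For this to work, the MLeap Spark jars have to be on the Spark classpath (for example via spark.jars.packages); otherwise you get the "'JavaPackage' object is not callable" error from earlier in this thread.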

ancasarb avatar May 01 '20 21:05 ancasarb