pyspark "object has no attribute '_to_java'"
I have a custom transformer that I created according to this example: https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
I can call model.save and the pipeline saves fine with Spark's own persistence.
When I try to serialize the pipeline with MLeap, however, I get "object has no attribute '_to_java'".
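For reference, this is roughly the serialization call that fails (a sketch only; the bundle path is a placeholder, and mdl / joined are the CrossValidator model and training DataFrame from the pipeline code below):

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer  # attaches serializeToBundle to Spark ML models

fitted_pipeline = mdl.bestModel  # PipelineModel containing the custom CleanHtml stage
# raises AttributeError ... has no attribute '_to_java', because the Python-only
# CleanHtml stage has no Java counterpart for MLeap to serialize
fitted_pipeline.serializeToBundle("jar:file:/tmp/pipeline.zip", fitted_pipeline.transform(joined))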
Here is the transformer:
import re  # used in _transform below

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from selectolax.parser import HTMLParser
class CleanHtml(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):
    #https://www.google.com/search?q=spark+udf+transformer&oq=spark+udf+transformer&aqs=chrome..69i57.4514j1j7&sourceid=chrome&ie=UTF-8
    #stopwords = Param(Params._dummy(), "stopwords", "stopwords",
    #                  typeConverter=TypeConverters.toListString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CleanHtml, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        #todo(aj) hashes function
        def clean_hashes(raw):
            clean_nonwords = re.compile(r'\S*[^a-zA-Z\s\-\"\']\S*')
            cleantext = re.sub(clean_nonwords, '', raw)
            return cleantext

        def f(raw):
            tree = HTMLParser(raw)
            if tree.body is None:
                return raw
            for tag in tree.css('script'):
                tag.decompose()
            for tag in tree.css('style'):
                tag.decompose()
            text = tree.body.text(separator='\n')
            text = re.sub(r'\n\s*', "\n", text)
            text = clean_hashes(text)
            print("text: " + text)
            return text.strip().strip("\n")

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
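For what it's worth, the transformer runs fine when applied directly; a quick sanity check looks like this (a sketch, assuming an active SparkSession named spark):

df = spark.createDataFrame([("<p>Hello <b>world</b><script>var x = 1;</script></p>",)], ["text"])
CleanHtml(inputCol="text", outputCol="clean_text").transform(df).show(truncate=False)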
Here is my pipeline:
import re
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# apply extract body / remove html
# https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
from pyspark.ml.classification import LogisticRegression

cleanhtml = CleanHtml(inputCol="text", outputCol="clean_text")
tokenizer = Tokenizer(inputCol="clean_text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')  # TF
idf = IDF(inputCol="c_vec", outputCol="tf_idf")  # IDF scaler
lr = LogisticRegression(regParam=0.01, maxIter=20, featuresCol='features', labelCol='target_int')

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

clean_up = VectorAssembler(inputCols=['tf_idf'], outputCol='features')
pipeline = Pipeline(stages=[cleanhtml, tokenizer, stopremove, count_vec, idf, clean_up, lr])

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.001]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="target_int", metricName='f1'),
                          numFolds=2)

mdl = crossval.fit(joined)
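As noted above, Spark's own persistence works here because the transformer mixes in DefaultParamsReadable/DefaultParamsWritable; it is only the MLeap export that fails. For example (the path is a placeholder):

# works: Spark-native persistence of the best fitted pipeline
mdl.bestModel.write().overwrite().save("/tmp/clean_html_pipeline_model")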
Any help getting the transformer to serialize with MLeap would be appreciated.
I just tried the transformer below, but now I get the dreaded "'JavaPackage' object is not callable" error when I run the pipeline code. How do I find out which jar/package I am missing now?
%pyspark
import re  # used in _transform below

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from selectolax.parser import HTMLParser
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaModel, JavaEstimator, JavaTransformer
from pyspark.ml.util import _jvm
class CleanHtml(
        JavaTransformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        JavaMLReadable,
        JavaMLWritable,
):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CleanHtml, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
        # Java-side classes this Python wrapper expects to find on the classpath
        map_model = self._new_java_obj("ml.combust.mleap.core.feature.CleanHtmlModel")
        self._java_obj = self._new_java_obj("org.apache.spark.ml.mleap.feature.CleanHtml", self.uid, map_model)
        self.setInputCol(inputCol)
        self.setOutputCol(outputCol)
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        #todo(aj) hashes function
        def clean_hashes(raw):
            clean_nonwords = re.compile(r'\S*[^a-zA-Z\s\-\"\']\S*')
            cleantext = re.sub(clean_nonwords, '', raw)
            return cleantext

        def f(raw):
            tree = HTMLParser(raw)
            if tree.body is None:
                return raw
            for tag in tree.css('script'):
                tag.decompose()
            for tag in tree.css('style'):
                tag.decompose()
            text = tree.body.text(separator='\n')
            text = re.sub(r'\n\s*', "\n", text)
            text = clean_hashes(text)
            print("text: " + text)
            return text.strip().strip("\n")

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
Here is the stacktrace:
Fail to execute line 9: cleanhtml = CleanHtml(inputCol="text", outputCol="clean_text")
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-3059625262437356812.py", line 375, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 110, in wrapper
    return func(self, **kwargs)
  File "<stdin>", line 30, in __init__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 67, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable
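As I understand it, "'JavaPackage' object is not callable" means py4j could not find the Java class on the driver classpath, so _new_java_obj got back a package stub instead of a constructor. A rough way to check whether a given class is actually visible (a sketch; the class name is the one from the snippet above, and the mleap-spark-extension coordinates are an assumption to adjust for your Spark/Scala/MLeap versions):

from pyspark.ml.util import _jvm

# raises a Py4J error wrapping ClassNotFoundException if the class is not on the classpath
_jvm().java.lang.Class.forName("org.apache.spark.ml.mleap.feature.CleanHtml")

# the org.apache.spark.ml.mleap.feature.* classes that ship with MLeap live in the
# mleap-spark-extension artifact, e.g. (coordinates are an assumption):
#   spark-submit --packages ml.combust.mleap:mleap-spark-extension_2.11:0.16.0 ...
# a custom class like CleanHtml would additionally need your own jar that provides it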
@guzzijones I have been working on this for a while; unfortunately it is much more difficult to get this working in pyspark, because you need to either:
- Implement Java/Scala transformer classes that your Python object can reference as its Java object (like this)
- Add a _to_java method to your class and implement it (I think this is probably the better path; however, I do not have a good way of doing it. A custom example is here; a rough sketch of what the stub might look like follows below.)
Any help would be greatly appreciated if anyone has done this!
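The _to_java stub from the second option would look roughly like this sketch. It assumes a matching Scala transformer already exists on the classpath; org.example.ml.feature.CleanHtml is purely a placeholder, which is why the second option tends to require the same Scala work as the first:

from pyspark.ml.wrapper import JavaWrapper

def _to_java(self):
    # placeholder class name: a real Scala CleanHtml transformer would have to exist in a jar on the classpath
    java_obj = JavaWrapper._new_java_obj("org.example.ml.feature.CleanHtml", self.uid)
    java_obj.setInputCol(self.getInputCol())
    java_obj.setOutputCol(self.getOutputCol())
    return java_obj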
@guzzijones @Ben-Epstein Two custom transformers are now available in pyspark, based on the work from @juhoautio and @lucagiovagnoli; the code changes are here: https://github.com/combust/mleap/pull/666. Perhaps you can take a look at how these two are implemented? Thank you!