spark-nlp
Python Custom Annotator support for LightPipelines
Is your feature request related to a problem? Please describe. We are attempting to add Python Custom Annotators inside our LightPipelines so we can tweak some of the models' outputs to serve as inputs for other models. The inability to do so has been restricting the flexibility LightPipelines can offer us compared to regular pipelines.
Describe the solution you'd like Being able to use Python Custom Annotators inside LightPipelines.
Describe alternatives you've considered One workaround we have found is using custom annotators in regular pipelines, but we mostly work with small quantities of data, so those pipelines are not nearly as fast as LightPipelines, nor is their output format as flexible. We have also considered splitting the pipeline in two: a regular pipeline plus a LightPipeline that takes care of the custom annotators' "fixes". That is not optimal either, since we would need to transform data between the LightPipeline's output and the regular pipeline's input, and we may want to use other models afterwards.
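To make the split-pipeline workaround concrete, the glue step between the two pipelines looks roughly like the sketch below. The function name and the tuple shape are ours, not part of Spark NLP; the only assumption taken from the API is that `fullAnnotate` returns a list of dicts mapping output column names to lists of annotations.

```python
def light_output_to_rows(full_annotate_results, column="checked_document"):
    """Flatten LightPipeline.fullAnnotate() output back into plain rows
    so a regular Pipeline can consume them via spark.createDataFrame().

    This is the manual glue the split-pipeline workaround requires:
    each fullAnnotate result is a dict of column -> list of annotations.
    """
    rows = []
    for result in full_annotate_results:
        for annotation in result[column]:
            # Annotation objects expose a `result` attribute; plain
            # dicts (as in a quick test) expose a "result" key instead.
            text = annotation.result if hasattr(annotation, "result") else annotation["result"]
            rows.append((text,))
    return rows
```

Having to maintain this kind of conversion for every intermediate column is exactly the overhead we would like to avoid.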
Thanks @JoaoBone for the feature request. Could you please leave full code (a pipeline) that is reproducible on our side so we can assess this feature request? (A simple Google Colab that runs end-to-end with minimal data/features, just enough to represent the issue, would work.)
Of course, Maziyar.
Here is some sample code with results for both scenarios (Pipeline vs. LightPipeline):
from sparknlp.annotator import *
from sparknlp_jsl.annotator import *
from sparknlp.base import *
from sparknlp.functions import *
from sparknlp.annotation import Annotation

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import pyspark.sql.functions as F
from pyspark.sql import Row
spark = # SETUP SPARK SESSION (omitted)
# Custom Annotator (only appends " - CHECKED" to the result of the annotations)
class DocumentChecker(Transformer, HasInputCol, HasOutputCol,
                      DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self):
        super(DocumentChecker, self).__init__()

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        self.input_col_name = value
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        self.output_col_name = value
        return self._set(outputCol=value)

    def _transform(self, dataset):
        def document_checker(annotations):
            checked_annotations = []
            for annotation in annotations:
                dict_annotation = annotation.asDict()
                dict_annotation['result'] = dict_annotation['result'] + " - CHECKED"
                # ** unpacks the dictionary so it can be passed directly to the Row constructor
                new_annotation = Row(**dict_annotation)
                checked_annotations.append(new_annotation)
            return checked_annotations

        udf_document_checker = F.udf(lambda x: document_checker(x),
                                     returnType=Annotation.arrayType())
        dataset = dataset.withColumn(self.output_col_name,
                                     udf_document_checker(dataset[self.input_col_name]))
        return dataset
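The transformation inside `_transform` can be exercised without Spark at all: the same copy-and-append pattern works on plain dictionaries that mimic the Annotation struct. This is just a sketch for illustration; the field names follow the schema of `Annotation.arrayType()`.

```python
def document_checker(annotations):
    # Pure-Python mirror of the UDF body above, operating on plain
    # dicts shaped like Annotation (annotatorType, begin, end, result,
    # metadata, embeddings) instead of Row objects.
    checked = []
    for annotation in annotations:
        fixed = dict(annotation)  # copy so the input is not mutated
        fixed["result"] = fixed["result"] + " - CHECKED"
        checked.append(fixed)
    return checked

sample = [{"annotatorType": "document", "begin": 0, "end": 10,
           "result": "Sample text", "metadata": {}, "embeddings": []}]
print(document_checker(sample)[0]["result"])  # Sample text - CHECKED
```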
# Pipeline
documentAssembler = DocumentAssembler() \
    .setInputCol("Plaintext") \
    .setOutputCol("document")

documentChecker = DocumentChecker() \
    .setInputCol("document") \
    .setOutputCol("checked_document")

pipeline = Pipeline(stages=[documentAssembler, documentChecker])
Here are the 2 scenarios I mentioned:
1. Using pipeline directly
test_string = "Sample text"
test_data = spark.createDataFrame([[test_string]]).toDF("Plaintext")
pipeline.fit(test_data).transform(test_data).show(truncate=False)
Output:
2. Creating a lightpipeline to annotate
test_string = "Sample text"
empty_data = spark.createDataFrame([[""]]).toDF("Plaintext")
light_pipeline = LightPipeline(pipeline.fit(empty_data))
light_pipeline.fullAnnotate(test_string)
Output:
AttributeError Traceback (most recent call last)
/tmp/ipykernel_5929/3808117872.py in <module>
1 empty_data = spark.createDataFrame([[""]]).toDF("Plaintext")
----> 2 light_pipeline = LightPipeline(pipeline.fit(empty_data))
3 light_pipeline.fullAnnotate(test_string)
~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/sparknlp/base.py in __init__(self, pipelineModel, parse_embeddings)
77 def __init__(self, pipelineModel, parse_embeddings=False):
78 self.pipeline_model = pipelineModel
---> 79 self._lightPipeline = _internal._LightPipeline(pipelineModel, parse_embeddings).apply()
80
81 @staticmethod
~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/sparknlp/internal.py in __init__(self, pipelineModel, parse_embeddings)
265 class _LightPipeline(ExtendedJavaWrapper):
266 def __init__(self, pipelineModel, parse_embeddings):
--> 267 super(_LightPipeline, self).__init__("com.johnsnowlabs.nlp.LightPipeline", pipelineModel._to_java(),
268 parse_embeddings)
269
~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/pyspark/ml/pipeline.py in _to_java(self)
331 java_stages = gateway.new_array(cls, len(self.stages))
332 for idx, stage in enumerate(self.stages):
--> 333 java_stages[idx] = stage._to_java()
334
335 _java_obj =\
AttributeError: 'DocumentChecker' object has no attribute '_to_java'
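For context, the error arises because LightPipeline wraps the JVM-side pipeline: it calls `_to_java()` on every stage, and a pure-Python Transformer has no Java counterpart. A small helper (the function name is ours, not part of Spark NLP or PySpark) can pre-check a fitted pipeline before attempting to build a LightPipeline:

```python
def find_python_only_stages(pipeline_model):
    """Return the stages of a fitted PipelineModel that cannot be
    converted to the JVM side.

    LightPipeline's constructor calls `_to_java()` on every stage, so
    any stage lacking that method (i.e. a pure-Python Transformer like
    DocumentChecker above) triggers the AttributeError shown.
    """
    return [stage for stage in pipeline_model.stages
            if not hasattr(stage, "_to_java")]
```

With this check, one could fall back to the regular pipeline (or the split-pipeline workaround) instead of hitting the traceback at construction time.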
Thanks @JoaoBone - this is a great example to reproduce the issue. We'll work on this to see if it's possible to allow a custom annotator when LightPipeline is being used. (By "allow" I mean making it programmatically possible to have an unregistered annotator in a LightPipeline without it being previously defined in the code base.)
Thank you @maziyarpanahi. Looking forward to it!
This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 5 days