
Python Custom Annotator support for LightPipelines

Open JoaoBone opened this issue 2 years ago • 6 comments

Is your feature request related to a problem? Please describe. We are attempting to add Python custom annotators inside our LightPipelines so we can tweak the outputs of some models before they serve as inputs to other models. The inability to do so restricts the flexibility LightPipelines can offer us compared to regular pipelines.

Describe the solution you'd like Being able to use Python custom annotators inside LightPipelines.

Describe alternatives you've considered One workaround we have found is using custom annotators in regular pipelines, but we mostly work with small quantities of data, so regular pipelines are nowhere near as fast as LightPipelines, nor is their output format as flexible. We have also considered splitting the pipeline in two: a LightPipeline, followed by a regular pipeline that takes care of the custom annotators' "fixes". That is not optimal either, since we would need to transform the data between the LightPipeline's output and the regular pipeline's input, and we may want to use other models afterwards. A rough sketch of this split follows.
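
For context, here is a minimal sketch of that split-pipeline workaround. Everything below is illustrative only: jvm_pipeline, empty_df, and the "document" output column are hypothetical placeholders, and documentAssembler / documentChecker refer to the stages defined in the sample code further down.

from pyspark.ml import Pipeline
from sparknlp.base import LightPipeline

# Step 1: run the fast, JVM-backed stages through a LightPipeline
# (jvm_pipeline and empty_df are assumed to exist)
light = LightPipeline(jvm_pipeline.fit(empty_df))
light_results = light.annotate(["Sample text", "Another text"])

# Step 2: the awkward part -- manually convert the LightPipeline output
# back into a DataFrame so a regular pipeline can consume it
# ("document" is the assumed output column of jvm_pipeline)
fixed_input = spark.createDataFrame(
    [[r["document"][0]] for r in light_results]
).toDF("Plaintext")

# Step 3: a regular (slower) Pipeline containing the Python custom annotator
heavy_model = Pipeline(stages=[documentAssembler, documentChecker]).fit(fixed_input)
checked_df = heavy_model.transform(fixed_input)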

JoaoBone avatar Jan 25 '22 11:01 JoaoBone

Thanks @JoaoBone for the feature request. Could you please leave full code (a pipeline) that is reproducible on our side so we can assess this feature request? (It could be a simple Google Colab that runs end-to-end with minimal data/features, just enough to represent the issue.)

maziyarpanahi avatar Jan 25 '22 11:01 maziyarpanahi

Of course, Maziyar.

Here is some sample code, with results for both scenarios (using a Pipeline vs. a LightPipeline):

from sparknlp.annotator import *
from sparknlp_jsl.annotator import *
from sparknlp.base import *
from sparknlp.functions import *
from sparknlp.annotation import Annotation

from pyspark import keyword_only
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import pyspark.sql.functions as F
from pyspark.sql import Row

spark = ...  # SETUP SPARK SESSION (omitted)

# Custom Annotator (Only appends " - CHECKED" to the result of the annotations)
class DocumentChecker(Transformer,
                      HasInputCol,
                      HasOutputCol,
                      DefaultParamsReadable,
                      DefaultParamsWritable):

    @keyword_only
    def __init__(self):
        super(DocumentChecker, self).__init__()

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        self.input_col_name = value
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        self.output_col_name = value
        return self._set(outputCol=value)

    def _transform(self, dataset):

        def document_checker(annotations):
            checked_annotations = []
            for annotation in annotations:
                dict_annotation = annotation.asDict()
                dict_annotation['result'] = dict_annotation['result'] + " - CHECKED"
                # ** unpacks the dict so it can be passed directly to the Row constructor
                new_annotation = Row(**dict_annotation)
                checked_annotations.append(new_annotation)
            return checked_annotations

        udf_document_checker = F.udf(document_checker, returnType=Annotation.arrayType())
        return dataset.withColumn(self.output_col_name, udf_document_checker(dataset[self.input_col_name]))
        
        
        
# Pipeline
documentAssembler = DocumentAssembler()\
                    .setInputCol("Plaintext")\
                    .setOutputCol("document")

documentChecker = DocumentChecker()\
                  .setInputCol("document")\
                  .setOutputCol("checked_document")

pipeline = Pipeline(stages=[documentAssembler,
                            documentChecker])

Here are the two scenarios I mentioned:

1. Using pipeline directly

test_string = "Sample text"
test_data = spark.createDataFrame([[test_string]]).toDF("Plaintext")
pipeline.fit(test_data).transform(test_data).show(truncate=False)

Output:

[Screenshot: output DataFrame showing the checked_document column with " - CHECKED" appended to the annotation result]
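
The screenshot is not reproduced here; based on the code above, the output should look roughly like this (annotation structs abbreviated, exact rendering depends on the Spark version):

+-----------+-----------------------------------+---------------------------------------------+
|Plaintext  |document                           |checked_document                             |
+-----------+-----------------------------------+---------------------------------------------+
|Sample text|[{document, 0, 10, Sample text, …}]|[{document, 0, 10, Sample text - CHECKED, …}]|
+-----------+-----------------------------------+---------------------------------------------+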

2. Creating a LightPipeline to annotate

test_string = "Sample text"
empty_data = spark.createDataFrame([[""]]).toDF("Plaintext")
light_pipeline = LightPipeline(pipeline.fit(empty_data))
light_pipeline.fullAnnotate(test_string)

Output:


AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_5929/3808117872.py in <module>
      1 empty_data = spark.createDataFrame([[""]]).toDF("Plaintext")
----> 2 light_pipeline = LightPipeline(pipeline.fit(empty_data))
      3 light_pipeline.fullAnnotate(test_string)

~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/sparknlp/base.py in __init__(self, pipelineModel, parse_embeddings)
     77     def __init__(self, pipelineModel, parse_embeddings=False):
     78         self.pipeline_model = pipelineModel
---> 79         self._lightPipeline = _internal._LightPipeline(pipelineModel, parse_embeddings).apply()
     80 
     81     @staticmethod

~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/sparknlp/internal.py in __init__(self, pipelineModel, parse_embeddings)
    265 class _LightPipeline(ExtendedJavaWrapper):
    266     def __init__(self, pipelineModel, parse_embeddings):
--> 267         super(_LightPipeline, self).__init__("com.johnsnowlabs.nlp.LightPipeline", pipelineModel._to_java(),
    268                                              parse_embeddings)
    269 

~/ubuntuEnvs/SelectData/lib/python3.7/site-packages/pyspark/ml/pipeline.py in _to_java(self)
    331         java_stages = gateway.new_array(cls, len(self.stages))
    332         for idx, stage in enumerate(self.stages):
--> 333             java_stages[idx] = stage._to_java()
    334 
    335         _java_obj =\

AttributeError: 'DocumentChecker' object has no attribute '_to_java'
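
The traceback shows the failure point: constructing a LightPipeline converts the fitted PipelineModel to its JVM counterpart by calling _to_java() on every stage, and a Python-only Transformer has no such method. A minimal diagnostic sketch (the helper name is hypothetical, not part of spark-nlp):

def find_python_only_stages(pipeline_model):
    # LightPipeline calls _to_java() on every stage, so any stage
    # without that method is Python-only and will break construction.
    return [stage for stage in pipeline_model.stages
            if not hasattr(stage, "_to_java")]

# Usage: inspect the fitted model before wrapping it in a LightPipeline
model = pipeline.fit(empty_data)
for stage in find_python_only_stages(model):
    print(f"{type(stage).__name__} has no _to_java(); LightPipeline will fail")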

JoaoBone avatar Jan 25 '22 12:01 JoaoBone

Thanks @JoaoBone - this is a great example for reproducing the issue. We'll work on this to see if it's possible to allow a custom annotator when LightPipeline is being used (by "allowing" I mean making it programmatically possible to have an unregistered annotator in a LightPipeline without it being previously defined in the code base).

maziyarpanahi avatar Jan 25 '22 12:01 maziyarpanahi

Thank you @maziyarpanahi. Looking forward to it!

JoaoBone avatar Jan 25 '22 12:01 JoaoBone

This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 5 days

github-actions[bot] avatar Jun 16 '22 00:06 github-actions[bot]

This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 5 days

github-actions[bot] avatar Oct 19 '22 00:10 github-actions[bot]