
Impossible to create pipeline with multiple processors with same name

Open · ThibautSF opened this issue 5 years ago · 4 comments

Edit 1

Latest workaround sample: https://github.com/ruflin/Elastica/issues/1810#issuecomment-717393980

Original

Hi,

I'm using Elasticsearch 7.9.2 and Elastica 7.0.0.

I would like to create the following pipeline from PHP with Elastica:

PUT _ingest/pipeline/pageattachment
{
    "description": "Extract attachment information",
    "processors": [
        {
            "attachment": {
                "field": "pageDescBinary",
                "indexed_chars": -1,
                "target_field": "desc_attachment"
            }
        },
        {
            "remove": {
                "field": "pageDescBinary"
            }
        },
        {
            "attachment": {
                "field": "pageSpreadsheetBinary",
                "indexed_chars": -1,
                "target_field": "spreadsheet_attachment"
            }
        },
        {
            "remove": {
                "field": "pageSpreadsheetBinary"
            }
        }
    ]
}

Created through a manual curl request, this pipeline works well in my indexing tests. But when I build it from PHP, every processor type that appears more than once is overwritten (only the last one added is kept):

use Elastica\Pipeline;
use Elastica\Processor\Attachment;
use Elastica\Processor\Remove;

// $client is an existing Elastica\Client instance
$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');

//Create attachment processor
$attachproc = new Attachment('pageDescBinary');
$attachproc->setIndexedChars(-1);
$attachproc->setTargetField('desc_attachment');

//Create remove processor
$removeproc = new Remove('pageDescBinary');

//Create second attachment processor
$attachproc2 = new Attachment('pageSpreadsheetBinary');
$attachproc2->setIndexedChars(-1);
$attachproc2->setTargetField('spreadsheet_attachment');

//Create second remove processor
$removeproc2 = new Remove('pageSpreadsheetBinary');

//Add processors to the pipeline, in order
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeproc);
$pipeline->addProcessor($attachproc2);
$pipeline->addProcessor($removeproc2);

$response = $pipeline->create();

But the resulting pipeline is:

{
  "pageattachment": {
    "description": "Extract attachment information",
    "processors": [
      {
        "attachment": {
          "field": "pageSpreadsheetBinary",
          "indexed_chars": -1,
          "target_field": "spreadsheet_attachment"
        },
        "remove": {
          "field": "pageSpreadsheetBinary"
        }
      }
    ]
  }
}

As we can see, only the second 'attachment' and 'remove' processors are kept, merged into a single object. The problem is caused by the associative array used by the Pipeline class, which uses the processor type as the key:

Array ( 
	[processors] => Array ( 
		[attachment] => Array ( 
			[field] => pageSpreadsheetBinary 
			[indexed_chars] => -1 
			[target_field] => spreadsheet_attachment 
		) 
		[remove] => Array (
			[field] => pageSpreadsheetBinary 
		)
	)
)
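The collision can be reproduced in plain PHP, without Elastica. This is only an illustration (the variable names are made up and it does not copy Elastica's internals): it stores the four processors keyed by type, as the dump above shows, and compares that with an ordered list of single-key arrays, which is the shape of the pipeline I want to create.

```php
<?php
// Illustration only: two processors of each type, as in the target pipeline.
$steps = [
    ['attachment', ['field' => 'pageDescBinary', 'target_field' => 'desc_attachment']],
    ['remove', ['field' => 'pageDescBinary']],
    ['attachment', ['field' => 'pageSpreadsheetBinary', 'target_field' => 'spreadsheet_attachment']],
    ['remove', ['field' => 'pageSpreadsheetBinary']],
];

// Keyed by processor type: a second entry with the same key overwrites the first.
$byType = [];
foreach ($steps as [$type, $config]) {
    $byType[$type] = $config;
}

// Ordered list of {type: config} entries: every processor survives, in order.
$asList = [];
foreach ($steps as [$type, $config]) {
    $asList[] = [$type => $config];
}

// The first line has the same shape as the broken output: one object holding
// only the last attachment and the last remove.
echo json_encode(['processors' => [$byType]]), PHP_EOL;
echo json_encode(['processors' => $asList]), PHP_EOL;
```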

I tried to work around this with setRawProcessors():

$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');

$processors = [
    'processors' => [
        [
            'attachment' => [
                'field' => 'pageDescBinary',
                'indexed_chars' => -1,
                'target_field' => 'desc_attachment',
            ],
        ],
        [
            'remove' => [
                'field' => 'pageDescBinary',
            ],
        ],
        [
            'attachment' => [
                'field' => 'pageSpreadsheetBinary',
                'indexed_chars' => -1,
                'target_field' => 'spreadsheet_attachment',
            ],
        ],
        [
            'remove' => [
                'field' => 'pageSpreadsheetBinary',
            ],
        ],
    ],
];

$pipeline->setRawProcessors($processors);
$response = $pipeline->create();

But this produces an error in Elasticsearch, which received an array where it expected an object (maybe because PHP uses 0, 1, 2... as keys):

[2020-10-19T14:20:04,866][WARN ][r.suppressed             ] [lc-master-1] path: /_ingest/pipeline/pageattachment, params: {id=pageattachment}
java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class java.util.Map (java.util.ArrayList and java.util.Map are in module java.base of loader 'bootstrap')
        at org.elasticsearch.ingest.ConfigurationUtils.readProcessorConfigs(ConfigurationUtils.java:334) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.Pipeline.create(Pipeline.java:74) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.IngestService.validatePipeline(IngestService.java:435) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.IngestService.putPipeline(IngestService.java:340) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.ingest.PutPipelineTransportAction.lambda$masterOperation$0(PutPipelineTransportAction.java:88) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:89) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:83) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.finishHim(TransportNodesAction.java:241) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.onOperation(TransportNodesAction.java:218) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.access$000(TransportNodesAction.java:147) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:196) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:188) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$6.handleResponse(TransportService.java:632) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1240) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1220) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:52) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:249) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:245) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:257) [x-pack-security-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.messageReceived(SecurityServerTransportInterceptor.java:315) [x-pack-security-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:72) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$8.doRun(TransportService.java:800) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
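The guess about PHP's numeric keys can be checked in plain PHP: json_encode() turns a sequential array (keys 0, 1, 2...) into a JSON array, so the indexes are not sent as object keys. The sketch below also shows another possible source of the ArrayList-to-Map error, under an assumption that is not verified here: that Pipeline stores the raw array as its own processors parameter and wraps it in a list when serializing. Because the array passed to setRawProcessors() above already contains a 'processors' key, the body would then hold a "processor" of type "processors" whose configuration is a JSON array.

```php
<?php
// A sequential PHP array (keys 0, 1, 2...) is encoded as a JSON array, not an object.
$list = [
    ['remove' => ['field' => 'pageDescBinary']],
    ['remove' => ['field' => 'pageSpreadsheetBinary']],
];
$encodedList = json_encode($list);
// -> [{"remove":{"field":"pageDescBinary"}},{"remove":{"field":"pageSpreadsheetBinary"}}]

// The raw array given to setRawProcessors() already carries a 'processors' key.
$raw = ['processors' => $list];

// ASSUMPTION (hypothetical serialization): the raw array becomes the pipeline's
// 'processors' parameter and is wrapped in a list before being sent.
$body = ['description' => 'Extract attachment information', 'processors' => [$raw]];
$encodedBody = json_encode($body);

// In this body the first processor's type is "processors" and its configuration
// is a JSON array, where Elasticsearch expects an object (a Map).
echo $encodedBody, PHP_EOL;
```

If that assumption holds, passing only the inner list (without the extra 'processors' key) would be the first thing to try, though the list wrapping described above could still get in the way.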

As a second workaround, I will try the Pipeline processor to build nested pipelines. I'm open to better workarounds, because this one isn't ideal.
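A minimal sketch of the nested-pipeline idea, written as plain PHP arrays so it runs without a cluster. It chains pipelines instead of having one parent call two children: a parent with two "pipeline" processors would hit the same same-type collision. The inner pipeline id and the helper functions are hypothetical; the "pipeline" processor with a "name" option is Elasticsearch's, and whether Elastica 7.0.0 ships a class for it is not assumed.

```php
<?php
// Hypothetical helper: the processors that extract one binary field, then remove it.
function extractAndRemove(string $binaryField, string $targetField): array
{
    return [
        ['attachment' => ['field' => $binaryField, 'indexed_chars' => -1, 'target_field' => $targetField]],
        ['remove' => ['field' => $binaryField]],
    ];
}

// Hypothetical helper: list the processor types of a pipeline body, in order.
function processorTypes(array $body): array
{
    return array_map(function (array $p): string {
        return (string) array_key_first($p);
    }, $body['processors']);
}

// Inner pipeline (hypothetical id): handles the spreadsheet field only.
$innerId = 'pageattachment-spreadsheet';
$inner = [
    'description' => 'Extract spreadsheet attachment',
    'processors' => extractAndRemove('pageSpreadsheetBinary', 'spreadsheet_attachment'),
];

// Outer pipeline: handles the description field, then passes the document to the
// inner pipeline through Elasticsearch's "pipeline" processor.
$outer = [
    'description' => 'Extract attachment information',
    'processors' => array_merge(
        extractAndRemove('pageDescBinary', 'desc_attachment'),
        [['pipeline' => ['name' => $innerId]]]
    ),
];

// No processor type repeats inside either pipeline.
echo implode(',', processorTypes($inner)), PHP_EOL;
echo implode(',', processorTypes($outer)), PHP_EOL;
```

Because no type repeats within a single pipeline, each of these could in principle also be built with Elastica's addProcessor(). Create the inner pipeline first so it exists when documents go through the outer one.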

ThibautSF, Oct 19 '20 13:10

Probably the best fix would be to add full support for this in Elastica, or at least some "native" support?

ruflin, Oct 21 '20 07:10

I think it will need changes to how processors are stored in Pipeline (maybe even in params) and/or how Pipeline is serialized before being sent to Elasticsearch.

But I don't yet understand that part of the code well enough to be sure how to do it.
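One possible direction, sketched as a hypothetical standalone class (OrderedPipelineBody is an invented name, NOT Elastica's API): keep the processors as an ordered list of single-key {type: config} entries and append instead of merging, so two processors of the same type can coexist.

```php
<?php
// Hypothetical class, not part of Elastica: stores processors as an ordered list
// so two processors of the same type never overwrite each other.
final class OrderedPipelineBody
{
    /** @var array<int, array<string, array>> */
    private $processors = [];

    /** @var string */
    private $description;

    public function __construct(string $description)
    {
        $this->description = $description;
    }

    // Append one processor as a single-key {type: config} entry.
    public function addProcessor(string $type, array $config): self
    {
        $this->processors[] = [$type => $config];
        return $this;
    }

    // Body for PUT _ingest/pipeline/<id>: "processors" encodes as a JSON array.
    public function toArray(): array
    {
        return ['description' => $this->description, 'processors' => $this->processors];
    }
}

$body = (new OrderedPipelineBody('Extract attachment information'))
    ->addProcessor('attachment', ['field' => 'pageDescBinary', 'indexed_chars' => -1, 'target_field' => 'desc_attachment'])
    ->addProcessor('remove', ['field' => 'pageDescBinary'])
    ->addProcessor('attachment', ['field' => 'pageSpreadsheetBinary', 'indexed_chars' => -1, 'target_field' => 'spreadsheet_attachment'])
    ->addProcessor('remove', ['field' => 'pageSpreadsheetBinary'])
    ->toArray();

echo json_encode($body, JSON_PRETTY_PRINT), PHP_EOL;
```

Elastica's processor objects already expose toArray(), so an integrated version could append `$processor->toArray()` rather than taking a (type, config) pair; whether that fits the rest of the Param-based design is an open question.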

ThibautSF, Oct 21 '20 07:10

I saw you opened a PR; that will probably make the discussion easier.

ruflin, Oct 22 '20 09:10

So!

I found a much better workaround (possibly an approach worth following for a native fix):

use Elastica\Processor\Attachment;
use Elastica\Processor\Lowercase;
use Elastica\Processor\Remove;
use Elastica\Request;

//Create first attachment processor
$attachprocDesc = new Attachment('descBinary');
$attachprocDesc->setIndexedChars(-1);
$attachprocDesc->setTargetField('desc_attachment');

//Create first remove processor
$removeprocDesc = new Remove('descBinary');

//Create second attachment processor (used in a foreach processor)
$attachprocNeweditor = new Attachment('_ingest._value.contentBinary');
$attachprocNeweditor->setIndexedChars(-1);
$attachprocNeweditor->setTargetField('_ingest._value.content');

//Create second remove processor (used in a foreach processor)
$removeprocNeweditor = new Remove('_ingest._value.contentBinary');

$pipelineId = 'mypipeline';
$pipeline = [
    'description' => 'a pipeline',
    'processors' => [
        $attachprocDesc->toArray(), //1st attachment
        $removeprocDesc->toArray(), //1st remove
        [ //1st foreach (manual because not implemented in Elastica)
            'foreach' => [
                'field' => 'subContents',
                'ignore_missing' => true,
                'processor' => $attachprocNeweditor->toArray(), //2nd attachment
            ],
        ],
        [ //2nd foreach (manual because not implemented in Elastica)
            'foreach' => [
                'field' => 'subContents',
                'ignore_missing' => true,
                'processor' => $removeprocNeweditor->toArray(), //2nd remove
            ],
        ],
        (new Lowercase('somefield'))->toArray(),
    ],
];

$path = "_ingest/pipeline/{$pipelineId}";

$client->request($path, Request::PUT, json_encode($pipeline));

This produces the following pipeline (named 'mypipeline'):

{
    "description": "a pipeline",
    "processors": [
        {
            "attachment": {
                "field": "descBinary",
                "indexed_chars": -1,
                "target_field": "desc_attachment"
            }
        },
        {
            "remove": {
                "field": "descBinary"
            }
        },
        {
            "foreach": {
                "field": "subContents",
                "ignore_missing": true,
                "processor": {
                    "attachment": {
                        "field": "_ingest._value.contentBinary",
                        "indexed_chars": -1,
                        "target_field": "_ingest._value.content"
                    }
                }
            }
        },
        {
            "foreach": {
                "field": "subContents",
                "ignore_missing": true,
                "processor": {
                    "remove": {
                        "field": "_ingest._value.contentBinary"
                    }
                }
            }
        },
        {
            "lowercase": {
                "field": "somefield"
            }
        }
    ]
}
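The two foreach entries in the workaround are written by hand. A small helper can build them; foreachProcessor() is a hypothetical name, not an Elastica function, and it only emits the field, ignore_missing and processor options already used in the workaround. In the real code the $attach and $remove arrays would come from `$attachprocNeweditor->toArray()` and `$removeprocNeweditor->toArray()`.

```php
<?php
// Hypothetical helper (not an Elastica API): wraps one processor array so that it
// runs on every element of an array field.
function foreachProcessor(string $field, array $processor, bool $ignoreMissing = true): array
{
    return [
        'foreach' => [
            'field' => $field,
            'ignore_missing' => $ignoreMissing,
            'processor' => $processor,
        ],
    ];
}

// Inside a foreach processor, the current element is addressed as _ingest._value.
$attach = ['attachment' => [
    'field' => '_ingest._value.contentBinary',
    'indexed_chars' => -1,
    'target_field' => '_ingest._value.content',
]];
$remove = ['remove' => ['field' => '_ingest._value.contentBinary']];

$processors = [
    foreachProcessor('subContents', $attach),
    foreachProcessor('subContents', $remove),
];

echo json_encode(['description' => 'a pipeline', 'processors' => $processors], JSON_PRETTY_PRINT), PHP_EOL;
```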

ThibautSF, Oct 27 '20 17:10