Elastica
Impossible to create pipeline with multiple processors with same name
Edit 1
The latest sample workaround is in https://github.com/ruflin/Elastica/issues/1810#issuecomment-717393980.
Original
Hi,
I'm using Elasticsearch 7.9.2 and Elastica 7.0.0.
I would like to create the following pipeline from PHP with Elastica:
PUT _ingest/pipeline/pageattachment
{
"description": "Extract attachment information",
"processors": [
{
"attachment": {
"field": "pageDescBinary",
"indexed_chars": -1,
"target_field": "desc_attachment"
}
},
{
"remove": {
"field": "pageDescBinary"
}
},
{
"attachment": {
"field": "pageSpreadsheetBinary",
"indexed_chars": -1,
"target_field": "spreadsheet_attachment"
}
},
{
"remove": {
"field": "pageSpreadsheetBinary"
}
}
]
}
Through a manual curl query and some indexing tests it works well, but when I do the same thing in PHP, every processor type that appears more than once is erased (only the last one added is kept):
use Elastica\Pipeline;
use Elastica\Processor\Attachment;
use Elastica\Processor\Remove;

$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');
//Create attachment processor
$attachproc = new Attachment('pageDescBinary');
$attachproc->setIndexedChars(-1);
$attachproc->setTargetField('desc_attachment');
//Create remove processor
$removeproc = new Remove('pageDescBinary');
//Create second attachment processor
$attachproc2 = new Attachment('pageSpreadsheetBinary');
$attachproc2->setIndexedChars(-1);
$attachproc2->setTargetField('spreadsheet_attachment');
//Create second remove processor
$removeproc2 = new Remove('pageSpreadsheetBinary');
//Add processors to the pipeline
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeproc);
$pipeline->addProcessor($attachproc2);
$pipeline->addProcessor($removeproc2);
$response = $pipeline->create();
But the produced pipeline is:
{
"pageattachment": {
"description": "Extract attachment information",
"processors": [
{
"attachment": {
"field": "pageSpreadsheetBinary",
"indexed_chars": -1,
"target_field": "spreadsheet_attachment"
},
"remove": {
"field": "pageSpreadsheetBinary"
}
}
]
}
}
As we can see, only the second 'attachment' and 'remove' processors are kept. The problem is caused by the associative array used by the Pipeline class, which uses the processor type as the key:
Array (
[processors] => Array (
[attachment] => Array (
[field] => pageSpreadsheetBinary
[indexed_chars] => -1
[target_field] => spreadsheet_attachment
)
[remove] => Array (
[field] => pageSpreadsheetBinary
)
)
)
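To illustrate the mechanism: a PHP array cannot hold two entries under the same string key, so registering a second processor of an already-used type silently overwrites the first. A minimal sketch of the effect (simplified, not the actual Elastica internals):
$processors = [];
//First attachment processor is stored under the key 'attachment'
$processors['attachment'] = ['field' => 'pageDescBinary'];
//Second attachment processor reuses the same key and overwrites the first
$processors['attachment'] = ['field' => 'pageSpreadsheetBinary'];
var_dump(count($processors)); //int(1) - only the last 'attachment' entry survives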
I tried to find a workaround by using setRawProcessors:
$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');
$processors = [
'processors' => [
[
'attachment' => [
'field' => 'pageDescBinary',
'indexed_chars' => -1,
'target_field' => 'desc_attachment',
],
],
[
'remove' => [
'field' => 'pageDescBinary',
],
],
[
'attachment' => [
'field' => 'pageSpreadsheetBinary',
'indexed_chars' => -1,
'target_field' => 'spreadsheet_attachment',
],
],
[
'remove' => [
'field' => 'pageSpreadsheetBinary',
],
],
],
];
$pipeline->setRawProcessors($processors);
$response = $pipeline->create();
But this produces an error on Elasticsearch (most likely because PHP uses 0, 1, 2... as the array keys):
[2020-10-19T14:20:04,866][WARN ][r.suppressed ] [lc-master-1] path: /_ingest/pipeline/pageattachment, params: {id=pageattachment}
java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class java.util.Map (java.util.ArrayList and java.util.Map are in module java.base of loader 'bootstrap')
at org.elasticsearch.ingest.ConfigurationUtils.readProcessorConfigs(ConfigurationUtils.java:334) ~[elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.ingest.Pipeline.create(Pipeline.java:74) ~[elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.ingest.IngestService.validatePipeline(IngestService.java:435) ~[elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.ingest.IngestService.putPipeline(IngestService.java:340) ~[elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.ingest.PutPipelineTransportAction.lambda$masterOperation$0(PutPipelineTransportAction.java:88) ~[elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:89) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:83) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.finishHim(TransportNodesAction.java:241) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.onOperation(TransportNodesAction.java:218) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.access$000(TransportNodesAction.java:147) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:196) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:188) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$6.handleResponse(TransportService.java:632) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1240) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1220) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:52) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:249) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:245) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:257) [x-pack-security-7.9.2.jar:7.9.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.messageReceived(SecurityServerTransportInterceptor.java:315) [x-pack-security-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:72) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.transport.TransportService$8.doRun(TransportService.java:800) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.9.2.jar:7.9.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
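To see what body is actually being sent in that case, it can help to dump the serialized pipeline before calling create() (a sketch; it assumes Pipeline exposes toArray() like the other Param-based Elastica classes):
//Inspect the request body that create() would send
var_dump(json_encode($pipeline->toArray()));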
I will try to use the Pipeline processor as a second workaround and build nested pipelines. I'm open to a better workaround because this one isn't ideal.
Probably the best workaround would be to add full, or at least some, support for this "natively" in Elastica?
I think it will need some enhancements to how processors are stored in Pipeline (maybe even in its params) and/or to how Pipeline is serialized before being sent to Elasticsearch.
But I don't understand that part of the code well enough yet to be sure how to do it.
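Roughly in this direction, maybe (just a rough sketch of the idea, reusing the processor variables from the first example, not the actual Elastica internals): keep the processors in a plain list and wrap each one only at serialization time, since every processor already serializes to its type-wrapped form via toArray() (e.g. ['attachment' => [...]]), so duplicate types can coexist in the list.
//Hypothetical sketch of the serialization, not the real Pipeline code
$processorList = [$attachproc, $removeproc, $attachproc2, $removeproc2];
$body = [
    'description' => 'Extract attachment information',
    'processors' => array_map(
        static function ($processor) { return $processor->toArray(); },
        $processorList
    ),
];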
I saw you opened a PR; that will probably make the discussion easier.
So!
I found a much better workaround (it might be an approach worth following):
use Elastica\Processor\Attachment;
use Elastica\Processor\Lowercase;
use Elastica\Processor\Remove;
use Elastica\Request;
//Create first attachment processor
$attachprocDesc = new Attachment('descBinary');
$attachprocDesc->setIndexedChars(-1);
$attachprocDesc->setTargetField('desc_attachment');
//Create first remove processor
$removeprocDesc = new Remove('descBinary');
//Create second attachment processor (used in a foreach processor)
$attachprocNeweditor = new Attachment('_ingest._value.contentBinary');
$attachprocNeweditor->setIndexedChars(-1);
$attachprocNeweditor->setTargetField('_ingest._value.content');
//Create second remove processor (used in a foreach processor)
$removeprocNeweditor = new Remove('_ingest._value.contentBinary');
$pipelineId = 'mypipeline';
$pipeline = [
'description' => 'a pipeline',
'processors' => [
$attachprocDesc->toArray(), //1st attachment
$removeprocDesc->toArray(), //1st remove
[ //1st foreach (manual because not implemented in Elastica)
'foreach' => [
'field' => 'subContents',
'ignore_missing' => true,
'processor' => $attachprocNeweditor->toArray(), //2nd attachment
],
],
[ //2nd foreach (manual because not implemented in Elastica)
'foreach' => [
'field' => 'subContents',
'ignore_missing' => true,
'processor' => $removeprocNeweditor->toArray(), //2nd remove
],
],
(new Lowercase('somefield'))->toArray(),
],
];
$path = "_ingest/pipeline/{$pipelineId}";
$client->request($path, Request::PUT, json_encode($pipeline));
This will produce the following pipeline (named 'mypipeline'):
{
"description": "a pipeline",
"processors": [
{
"attachment": {
"field": "descBinary",
"indexed_chars": -1,
"target_field": "desc_attachment"
}
},
{
"remove": {
"field": "descBinary"
}
},
{
"foreach": {
"field": "subContents",
"ignore_missing": true,
"processor": {
"attachment": {
"field": "_ingest._value.contentBinary",
"indexed_chars": -1,
"target_field": "_ingest._value.content"
}
}
}
},
{
"foreach": {
"field": "subContents",
"ignore_missing": true,
"processor": {
"remove": {
"field": "_ingest._value.contentBinary"
}
}
}
},
{
"lowercase": {
"field": "somefield"
}
}
]
}
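To check the pipeline end to end, a document can be indexed through it with the same low-level request API (a sketch with a hypothetical index name and file; the pipeline is selected via the standard pipeline query parameter):
//Index a test document through the pipeline (hypothetical index 'pages' and file)
$doc = [
    'descBinary' => base64_encode(file_get_contents('example.pdf')),
    'subContents' => [],
];
$client->request('pages/_doc/1', Request::PUT, json_encode($doc), ['pipeline' => 'mypipeline']);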