elastically icon indicating copy to clipboard operation
elastically copied to clipboard

[Minor] Class attributes being private prevents code from being easily adapted

Open ThibautSF opened this issue 5 years ago • 5 comments

Hi,

I began to use elastically and implement it. But due to some specific use, I need to extend some classes and overload some methods. For example, overload the IndexBuilder.createIndex($indexName) method in order to change how index name is generated. But it's impossible to access private attributes like $this->configurationDirectory. Another example is to add request param to the bulk request generated in Indexer.getCurrentBulk() like set a pipeline $this->currentBulk->setRequestParam('pipeline', $pipelineID); (this part is pretty similar to #31)

Note: Another solution could be having more getters&setters for class attributes

ThibautSF avatar Oct 07 '20 10:10 ThibautSF

Hi! Thanks for reaching out!

Change how index name is generated

What would you need for example?

We have a PREFIX option, but that's all at the moment. I'm completely ok adding an option to change the date format, but using a completely non date-related name is not desirable.

For example the method purgeOldIndices needs to sort indexes by date.

Was that your plan/need?

Change to createIndex

This ticket #22 is still in my mind, allowing to fetch the mapping/setting from another source than YAML.

Access $this->configurationDirectory

You can use $client->getConfig(Client::CONFIG_MAPPINGS_DIRECTORY) it's the same :+1:

Pipeline to the bulk

You are right, #31 should be able that too! Thanks!

Let me know if this is helpful and what we can do to help!

damienalexandre avatar Oct 15 '20 15:10 damienalexandre

Change how index name is generated

What would you need for example?

We have a PREFIX option, but that's all at the moment. I'm completely ok adding an option to change the date format, but using a completely non-date-related name is not desirable.

I discovered just after posting my message the CONFIG_INDEX_PREFIX which is indeed what I needed. I just wanted to add more info before the index name. I have multiple groups, each group have its own indexes -> so now each index name has the group id as prefix

I will explain again what I have done now since my implementation is in a more advanced stage. I wanted to add pipelines to bulk requests but also add a bytes size limit for bulk requests (-> I use the ingest-attachement pipeline and send request with some file binaries, thus queries might be too heavy)

I extend two classes (which are copied from src code with protected attributes and overridden in composer autoload):

  • Client (mainly protected $indexer;) -> in CustomClient class
  • Indexer (mainly protected $currentBulk;) -> in CustomIndexer class

CustomClient

use JoliCode\Elastically\Client;
...

class CustomClient extends Client
{
    const CONFIG_BULK_BYTES_SIZE = 'elastically_bulk_bytes_size'; //New conf

    public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
    {
        parent::__construct($config, $callback, $logger);
    }

    public function getIndexer(): Indexer //Use a custom indexer -> OOP
    {
        if (!$this->indexer) {
            $this->indexer = new CustomIndexer($this, $this->getSerializer(), $this->getConfigValue(self::CONFIG_BULK_SIZE, 100), $this->getConfigValue(self::CONFIG_BULK_BYTES_SIZE, (int) (ini_get('post_max_size')) * 1.0e+6));
        }

        return $this->indexer;
    }
}

As you see, I extend the class in order to add a new conf value and to override getIndexer(): Indexer in order to instantiate and return my CustomIndexer (a child of Indexer).

CustomIndexer

use JoliCode\Elastically\Indexer;
...

class LabCollectorIndexer extends Indexer
{
    protected $bulkBytesMaxSize; //New attribute

    public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100, int $bulkBytesMaxSize) //Extended constructor
    {
        parent::__construct($client, $serializer, $bulkMaxSize ?? 100, $serializer);
        $this->bulkBytesMaxSize = $bulkBytesMaxSize;
    }

    public function scheduleIndex($index, Document $document, string $pipelineID = null) //add string $pipelineID = null to all methods which might call flush method
    {
        $document->setIndex($index instanceof Index ? $index->getName() : $index);
        if (is_object($document->getData())) {
            $context = $this->client->getSerializerContext(get_class($document->getData()));
            $document->setData($this->serializer->serialize($document->getData(), 'json', $context));
        }

        $docsize = $this->getBytesSize($document) * 2; // After adding a document $currentBulk size increase 2 times the document size

        if ($docsize > $this->bulkBytesMaxSize) {
            //Document too heavy
            //TODO return exception
            return;
        }

        $queuesize = $this->getQueueBytesSize();
        $response = null;
        if ($this->getQueueSize() + 1 >= $this->bulkMaxSize || $queuesize + $docsize >= $this->bulkBytesMaxSize) {
            //Queue too long or heavy in case we add the document -> send the bulk
            try {
                $response = $this->flush($pipelineID); //store response
            } catch (ResponseException $e) {
                $response = $e->getResponseSet(); //Get response even in case of exception (ie in my case, tka exception with encrypted files)
            }
        }

        $this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

        return $response; //Return the bulk response or null
    }

    // Note: insert modifications of scheduleDelete, scheduleUpdate and scheduleCreate which are very similar (but I want to reduce this issue post size)

    public function getQueueBytesSize() //New method
    {
        if (null === $this->currentBulk) {
            return 0;
        }

        return $this->getBytesSize($this->currentBulk->getActions());
    }

    public function getBytesSize($arr): int //New method
    {
        //TODO might need improvements
        $tot = 0;

        if (is_array($arr)) {
            foreach ($arr as $a) {
                $tot += $this->getBytesSize($a);
            }
        }
        if (is_string($arr)) {
            $tot += strlen($arr);
        }
        if (is_int($arr)) {
            $tot += PHP_INT_SIZE;
        }
        if (is_object($arr)) {
            $tot += strlen(serialize($arr));
        }

        return $tot;
    }

    protected function getCurrentBulk(string $pipelineID = null): Bulk //Override to add pipeline
    {
        if (!($this->currentBulk instanceof Bulk)) {
            $this->currentBulk = new Bulk($this->client);

            if (isset($pipelineID)) {
                $this->currentBulk->setRequestParam('pipeline', $pipelineID);
            }
        }

        return $this->currentBulk;
    }
}

Implementation of bulk pipelines and max byte size. Also, add a return of Elastica ResponseSet for each intermediary flush -> I use that to track document status and errors at index

ThibautSF avatar Oct 16 '20 13:10 ThibautSF

As I read my code again, getCurrentBulk override in CustomIndexer might be useless since I also can replace the line

$this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

with the lines

$bulk = $this->getCurrentBulk($pipelineID);
if (isset($pipelineID)) {
    $bulk ->setRequestParam('pipeline', $pipelineID); //Note: It will reset the value at each schedule* call
}
$bulk->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

ThibautSF avatar Oct 16 '20 14:10 ThibautSF

Thanks for sharing.

Just found this setter on the Elastica Document $doc->setPipeline('coucou'); but it does not work on Bulk, it's only for one-shot indexing.

One possible implementation we could do is adding the pipeline as a Indexer option, like you did for bulkBytesMaxSize (that way you create one indexer per pipeline you want to use). ping #31 for reference.

damienalexandre avatar Oct 16 '20 14:10 damienalexandre

Just found this setter on the Elastica Document $doc->setPipeline('coucou'); but it does not work on Bulk, it's only for one-shot indexing.

Yeah, it was my first try to add a pipeline (before adding to CustomIndexer) and you remind me that I still have this line

/**
 * Index a FileSystem file.
 *
 * @param string  $attachId The file id in DB
 * @param string  $hash     An hash of the file (ie: filename or content, depends on what we want to be identified as unique), will be used as Elasticsearch index ID
 * @param string  $contents The file content (will be encoded in base64)
 * @param string  $filepath The direct path to the file
 * @param string  $dbtable  The file id in DB
 * @param Indexer $indexer  The Indexer instance
 * @param Index   $index    The Index instance
 *
 * @return null|ResponseSet
 */
function indexFSFile(string $attachId, string $hash, string $contents, string $filepath, string $dbtable, Indexer $indexer, Index $index)
{
    $b64 = base64_encode($contents);

    $path_parts = pathinfo($filepath);

    $dto = MyIndexedFile::createFSdoc($attachId, $b64, $dbtable, $path_parts['basename'], $path_parts['dirname'], $filepath);

    $doc = new Document($hash, $dto);
    $doc->setPipeline('fileattachment'); //TODO useless because it's a bulk request

    return $indexer->scheduleIndex($index, $doc, 'fileattachment');
}

One possible implementation we could do is adding the pipeline as a Indexer option, like you did for bulkBytesMaxSize (that way you create one indexer per pipeline you want to use). ping #31 for reference.

This is how I create the pipeline (at index creation) using only Elastica code (apart the $client which is an instance of CustomClient, which extends Elastically\Client which extends etc...)

//Add pipeline for ingest-attachement
$pipeline = new Pipeline($client);
$pipeline->setId('fileattachment')->setDescription('Extract attachment information');

//Create attachment processor pipeline -> set the field where are file binaries and infinite indexed chars
$attachproc = new Attachment('contentBinary');
$attachproc->setIndexedChars(-1);

//Create remove processor pipeline -> remove the file binaries after contentBinary is indexed (reduce weight)
$removeprc = new Remove('contentBinary');

//Add processors in the correct order to the pipeline
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeprc);
//Create the pipeline
$response = $pipeline->create();

And this 'fileattachement' pipeline ID is given to bulk in the first shared code in this post (last line). But indeed, through Indexer option, it could be useful 👍 Could also be a mapping based on index name (because for one client we might want to have index 'A' to use 'pipelineA' and index 'B' to use 'pipelineB') -> thus might even mean (yeah I'm extending again the idea) being able to set the pipeline in an equivalent way than the YAML mappings conf (ping #31 again)

ThibautSF avatar Oct 16 '20 15:10 ThibautSF