Messenger component for populate guide?

Open kricha opened this issue 3 years ago • 12 comments

Does anyone have a complete, or nearly complete, guide to using symfony/messenger for the populate process? It seems the community needs information about enabling the async persister, using a custom pager provider, etc.

kricha avatar Jan 24 '22 20:01 kricha

I do it this way. messenger.yaml:

framework:
  messenger:
    transports:
      elastica: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=elastica'
    routing:
      FOS\ElasticaBundle\Message\AsyncPersistPage: elastica

services.yaml:

  FOS\ElasticaBundle\Persister\AsyncPagerPersister:
      arguments:
          - '@fos_elastica.pager_persister_registry'
          - '@fos_elastica.pager_provider_registry'
          - '@Symfony\Component\Messenger\MessageBusInterface'
      tags:
          - { name: "fos_elastica.pager_persister", persisterName: "async" }

  FOS\ElasticaBundle\Message\Handler\AsyncPersistPageHandler:
      tags: [ messenger.message_handler ]

Then run the bundle's populate command. It will dispatch paged messages to the 'elastica' queue. Then consume these messages with messenger:consume.

Maybe I've done something wrong, but it works.
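The populate and consume steps described above might look like the following sketch. It assumes the Symfony console lives at bin/console and that the populate command's --pager-persister option is given the persisterName from the service tag (async). CONSOLE, populate_async and consume_pages are names made up for this example.

```shell
#!/usr/bin/env bash
# CONSOLE is an assumption: adjust it to how the console is invoked in your project.
CONSOLE="${CONSOLE:-php bin/console}"

# Dispatch one AsyncPersistPage message per page instead of indexing in-process.
# "async" must match the persisterName used in the service tag.
populate_async() {
  $CONSOLE fos:elastica:populate --pager-persister=async "$@"
}

# Handle the queued pages; "elastica" is the transport name from messenger.yaml.
consume_pages() {
  $CONSOLE messenger:consume elastica "$@"
}
```

Calling populate_async once and then consume_pages -vv in one or more worker processes would reproduce the flow from this comment.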

saalexeev avatar Feb 04 '22 08:02 saalexeev

@saalexeev thanks. But the question is more for the core contributors. I also have a working version, but it seems ugly to me :D

kricha avatar Feb 07 '22 08:02 kricha

Enable messenger:

fos_elastica:
    messenger: ~

Configure transport:

framework:
    messenger:
        transports:
            async_elasticsearch: '%env(MESSENGER_TRANSPORT_DSN)%/async_elasticsearch'

        routing:
            FOS\ElasticaBundle\Message\AsyncPersistPage: async_elasticsearch

Configure handler:

    FOS\ElasticaBundle\Message\Handler\AsyncPersistPageHandler:
        arguments:
            - '@fos_elastica.async_pager_persister'
        tags: [ 'messenger.message_handler' ]

oleg-andreyev avatar Dec 07 '22 09:12 oleg-andreyev

Thanks @oleg-andreyev and @saalexeev, do you have any idea how I can avoid downtime (nodes returning 0 results) when using aliases with the above configuration?

kezek avatar Dec 12 '22 12:12 kezek

This configuration does not work with creating a new index and switching aliases. It uses the existing index (or creates it if missing).

oleg-andreyev avatar Dec 12 '22 12:12 oleg-andreyev

@oleg-andreyev thanks, I assumed so. The issue is that PopulateListener is triggered right after the messages are dispatched, not after the workers have processed them. I can avoid that by appending --no-reset, but how would I know when the async workers are done in order to make the alias switch? Any hint is appreciated.

kezek avatar Dec 12 '22 13:12 kezek

Unfortunately you cannot: it's async, and the order of execution is not guaranteed. The only way I can think of is to monitor the queue size and switch once it reaches 0 messages.
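The queue-monitoring idea could be sketched roughly as below. This is only an illustration: wait_for_empty_queue is a made-up helper, and the command that prints the current message count is left to the caller because it depends on the transport (a SQL count for the doctrine transport, a broker CLI for AMQP, and so on). Note also that, depending on the transport, a message still being handled by a worker may not be included in the count, so an empty queue is not a strict guarantee that indexing has finished.

```shell
#!/usr/bin/env bash
# wait_for_empty_queue COUNT_CMD [INTERVAL_SECONDS] [MAX_TRIES]
# COUNT_CMD is a caller-supplied (hypothetical) command that prints the number
# of messages still waiting in the queue.
# Returns 0 once the count is 0, 1 if it never reached 0, 2 if COUNT_CMD failed.
wait_for_empty_queue() {
  local count_cmd="$1" interval="${2:-5}" max_tries="${3:-720}"
  local n tries=0
  while [ "$tries" -lt "$max_tries" ]; do
    n="$($count_cmd)" || return 2
    [ "$n" -eq 0 ] && return 0
    tries=$((tries + 1))
    sleep "$interval"
  done
  return 1
}
```

A deployment script could call wait_for_empty_queue with its own counting command and perform the alias switch only when the function returns 0.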

oleg-andreyev avatar Dec 12 '22 13:12 oleg-andreyev

I'm willing to write up and submit a PR with additional documentation on how to get symfony/messenger working, but I'm currently getting "The receiver 'elasticsearch' does not exist" when trying to run the consumer with bin/console messenger:consume elasticsearch -vv.

As far as I can tell, I have things configured correctly:

app/config/services/fos_elastica.yml:

fos_elastica:
  messenger: ~
  ...

services:
  FOS\ElasticaBundle\Message\Handler\AsyncPersistPageHandler:
    arguments:
      - '@fos_elastica.async_pager_persister'
    tags: [ messenger.message_handler ]

app/config/packages/messenger.yml:

framework:
    messenger:
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            elasticsearch: "doctrine://default?queue_name=elasticsearch"

        routing:
            # Route your messages to the transports
            'FOS\ElasticaBundle\Message\AsyncPersistPage': elasticsearch

Any idea why Symfony doesn't seem to see my "elasticsearch" consumer/receiver/transport? I'm using Symfony 4 with FOSElasticaBundle v6.
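One way to narrow this kind of error down is to check whether the transport definition is being loaded at all. The sketch below is a diagnostic aid, not a confirmed fix for this report. It assumes the console is at bin/console, and the function names are made up for the example.

```shell
#!/usr/bin/env bash
# CONSOLE is an assumption: adjust it to how the console is invoked in your project.
CONSOLE="${CONSOLE:-php bin/console}"

# Dump the resolved framework.messenger configuration. If "elasticsearch" is
# missing under "transports", the messenger.yml file is probably not being loaded.
show_messenger_config() {
  $CONSOLE debug:config framework messenger
}

# List the messages that can be dispatched and the handlers registered for them.
show_messenger_handlers() {
  $CONSOLE debug:messenger
}
```

If the transport does not show up in the dumped configuration, the location of the config file relative to the directories the kernel loads would be the first thing to verify.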

boroth avatar Jan 09 '23 16:01 boroth

> Thanks @oleg-andreyev and @saalexeev, do you have any idea how I can avoid downtime (0 results nodes) when using aliases with the above configuration?

Hi. I found a solution:

https://gist.github.com/vladdnepr/728e1b7b428923443f25d06984034f7a

Hope this helps

vladdnepr avatar Apr 15 '23 11:04 vladdnepr

Hey, thanks for this guide, I used it. I have another issue with this setup: I use messenger for the populate command, but when an entity is updated in the database (ORM), it is indexed without going through the messenger system.

✅ The populate command is processed through messenger, which is fine.
❌ Updates to the database are processed outside the queue system.

My fos_elastica.yaml looks like this:

fos_elastica:
    messenger:
        enabled: true
            persistence:
                driver: orm
                model:  App\Entity\User
                identifier: id
                listener:
                    defer: true
                provider: ~
                finder: ~
                elastica_to_model_transformer:
                    ignore_missing: true

What am I missing?

inarli avatar Jun 25 '23 21:06 inarli

@inarli, as far as I know, messenger is only used for the populate command; it doesn't affect the normal persister when entities are simply updated through the ORM.

boroth avatar Jun 26 '23 13:06 boroth

> @inarli, as far as I know, the messenger is only used for the populate command, it doesn't affect the normal persister when just updating entities through the ORM.

Yes, it seems to be as you say.

inarli avatar Jun 26 '23 14:06 inarli