
Performance issues when `processed_messages` gets "large"

Open bendavies opened this issue 9 months ago • 36 comments

Hi,

Loading the dashboard takes 20-30 seconds for us once processed_messages grows to a few million rows. We currently have 6,552,864 rows in that table, so not really that big.

I know this table can be purged, but 6 million rows doesn't seem like enough to warrant that.

Query 1, which takes up to 20 seconds:

SELECT DISTINCT
	id_15,
	MIN(sclr_16) AS dctrn_minrownum
FROM
	(
		SELECT
			p0_.run_id AS run_id_0,
			p0_.attempt AS attempt_1,
			p0_.message_type AS message_type_2,
			p0_.description AS description_3,
			p0_.dispatched_at AS dispatched_at_4,
			p0_.received_at AS received_at_5,
			p0_.finished_at AS finished_at_6,
			p0_.wait_time AS wait_time_7,
			p0_.handle_time AS handle_time_8,
			p0_.memory_usage AS memory_usage_9,
			p0_.transport AS transport_10,
			p0_.tags AS tags_11,
			p0_.failure_type AS failure_type_12,
			p0_.failure_message AS failure_message_13,
			p0_.results AS results_14,
			p0_.id AS id_15,
			ROW_NUMBER() OVER (
				ORDER BY
					p0_.finished_at DESC
			) AS sclr_16
		FROM
			processed_messages p0_
	) dctrn_result
GROUP BY
	id_15
ORDER BY
	dctrn_minrownum ASC
LIMIT
	15;

The second query, which takes 10 seconds when loading the statistics page:

SELECT
	count(DISTINCT p0_.id) AS sclr_0
FROM
	processed_messages p0_
GROUP BY
	p0_.message_type;

I've not looked into improving these yet; just raising for visibility.

Thanks, Ben

bendavies avatar Feb 19 '25 14:02 bendavies

Hi @kbond.

I've looked into this. The first query is being produced by LimitSubqueryOutputWalker and I could not make it fast with any indexing.

LimitSubqueryOutputWalker is being used because no resultModifier or hydrationMode is specified. https://github.com/zenstruck/collection/blob/fc126936e76d24ba6c5ad49df6b1e7eefe9cb62c/src/Collection/Doctrine/ORM/EntityResult.php#L331-L333

So, I've forced setUseOutputWalkers(false) to be called by overriding the controller action and specifying a modifier which does nothing:

    #[Route('/_recent-messages', name: 'zenstruck_messenger_monitor_recent_messages_widget')]
    public function recentMessagesWidget(
        ViewHelper $helper,
    ): Response {
        return $this->render('@ZenstruckMessengerMonitor/components/recent_messages.html.twig', [
            'messages' => Specification::new()
                ->snapshot($helper->storage())
                ->messages()
                ->as(static fn (ProcessedMessage $data): ProcessedMessage => $data),
            'helper' => $helper,
        ]);
    }

I've also created an index on finished_at:

CREATE INDEX idx_processed_messages_finished_at ON public.processed_messages (finished_at);

I can't see a nicer way of fixing this at the moment, but this takes the recent-messages action from 30 seconds to ~100ms.

What do you think is the proper way to fix this?

bendavies avatar Mar 19 '25 12:03 bendavies

Hey Ben, thanks for looking into this!

https://github.com/zenstruck/collection/blob/fc126936e76d24ba6c5ad49df6b1e7eefe9cb62c/src/Collection/Doctrine/ORM/EntityResult.php#L331-L333

So, I've forced setUseOutputWalkers(false) to be called by overriding the controller action and specifying a modifier which does nothing

To clarify: does setting setUseOutputWalkers(false) in zenstruck/collection alone reduce it from 30 seconds to 100ms, or is it the index, or both together?

I've also created an index on finished_at:

I don't believe I can add this to the mapped entity in the bundle but can document this for sure!

kbond avatar Mar 19 '25 12:03 kbond

To clarify: does setting setUseOutputWalkers(false) in zenstruck/collection alone reduce it from 30 seconds to 100ms, or is it the index, or both together?

Calling setUseOutputWalkers(false) causes LimitSubqueryOutputWalker not to be used, which changes the SQL from:

SELECT DISTINCT
	id_15,
	MIN(sclr_16) AS dctrn_minrownum
FROM
	(
		SELECT
			p0_.run_id AS run_id_0,
			p0_.attempt AS attempt_1,
			p0_.message_type AS message_type_2,
			p0_.description AS description_3,
			p0_.dispatched_at AS dispatched_at_4,
			p0_.received_at AS received_at_5,
			p0_.finished_at AS finished_at_6,
			p0_.wait_time AS wait_time_7,
			p0_.handle_time AS handle_time_8,
			p0_.memory_usage AS memory_usage_9,
			p0_.transport AS transport_10,
			p0_.tags AS tags_11,
			p0_.failure_type AS failure_type_12,
			p0_.failure_message AS failure_message_13,
			p0_.results AS results_14,
			p0_.id AS id_15,
			ROW_NUMBER() OVER (
				ORDER BY
					p0_.finished_at DESC
			) AS sclr_16
		FROM
			processed_messages p0_
	) dctrn_result
GROUP BY
	id_15
ORDER BY
	dctrn_minrownum ASC
LIMIT
	15;

to:

SELECT DISTINCT
	p0_.id AS id_0,
	p0_.finished_at AS finished_at_1
FROM
	processed_messages p0_
ORDER BY
	p0_.finished_at DESC
LIMIT
	15;

That's still pretty slow on a large table, so the index on finished_at is what makes it fast.

bendavies avatar Mar 19 '25 12:03 bendavies

I don't believe I can add this to the mapped entity in the bundle but can document this for

I think you can probably listen to postGenerateSchema and do

$table = $schema->getTable('processed_messages');
$table->addIndex(['finished_at'], 'idx_processed_messages_finished_at');
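A minimal sketch of such a listener, assuming the default processed_messages table name (the class name is illustrative; postGenerateSchema and GenerateSchemaEventArgs are existing Doctrine ORM APIs):

```php
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;

final class AddProcessedMessagesIndexListener
{
    public function postGenerateSchema(GenerateSchemaEventArgs $args): void
    {
        $schema = $args->getSchema();

        // Guard against projects that renamed or don't have the table.
        if (!$schema->hasTable('processed_messages')) {
            return;
        }

        $schema->getTable('processed_messages')
            ->addIndex(['finished_at'], 'idx_processed_messages_finished_at');
    }
}
```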

bendavies avatar Mar 19 '25 12:03 bendavies

So an index w/o setUseOutputWalkers(false) doesn't improve performance?

I think you can probably listen to postGenerateSchema and do

We can't assume the user named the table processed_messages. I think the best we can do is add to this code block:

// src/Entity/ProcessedMessage.php

namespace App\Entity;

use Zenstruck\Messenger\Monitor\History\Model\ProcessedMessage as BaseProcessedMessage;
use Doctrine\ORM\Mapping as ORM;

#[ORM\Entity(readOnly: true)]
#[ORM\Table('processed_messages')]
#[ORM\Index(name: 'idx_processed_messages_finished_at', fields: ["finishedAt"])] // !add!
class ProcessedMessage extends BaseProcessedMessage
{
    #[ORM\Id]
    #[ORM\GeneratedValue]
    #[ORM\Column]
    private ?int $id = null;

    public function id(): ?int
    {
        return $this->id;
    }
}

kbond avatar Mar 19 '25 13:03 kbond

So an index w/o setUseOutputWalkers(false) doesn't improve performance?

It does not. It speeds up the inner select of the previous query, but the MIN(sclr_16) AS dctrn_minrownum and GROUP BY id_15 cause a sort on (min((row_number() OVER (?)))), p0_.id (which is then done on disk).

https://explain.depesz.com/s/spOe

I think you can probably listen to postGenerateSchema and do

We can't assume the user named the table processed_messages. I think the best we can do is add to this code block:

// src/Entity/ProcessedMessage.php

namespace App\Entity;

use Zenstruck\Messenger\Monitor\History\Model\ProcessedMessage as BaseProcessedMessage;
use Doctrine\ORM\Mapping as ORM;

#[ORM\Entity(readOnly: true)]
#[ORM\Table('processed_messages')]
#[ORM\Index(name: 'idx_processed_messages_finished_at', fields: ["finishedAt"])] // !add!
class ProcessedMessage extends BaseProcessedMessage
{
    #[ORM\Id]
    #[ORM\GeneratedValue]
    #[ORM\Column]
    private ?int $id = null;

    public function id(): ?int
    {
        return $this->id;
    }
}

nice!

bendavies avatar Mar 19 '25 14:03 bendavies

What about using ->asArray()?

    #[Route('/_recent-messages', name: 'zenstruck_messenger_monitor_recent_messages_widget')]
    public function recentMessagesWidget(
        ViewHelper $helper,
    ): Response {
        return $this->render('@ZenstruckMessengerMonitor/components/recent_messages.html.twig', [
            'messages' => Specification::new()->snapshot($helper->storage())->messages()->asArray(),
            'helper' => $helper,
        ]);
    }

kbond avatar Mar 19 '25 14:03 kbond

No, that doesn't work, as the templates use methods on the entities.

bendavies avatar Mar 19 '25 14:03 bendavies

Oh right, I misread your code above.

So is this slowness only on the dashboard - not on /history?

kbond avatar Mar 19 '25 14:03 kbond

In zenstruck/collection, I already have EntityResult->disableFetchJoins(), which modifies the behaviour of the paginator. I'm thinking of adding EntityResult->enableOutputWalkers() and EntityResult->disableOutputWalkers() as well. Then, in this bundle, use ->disableOutputWalkers() where applicable.

kbond avatar Mar 19 '25 15:03 kbond

So is this slowness only on the dashboard - not on /history?

Yes, history as well, actually. But it doesn't look like I can apply my fix as easily there.

In zenstruck/collection, I already have EntityResult->disableFetchJoins(), which modifies the behaviour of the paginator. I'm thinking of adding EntityResult->enableOutputWalkers() and EntityResult->disableOutputWalkers() as well. Then, in this bundle, use ->disableOutputWalkers() where applicable.

Ah, so the fix can be changed to use disableFetchJoins instead:

        return $this->render('@ZenstruckMessengerMonitor/components/recent_messages.html.twig', [
            /** @phpstan-ignore method.notFound */
            'messages' => Specification::new()
                ->snapshot($helper->storage())
                ->messages()
                ->disableFetchJoins(),
            'helper' => $helper,
        ]);

bendavies avatar Mar 19 '25 15:03 bendavies

Ah, so the fix can be changed to use disableFetchJoins instead:

So wait, ->disableFetchJoins() + adding an index to finishedAt has the same effect?

kbond avatar Mar 19 '25 15:03 kbond

Yes, because disabling fetch joins also stops the paginator rewriting the query: https://github.com/doctrine/orm/blob/3.3.x/src/Tools/Pagination/Paginator.php#L115

bendavies avatar Mar 19 '25 15:03 bendavies

We always want to disable fetchJoinCollection in this bundle, as ProcessedMessage has no relations.

bendavies avatar Mar 19 '25 16:03 bendavies

Ah ok, cool, so to fix this, change:

https://github.com/zenstruck/messenger-monitor-bundle/blob/fd3fcc0929f0a91f2fd3edb6fe954be706874547/src/History/Storage/ORMStorage.php#L53

to

return (new EntityResult($this->queryBuilderFor($specification)))->disableFetchJoins();

and document adding the index?

kbond avatar Mar 19 '25 16:03 kbond

That won't fix history, will it?

bendavies avatar Mar 19 '25 16:03 bendavies

I think it would - at least for the query that lists messages. Wondering if the other queries (methods in ORMStorage) can be optimized?

kbond avatar Mar 19 '25 16:03 kbond

I'll need you to confirm on your giant database 😆

kbond avatar Mar 19 '25 16:03 kbond

You are right, it does fix it, and you are correct again: there are other slow queries on the history page.

[screenshot of the slow queries]

bendavies avatar Mar 19 '25 16:03 bendavies

Might need some additional indexes?

kbond avatar Mar 19 '25 16:03 kbond

SELECT COUNT(p0_.finished_at) AS sclr_0 FROM processed_messages p0_ WHERE p0_.failure_type IS NULL LIMIT 1

SELECT COUNT(p0_.finished_at) AS sclr_0 FROM processed_messages p0_ WHERE p0_.failure_type IS NOT NULL LIMIT 1

These can be sped up with an index on failure_type and by changing COUNT(p0_.finished_at) to COUNT(*).
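Concretely, something like this (the index name is just a suggestion):

```sql
-- Illustrative index name; adjust to your conventions.
CREATE INDEX idx_processed_messages_failure_type
    ON processed_messages (failure_type);

-- With COUNT(*), Postgres can satisfy these from the index alone.
SELECT COUNT(*) FROM processed_messages WHERE failure_type IS NULL;
SELECT COUNT(*) FROM processed_messages WHERE failure_type IS NOT NULL;
```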

bendavies avatar Mar 19 '25 16:03 bendavies

SELECT AVG(p0_.wait_time) AS sclr_0 FROM processed_messages p0_ LIMIT 1;
SELECT AVG(p0_.handle_time) AS sclr_0 FROM processed_messages p0_ LIMIT 1;

These cannot be sped up. A better way to do this would be to store a running average in a separate statistics table.

bendavies avatar Mar 19 '25 16:03 bendavies

SELECT
	COUNT(*) AS dctrn_count
FROM
	(
		SELECT DISTINCT
			id_15
		FROM
			(
				SELECT
					p0_.run_id AS run_id_0,
					p0_.attempt AS attempt_1,
					p0_.message_type AS message_type_2,
					p0_.description AS description_3,
					p0_.dispatched_at AS dispatched_at_4,
					p0_.received_at AS received_at_5,
					p0_.finished_at AS finished_at_6,
					p0_.wait_time AS wait_time_7,
					p0_.handle_time AS handle_time_8,
					p0_.memory_usage AS memory_usage_9,
					p0_.transport AS transport_10,
					p0_.tags AS tags_11,
					p0_.failure_type AS failure_type_12,
					p0_.failure_message AS failure_message_13,
					p0_.results AS results_14,
					p0_.id AS id_15
				FROM
					processed_messages p0_
				ORDER BY
					p0_.finished_at DESC
			) dctrn_result
	) dctrn_table;

This is generated by https://github.com/doctrine/orm/blob/3.3.x/src/Tools/Pagination/CountOutputWalker.php

Looking into this.

bendavies avatar Mar 19 '25 16:03 bendavies

that comes from here

So output walkers need to be disabled there as well so that CountOutputWalker is not used.

bendavies avatar Mar 19 '25 16:03 bendavies

these can be sped up with an index on failure_type and changing COUNT(p0_.finished_at) to COUNT(*)

I feel like there was a problem using COUNT(*) but I'll check. Does just adding an index to failure_type help at all?

so OutputWalkers need to be disabled here for CountOutputWalker to not be used.

Ok, so I'll have to add this method to zenstruck/collection, then use it in ORMStorage->filter().

These cannot be sped up. A better way to do this would be to store a running average in a separate statistics table.

Hmm, that's a hard one - don't think we'll be able to improve this in a generic way.

kbond avatar Mar 19 '25 20:03 kbond

these can be sped up with an index on failure_type and changing COUNT(p0_.finished_at) to COUNT(*)

I feel like there was a problem using COUNT(*) but I'll check. Does just adding an index to failure_type help at all?

The index helps, but COUNT(*) is twice as fast on my database.

so OutputWalkers need to be disabled here for CountOutputWalker to not be used.

Ok, so I'll have to add this method to zenstruck/collection, then use it in ORMStorage->filter().

These cannot be sped up. A better way to do this would be to store a running average in a separate statistics table.

Hmm, that's a hard one - don't think we'll be able to improve this in a generic way.

Should be pretty easy? We can store and calculate a cumulative average.

CREATE TABLE running_averages (
    metric_name VARCHAR(100) PRIMARY KEY,
    current_average NUMERIC NOT NULL,
    count INTEGER NOT NULL
);

UPDATE running_averages
SET 
    current_average = ((current_average * count) + new_value) / (count + 1),
    count = count + 1
WHERE metric_name = 'your_metric';

This approach is efficient because you don't need to store all the individual values - just the current average and count.

The math behind the update:

- The sum of all existing values = current_average × count
- When we add the new value, the new sum = (current_average × count) + new_value
- The new count = count + 1
- The new average = new sum ÷ new count
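As a quick sanity check on the arithmetic (Python, just to illustrate; not part of the bundle), feeding values one at a time through the update rule matches the batch mean:

```python
def update_running_average(current_average: float, count: int, new_value: float) -> tuple[float, int]:
    """Cumulative average update: new_avg = (avg * count + x) / (count + 1)."""
    new_count = count + 1
    new_average = (current_average * count + new_value) / new_count
    return new_average, new_count

# Incremental updates reproduce the plain average of all values.
values = [10.0, 20.0, 60.0, 30.0]
avg, n = 0.0, 0
for v in values:
    avg, n = update_running_average(avg, n, v)
assert avg == sum(values) / len(values)  # 30.0
```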

bendavies avatar Mar 19 '25 21:03 bendavies

And just thought that this could quite easily be extended to store an average per message type.

bendavies avatar Mar 19 '25 21:03 bendavies

these can be sped up with an index on failure_type and changing COUNT(p0_.finished_at) to COUNT(*)

I feel like there was a problem using COUNT(*) but I'll check. Does just adding an index to failure_type help at all?

Using COUNT(p0_.finished_at): 3,826.321 ms; it doesn't appear to use the index: https://explain.depesz.com/s/sCYX

Using COUNT(*): 282.213 ms: https://explain.depesz.com/s/LCdm

bendavies avatar Mar 19 '25 22:03 bendavies

We can store and calculate a cumulative average.

What about the "period" averages/counts?

Should be pretty easy?

I mean it's hard to make this compatible with all database platforms. We'd need to create a new entity and make sure all the queries can be made with the out-of-the-box ORM query builder.

This would also now create 2 queries for every message processed.

kbond avatar Mar 20 '25 02:03 kbond

We can store and calculate a cumulative average.

What about the "period" averages/counts?

I forgot about those. It's certainly possible. We could store averages aggregated per day. Then querying for averages over the last day/week/month/year would be easy.

(for the "last hour" period we'd still have to query the source data)
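One possible shape for that (Postgres-flavoured sketch; all names are illustrative):

```sql
-- One row per day per message type; small enough to scan cheaply.
CREATE TABLE daily_message_stats (
    day DATE NOT NULL,
    message_type VARCHAR(255) NOT NULL,
    avg_wait_time NUMERIC NOT NULL,
    avg_handle_time NUMERIC NOT NULL,
    message_count INTEGER NOT NULL,
    PRIMARY KEY (day, message_type)
);

-- A week's average is then a weighted mean over a handful of rows.
SELECT SUM(avg_handle_time * message_count) / SUM(message_count) AS avg_handle_time
FROM daily_message_stats
WHERE day >= CURRENT_DATE - INTERVAL '7 days';
```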

Should be pretty easy?

I mean it's hard to make this compatible with all database platforms. We'd need to create a new entity and make sure all the queries can be made with the out-of-the-box ORM query builder.

This would also now create 2 queries for every message processed.

Why the ORM query builder? Are you opposed to DQL/SQL queries? As long as the SQL is portable, of course.

bendavies avatar Mar 20 '25 22:03 bendavies