Error syncing up when materialized view record no longer exists or removed

Open camilomontoyau opened this issue 3 years ago • 2 comments

PGSync version: 2.3.2

Postgres version: 13.2

Elasticsearch version: 7.17.4

Redis version: 7.0.4

Python version: 3.10.6

Problem Description: I'm mapping only a materialized view. It has a unique index but, as far as I know, materialized views can't have primary keys. The error occurs when the materialized view is refreshed and one or more of its records have been removed. It looks like pgsync only checks for primary keys and not for unique keys, so it throws an error when trying to remove the deleted record from Elasticsearch.

[Screenshot: Screen Shot 2022-09-01 at 9 52 13 AM]

Error Message (if any):

2022-09-01 15:35:01.878:ERROR:pgsync.elastichelper: Exception 'id'
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/pgsync/elastichelper.py", line 130, in bulk
    self._bulk(
  File "/usr/local/lib/python3.10/site-packages/pgsync/elastichelper.py", line 187, in _bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.10/site-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 873, in next
    raise value
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 391, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.10/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 841, in _payloads
    filters = self._delete_op(
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 666, in _delete_op
    self.root_primary_values: list = [
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 667, in <listcomp>
    payload_data[key] for key in self.root.model.primary_keys
KeyError: 'id'
 0:00:00.233008 (0.23 sec)
Traceback (most recent call last):
  File "/usr/local/bin/pgsync", line 7, in <module>
    sync.main()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 1416, in main
    sync.pull()
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 1221, in pull
    self.logical_slot_changes(txmin=txmin, txmax=txmax)
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 435, in logical_slot_changes
    self.es.bulk(self.index, self._payloads(payloads))
  File "/usr/local/lib/python3.10/site-packages/pgsync/elastichelper.py", line 130, in bulk
    self._bulk(
  File "/usr/local/lib/python3.10/site-packages/pgsync/elastichelper.py", line 187, in _bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.10/site-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 873, in next
    raise value
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.10/multiprocessing/pool.py", line 391, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.10/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 841, in _payloads
    filters = self._delete_op(
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 666, in _delete_op
    self.root_primary_values: list = [
  File "/usr/local/lib/python3.10/site-packages/pgsync/sync.py", line 667, in <listcomp>
    payload_data[key] for key in self.root.model.primary_keys
KeyError: 'id'
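
The last frames of the traceback show the failing pattern: `_delete_op` indexes `payload_data` with every name in `self.root.model.primary_keys`, and the payload has no `id` key. Below is a minimal sketch of that pattern in isolation, together with one possible guard that falls back to unique-index columns. `extract_key_values` and `unique_keys` are hypothetical names used only for illustration; they are not part of pgsync, and this is not a claim about how pgsync should fix it.

```python
from typing import Any, Sequence


def extract_key_values(
    payload_data: dict[str, Any],
    primary_keys: Sequence[str],
    unique_keys: Sequence[str] = (),
) -> list[Any]:
    """Return the identifying values of a payload row (hypothetical helper)."""
    # Try the primary key columns first, then the assumed unique-index columns.
    for keys in (primary_keys, unique_keys):
        if keys and all(key in payload_data for key in keys):
            return [payload_data[key] for key in keys]
    # Neither key set is fully present: fail with a descriptive message.
    raise KeyError(
        f"payload has none of the key sets {list(primary_keys)} or "
        f"{list(unique_keys)}; columns present: {sorted(payload_data)}"
    )


# The pattern from the traceback, reproduced without pgsync:
payload_data = {"category_id": 7}  # assumed payload with no "id" column
try:
    [payload_data[key] for key in ["id"]]
except KeyError as exc:
    print(f"KeyError: {exc}")
```

With a guard like this, a payload that lacks the primary key columns but carries the unique-index columns would still yield identifying values, and a payload with neither would fail with a message that names the columns it did contain.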

camilomontoyau avatar Sep 01 '22 16:09 camilomontoyau

So, I have a similar problem, but I am on the main branch of the project, where the code you shared in the screenshot seems to have changed. For me, removing an entry from the materialized view does not raise an error; the removal is simply not propagated to Elasticsearch, so I can still get the removed record as the result of an Elasticsearch query.

You could try your code on the main branch by installing pgsync the following way and see if you get the same results.

pip install git+https://github.com/toluaina/pgsync

More details

Here are some more details on the issue I encountered:

My postgres base table could be simplified to the following:

  • id
  • title
  • category_id
  • verified_at (null by default, a date when set)
  • deleted_at (null by default, a date when set)

Then, I have a materialized view which only returns the latest verified (but not deleted) entry for each category. Additionally, there is a unique index on the id column of the materialized view.

Query for the materialized view
  CREATE MATERIALIZED VIEW view_name AS
  SELECT
      DISTINCT ON (
          category_id
      ) *
  FROM
      base_table
  WHERE
      verified_at IS NOT NULL
      AND deleted_at IS NULL
  ORDER BY
      category_id,
      verified_at DESC;

Here the distinct on (category_id) ensures that only one value for each category is returned and the sort makes sure the latest verified object is in the materialized view.
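
To make the selection rule concrete, here is a small sketch that emulates the view's logic on in-memory rows: filter to verified, non-deleted rows, then keep the row with the newest verified_at per category_id. The function name `latest_verified_per_category` and the sample rows are made up for illustration. If two rows in a category share the same verified_at, this sketch keeps the first one it sees, whereas Postgres makes no promise about which one DISTINCT ON picks.

```python
from datetime import date


def latest_verified_per_category(rows: list[dict]) -> list[dict]:
    """Emulate the view: one row per category_id, the newest verified one."""
    latest: dict[int, dict] = {}
    for row in rows:
        # WHERE verified_at IS NOT NULL AND deleted_at IS NULL
        if row["verified_at"] is None or row["deleted_at"] is not None:
            continue
        current = latest.get(row["category_id"])
        # ORDER BY category_id, verified_at DESC: keep the newest per category.
        if current is None or row["verified_at"] > current["verified_at"]:
            latest[row["category_id"]] = row
    return [latest[category_id] for category_id in sorted(latest)]


# Hypothetical sample data mirroring the simplified base table.
SAMPLE_ROWS = [
    {"id": 1, "title": "older", "category_id": 10,
     "verified_at": date(2022, 1, 1), "deleted_at": None},
    {"id": 2, "title": "newer", "category_id": 10,
     "verified_at": date(2022, 6, 1), "deleted_at": None},
    {"id": 3, "title": "unverified", "category_id": 20,
     "verified_at": None, "deleted_at": None},
]

view_rows = latest_verified_per_category(SAMPLE_ROWS)
```

Setting deleted_at on row 2 and calling the function again would return row 1 instead, which is the case where one row leaves the view and another enters it.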

Now, when an entry in the base table is updated so that its deleted_at property is set to a date, and the materialized view is subsequently updated, the entry is no longer in the materialized view, as expected. However, pgsync does not propagate this change to Elasticsearch, and the entry is still available there.

Now, in contrast, when I only update the title field of an entry in the base table and the materialized view, this change will be passed through to elasticsearch.

Also, when I actually delete an entry from the base table and subsequently from the materialized view, this entry will also be deleted from Elasticsearch.

So it seems there is only a problem when an update on the base table results in a deletion from the materialized view.

Further issue

When the update also causes another entry to be removed from the materialized view (in the example, because it is an older entry for the category and should therefore be ignored from now on), this is not synced either. The same goes for an entry being inserted into the materialized view after another row was removed (the newer entry for the category was deleted, or its deleted_at column was set, so the older entry should be shown again). I could only test this to a limited extent, though, since the entry did not get removed in the first place.
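
One way to state what a complete sync would need to emit in these cases is to compare the view's rows before and after the update, keyed by the unique id column. The sketch below does that with plain dictionaries. It is not how pgsync works internally, and `diff_view_snapshots` is a hypothetical name chosen for this example.

```python
def diff_view_snapshots(
    before: dict[int, dict], after: dict[int, dict]
) -> dict[str, list[int]]:
    """Compare two snapshots of the view, keyed by the unique id column."""
    before_ids, after_ids = before.keys(), after.keys()
    return {
        # Rows that left the view: these documents should be deleted.
        "delete": sorted(before_ids - after_ids),
        # Rows that entered the view: these documents should be indexed.
        "index": sorted(after_ids - before_ids),
        # Rows present in both snapshots whose content changed.
        "update": sorted(
            row_id
            for row_id in before_ids & after_ids
            if before[row_id] != after[row_id]
        ),
    }


# Hypothetical snapshots: entry 2 gets deleted_at set, so entry 1 reappears.
before = {2: {"title": "newer", "category_id": 10}}
after = {1: {"title": "older", "category_id": 10}}
changes = diff_view_snapshots(before, after)
```

In the scenario described above, the result lists id 2 under "delete" and id 1 under "index", which are the two changes that were observed not to reach Elasticsearch.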

However, I am not sure whether this behaviour is supposed to be covered by pgsync or falls within the scope of this issue. If it is out of scope here, I would be open to creating a separate issue for it.

niklhut avatar Sep 12 '22 16:09 niklhut

So you are suggesting that you deleted a row and it was not deleted in Elasticsearch? Are you able to put together a working example? Are you manually changing the materialized view?

toluaina avatar Oct 05 '22 20:10 toluaina