es query incorrectly using the pg `primary_key` on update operations [elasticsearch.exceptions.RequestError: RequestError]
PGSync version: 2.4.0
Postgres version: debezium/postgres:15
Elasticsearch version: 7.17.7
Redis version: 7.0
Python version: 3.7
Problem Description:
es query incorrectly using the pg primary_key in the search doc on update operations, even though the primary_key is not referenced in the postgres update command.
Here's a rough skeleton of the schema
{
"database": "postgres",
"index": "events",
"nodes": {
"table": "Events",
"schema": "prd_profile",
"columns": [
"ID",
"State",
"Type",
"Title"
],
"transform":{
"mapping": {
"eventid":{"type":"keyword"},
"State":{"type":"integer"},
"Type":{"type":"integer"},
"Title":{
"type": "search_as_you_type",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
} },
"children": [
{
"table": "Organizations",
"schema": "prd_profile",
"columns": [
"ID",
"ProfileID",
"TwitterID"
],
"transform":{
"rename":{
"ID": "org_id",
"ProfileID": "org_profileid",
"TwitterID": "org_twitterid"},
"mapping": {
"orgid":{"type":"keyword"},
"org_profileid":{"type":"keyword"},
"org_twitterid":{"type":"keyword"} }}
,
"relationship": {
"variant": "object",
"type": "one_to_one",
"foreign_key": {
"parent": ["OrgID"],
"child": ["ID"]
}
},
"children": [
{"table": "OrgProfiles",
"schema": "prd_profile",
"columns": [
"DisplayName",
"ProfileID"
],
"transform":{
"mapping": {
"Sector":{"type":"keyword"},
"ProfileID":{"type":"keyword"},
"ProfilePicture":{"type":"text"}
}} ,
"relationship": {
"variant": "object",
"type": "one_to_one",
"foreign_key": {
"parent": ["ProfileID"],
"child": ["ProfileID"]
}
}}
]
}
]
}
}
When running this update query:
update prd_profile."OrgProfiles" set "DisplayName" = 'test' where "ProfileID" = '5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c';
elasticsearch dsl query generated tries to map the ProfileID value (which is not the PK) to the DBID value (the PK)
2022-12-16 04:16:22.607:DEBUG:elasticsearch: > {"query":{"bool":{"filter":[{"bool":{"should":[{"terms":{"_meta.Organizations.DBID":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}},{"terms":{"_meta.Organizations.DBID.keyword":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}}]}}]}},"_source":{"excludes":["*"]},"sort":"_doc"}
Error Message (if any):
2022-12-16 04:16:22.607:DEBUG:elasticsearch: > {"query":{"bool":{"filter":[{"bool":{"should":[{"terms":{"_meta.Organizations.DBID":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}},{"terms":{"_meta.Organizations.DBID.keyword":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}}]}}]}},"_source":{"excludes":["*"]},"sort":"_doc"}
2022-12-16 04:16:22.676:ERROR:pgsync.elastichelper: Exception RequestError(400, 'search_phase_execution_exception', 'failed to create query: For input string: "5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 141, in bulk
raise_on_error=raise_on_error,
File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 196, in _bulk
ignore_status=ignore_status,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 484, in parallel_bulk
actions, chunk_size, max_chunk_bytes, client.transport.serializer
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 748, in next
raise value
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 140, in _helper_reraises_exception
raise ex
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 292, in _guarded_task_generation
for i, x in enumerate(iterable):
File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
for action, data in actions:
File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 805, in _payloads
payloads,
File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 609, in _update_op
fields,
File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 232, in _search
for hit in search.scan():
File "/usr/local/lib/python3.7/site-packages/elasticsearch_dsl/search.py", line 731, in scan
for hit in scan(es, query=self.to_dict(), index=self._index, **self._params):
File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 555, in scan
body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1675, in search
body=body,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 415, in perform_request
raise e
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 388, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'search_phase_execution_exception', 'failed to create query: For input string: "5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"')
I think this was reported in #352 and fixed in this commit 3fe7ffeb
Can you please try the main branch and confirm if this resolved the issue?