meilisync
meilisync copied to clipboard
error: required argument is not an integer
run meilisync start
, init sync mysql data success.
after start increment sync data from "mysql" to MeiliSearch..., crash!
What's your mysql version? Did you enable binlog as row format?
What's your mysql version? Did you enable binlog as row format?
- mysql Ver 8.0.32-0ubuntu0.22.04.2 for Linux on x86_64 ((Ubuntu))
- binlog is row format
confusing...
Thanks for your reply. Does mysql need to check other config?
No, looks like a proto error, but I never see it in my local
Thiis is the binlog detail config.
This is mine:
Ok, thanks, I will check this.
Having the same issue
2024-03-01 18:05:36.262 | INFO | meilisync.main:_:101 - Start increment sync data from "SourceType.mysql" to MeiliSearch...
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /meilisync/meilisync/main.py:140 in start │
│ │
│ 137 │ │ lock = asyncio.Lock() │
│ 138 │ │ await asyncio.gather(_(), interval()) │
│ 139 │ │
│ ❱ 140 │ asyncio.run(run()) │
│ 141 │
│ 142 │
│ 143 @app.command(help="Refresh all data by swap index") │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ _ = <function start.<locals>._ at 0x7f4cd1568a40> │ │
│ │ collection = <meilisync.event.EventCollection object at │ │
│ │ 0x7f4cd2bdf9e0> │ │
│ │ context = <click.core.Context object at 0x7f4cd2d2d7c0> │ │
│ │ current_progress = { │ │
│ │ │ 'master_log_file': 'binlog.022055', │ │
│ │ │ 'master_log_position': 839805975 │ │
│ │ } │ │
│ │ interval = <function start.<locals>.interval at 0x7f4cd1568e00> │ │
│ │ lock = <asyncio.locks.Lock object at 0x7f4cd1576090 │ │
│ │ [unlocked]> │ │
│ │ meili = <meilisync.meili.Meili object at 0x7f4cd2bdf920> │ │
│ │ meili_settings = MeiliSearch( │ │
│ │ │ api_url='http://127.0.0.1:7708', │ │
│ │ │ api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │ │ insert_size=1000, │ │
│ │ │ insert_interval=10 │ │
│ │ ) │ │
│ │ progress = <meilisync.progress.file.File object at │ │
│ │ 0x7f4cd5b16900> │ │
│ │ run = <function start.<locals>.run at 0x7f4cd156a340> │ │
│ │ settings = Settings( │ │
│ │ │ plugins=[], │ │
│ │ │ progress=Progress( │ │
│ │ │ │ type=<ProgressType.file: 'file'> │ │
│ │ │ ), │ │
│ │ │ debug=False, │ │
│ │ │ source=Source( │ │
│ │ │ │ type=<SourceType.mysql: 'mysql'>, │ │
│ │ │ │ database='inextapi', │ │
│ │ │ │ host='127.0.0.1', │ │
│ │ │ │ port=3306, │ │
│ │ │ │ user='root', │ │
│ │ │ │ password='NfvljiJOp9erghpu9HJLBIFIPURwgbpu3' │ │
│ │ │ ), │ │
│ │ │ meilisearch=MeiliSearch( │ │
│ │ │ │ api_url='http://127.0.0.1:7708', │ │
│ │ │ │ api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │ │ │ insert_size=1000, │ │
│ │ │ │ insert_interval=10 │ │
│ │ │ ), │ │
│ │ │ sync=[ │ │
│ │ │ │ Sync( │ │
│ │ │ │ │ plugins=[], │ │
│ │ │ │ │ table='macs', │ │
│ │ │ │ │ pk='id', │ │
│ │ │ │ │ full=True, │ │
│ │ │ │ │ index=None, │ │
│ │ │ │ │ fields={ │ │
│ │ │ │ │ │ 'id': None, │ │
│ │ │ │ │ │ 'mac': None, │ │
│ │ │ │ │ │ 'soc_serial': None, │ │
│ │ │ │ │ │ 'sn': None, │ │
│ │ │ │ │ │ 'last_ip': None, │ │
│ │ │ │ │ │ 'isp_id': None │ │
│ │ │ │ │ } │ │
│ │ │ │ ) │ │
│ │ │ ], │ │
│ │ │ sentry=None │ │
│ │ ) │ │
│ │ source = <meilisync.source.mysql.MySQL object at │ │
│ │ 0x7f4cd2157da0> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/asyncio/runners.py:194 in run │
│ │
│ 191 │ │ │ "asyncio.run() cannot be called from a running event loop" │
│ 192 │ │
│ 193 │ with Runner(debug=debug, loop_factory=loop_factory) as runner: │
│ ❱ 194 │ │ return runner.run(main) │
│ 195 │
│ 196 │
│ 197 def _cancel_all_tasks(loop): │
│ │
│ ╭──────────────────────────────── locals ────────────────────────────────╮ │
│ │ debug = None │ │
│ │ loop_factory = None │ │
│ │ main = <coroutine object start.<locals>.run at 0x7f4cd152f5a0> │ │
│ │ runner = <asyncio.runners.Runner object at 0x7f4cd2bdf950> │ │
│ ╰────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/asyncio/runners.py:118 in run │
│ │
│ 115 │ │ │
│ 116 │ │ self._interrupt_count = 0 │
│ 117 │ │ try: │
│ ❱ 118 │ │ │ return self._loop.run_until_complete(task) │
│ 119 │ │ except exceptions.CancelledError: │
│ 120 │ │ │ if self._interrupt_count > 0: │
│ 121 │ │ │ │ uncancel = getattr(task, "uncancel", None) │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ context = <_contextvars.Context object at 0x7f4cd1595f00> │ │
│ │ coro = <coroutine object start.<locals>.run at 0x7f4cd152f5a0> │ │
│ │ self = <asyncio.runners.Runner object at 0x7f4cd2bdf950> │ │
│ │ sigint_handler = functools.partial(<bound method Runner._on_sigint of │ │
│ │ <asyncio.runners.Runner object at 0x7f4cd2bdf950>>, │ │
│ │ main_task=<Task finished name='Task-4' │ │
│ │ coro=<start.<locals>.run() done, defined at │ │
│ │ /meilisync/meilisync/main.py:135> │ │
│ │ exception=error('required argument is not an │ │
│ │ integer')>) │ │
│ │ task = <Task finished name='Task-4' coro=<start.<locals>.run() │ │
│ │ done, defined at /meilisync/meilisync/main.py:135> │ │
│ │ exception=error('required argument is not an integer')> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/asyncio/base_events.py:685 in run_until_complete │
│ │
│ 682 │ │ if not future.done(): │
│ 683 │ │ │ raise RuntimeError('Event loop stopped before Future comp │
│ 684 │ │ │
│ ❱ 685 │ │ return future.result() │
│ 686 │ │
│ 687 │ def stop(self): │
│ 688 │ │ """Stop running the event loop. │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ future = <Task finished name='Task-4' coro=<start.<locals>.run() done, │ │
│ │ defined at /meilisync/meilisync/main.py:135> │ │
│ │ exception=error('required argument is not an integer')> │ │
│ │ new_task = False │ │
│ │ self = <_UnixSelectorEventLoop running=False closed=True │ │
│ │ debug=False> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /meilisync/meilisync/main.py:138 in run │
│ │
│ 135 │ async def run(): │
│ 136 │ │ nonlocal lock │
│ 137 │ │ lock = asyncio.Lock() │
│ ❱ 138 │ │ await asyncio.gather(_(), interval()) │
│ 139 │ │
│ 140 │ asyncio.run(run()) │
│ 141 │
│ │
│ ╭────────────────────────────── locals ───────────────────────────────╮ │
│ │ _ = <function start.<locals>._ at 0x7f4cd1568a40> │ │
│ │ interval = <function start.<locals>.interval at 0x7f4cd1568e00> │ │
│ │ lock = <asyncio.locks.Lock object at 0x7f4cd1576090 [unlocked]> │ │
│ ╰─────────────────────────────────────────────────────────────────────╯ │
│ │
│ /meilisync/meilisync/main.py:102 in _ │
│ │
│ 99 │ │ │ │ │ │ f'No data found for table "{settings.source.da │
│ 100 │ │ │ │ │ ) │
│ 101 │ │ logger.info(f'Start increment sync data from "{settings.source │
│ ❱ 102 │ │ async for event in source: │
│ 103 │ │ │ if settings.debug: │
│ 104 │ │ │ │ logger.debug(event) │
│ 105 │ │ │ current_progress = event.progress │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ collection = <meilisync.event.EventCollection object at │ │
│ │ 0x7f4cd2bdf9e0> │ │
│ │ current_progress = { │ │
│ │ │ 'master_log_file': 'binlog.022055', │ │
│ │ │ 'master_log_position': 839805975 │ │
│ │ } │ │
│ │ event = ProgressEvent( │ │
│ │ │ progress={ │ │
│ │ │ │ 'master_log_file': 'binlog.022055', │ │
│ │ │ │ 'master_log_position': 839805975 │ │
│ │ │ } │ │
│ │ ) │ │
│ │ lock = <asyncio.locks.Lock object at 0x7f4cd1576090 │ │
│ │ [unlocked]> │ │
│ │ meili = <meilisync.meili.Meili object at 0x7f4cd2bdf920> │ │
│ │ meili_settings = MeiliSearch( │ │
│ │ │ api_url='http://127.0.0.1:7708', │ │
│ │ │ api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │ │ insert_size=1000, │ │
│ │ │ insert_interval=10 │ │
│ │ ) │ │
│ │ progress = <meilisync.progress.file.File object at │ │
│ │ 0x7f4cd5b16900> │ │
│ │ settings = Settings( │ │
│ │ │ plugins=[], │ │
│ │ │ progress=Progress( │ │
│ │ │ │ type=<ProgressType.file: 'file'> │ │
│ │ │ ), │ │
│ │ │ debug=False, │ │
│ │ │ source=Source( │ │
│ │ │ │ type=<SourceType.mysql: 'mysql'>, │ │
│ │ │ │ database='inextapi', │ │
│ │ │ │ host='127.0.0.1', │ │
│ │ │ │ port=3306, │ │
│ │ │ │ user='root', │ │
│ │ │ │ password='NfvljiJOp9erghpu9HJLBIFIPURwgbpu3' │ │
│ │ │ ), │ │
│ │ │ meilisearch=MeiliSearch( │ │
│ │ │ │ api_url='http://127.0.0.1:7708', │ │
│ │ │ │ api_key='UYo8!74f3I@UYG87wf8uiy23o*uygffweg', │ │
│ │ │ │ insert_size=1000, │ │
│ │ │ │ insert_interval=10 │ │
│ │ │ ), │ │
│ │ │ sync=[ │ │
│ │ │ │ Sync( │ │
│ │ │ │ │ plugins=[], │ │
│ │ │ │ │ table='macs', │ │
│ │ │ │ │ pk='id', │ │
│ │ │ │ │ full=True, │ │
│ │ │ │ │ index=None, │ │
│ │ │ │ │ fields={ │ │
│ │ │ │ │ │ 'id': None, │ │
│ │ │ │ │ │ 'mac': None, │ │
│ │ │ │ │ │ 'soc_serial': None, │ │
│ │ │ │ │ │ 'sn': None, │ │
│ │ │ │ │ │ 'last_ip': None, │ │
│ │ │ │ │ │ 'isp_id': None │ │
│ │ │ │ │ } │ │
│ │ │ │ ) │ │
│ │ │ ], │ │
│ │ │ sentry=None │ │
│ │ ) │ │
│ │ source = <meilisync.source.mysql.MySQL object at │ │
│ │ 0x7f4cd2157da0> │ │
│ │ sync = Sync( │ │
│ │ │ plugins=[], │ │
│ │ │ table='macs', │ │
│ │ │ pk='id', │ │
│ │ │ full=True, │ │
│ │ │ index=None, │ │
│ │ │ fields={ │ │
│ │ │ │ 'id': None, │ │
│ │ │ │ 'mac': None, │ │
│ │ │ │ 'soc_serial': None, │ │
│ │ │ │ 'sn': None, │ │
│ │ │ │ 'last_ip': None, │ │
│ │ │ │ 'isp_id': None │ │
│ │ │ } │ │
│ │ ) │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /meilisync/meilisync/source/mysql.py:113 in __aiter__ │
│ │
│ 110 │ │ │ │ │ │ data = event.rows[0]["values"] │
│ 111 │ │ │ │ │ elif isinstance(event, UpdateRowsEvent): │
│ 112 │ │ │ │ │ │ event_type = EventType.update │
│ ❱ 113 │ │ │ │ │ │ data = event.rows[0]["after_values"] │
│ 114 │ │ │ │ │ elif isinstance(event, DeleteRowsEvent): │
│ 115 │ │ │ │ │ │ event_type = EventType.delete │
│ 116 │ │ │ │ │ │ data = event.rows[0]["values"] │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ event = <asyncmy.replication.row_events.UpdateRowsEvent object at │ │
│ │ 0x7f4cd115e0c0> │ │
│ │ event_type = <EventType.update: 'update'> │ │
│ │ self = <meilisync.source.mysql.MySQL object at 0x7f4cd2157da0> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:45 │
│ 3 in rows │
│ │
│ 450 │ @property │
│ 451 │ def rows(self): │
│ 452 │ │ if self._rows is None: │
│ ❱ 453 │ │ │ self._fetch_rows() │
│ 454 │ │ return self._rows │
│ 455 │
│ 456 │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at │ │
│ │ 0x7f4cd115e0c0> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:44 │
│ 8 in _fetch_rows │
│ │
│ 445 │ │ │ return │
│ 446 │ │ │
│ 447 │ │ while self.packet.read_bytes < self.event_size: │
│ ❱ 448 │ │ │ self._rows.append(self._fetch_one_row()) │
│ 449 │ │
│ 450 │ @property │
│ 451 │ def rows(self): │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at │ │
│ │ 0x7f4cd115e0c0> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:51 │
│ 4 in _fetch_one_row │
│ │
│ 511 │ │
│ 512 │ def _fetch_one_row(self): │
│ 513 │ │ row = { │
│ ❱ 514 │ │ │ "before_values": self._read_column_data(self.columns_prese │
│ 515 │ │ │ "after_values": self._read_column_data(self.columns_presen │
│ 516 │ │ } │
│ 517 │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent object at │ │
│ │ 0x7f4cd115e0c0> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/row_events.py:20 │
│ 9 in _read_column_data │
│ │
│ 206 │ │ │ elif column.type == GEOMETRY: │
│ 207 │ │ │ │ values[name] = self.packet.read_length_coded_pascal_st │
│ 208 │ │ │ elif column.type == JSON: │
│ ❱ 209 │ │ │ │ values[name] = self.packet.read_binary_json(column.len │
│ 210 │ │ │ else: │
│ 211 │ │ │ │ raise NotImplementedError("Unknown MySQL column type: │
│ 212 │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ cols_bitmap = b'\xff\xff\xff' │ │
│ │ column = <asyncmy.replication.column.Column object at │ │
│ │ 0x7f4cd115dfa0> │ │
│ │ i = 20 │ │
│ │ name = 'status' │ │
│ │ nb_columns = 24 │ │
│ │ null_bitmap = b'\x04\x00 ' │ │
│ │ null_bitmap_index = 20 │ │
│ │ self = <asyncmy.replication.row_events.UpdateRowsEvent │ │
│ │ object at 0x7f4cd115e0c0> │ │
│ │ unsigned = False │ │
│ │ values = { │ │
│ │ │ 'id': 402723, │ │
│ │ │ 'mac': 'FC:A4:7A:AC:46:64', │ │
│ │ │ 'old_mac': None, │ │
│ │ │ 'sn': 'ITV521801479', │ │
│ │ │ 'soc_serial': 'ac00141568840761b0e', │ │
│ │ │ 'push_client_id': │ │
│ │ 'F5708619141EEBAC0B0AE0C5D70433BF', │ │
│ │ │ 'device_id': 300, │ │
│ │ │ 'isp_id': 59, │ │
│ │ │ 'isp_net_locked': 0, │ │
│ │ │ 'blocked': 0, │ │
│ │ │ ... +10 │ │
│ │ } │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:344 │
│ in read_binary_json │
│ │
│ 341 │ │ self.unread(payload) │
│ 342 │ │ t = self.read_uint8() │
│ 343 │ │ │
│ ❱ 344 │ │ return self.read_binary_json_type(t, length) │
│ 345 │ │
│ 346 │ def read_binary_json_type(self, t, length): │
│ 347 │ │ large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ length = 13008 │ │
│ │ payload = b'\x00\x02\x00\xcf2\x12\x00\x06\x00\x18\x00\x06\x00\x00\x1e\x… │ │
│ │ \x00\x03\x00#\x00\x03\x00&\x00\x03\x00)\x00\x03\x00\x0c,\x00\… │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ │ size = 4 │ │
│ │ t = 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349 │
│ in read_binary_json_type │
│ │
│ 346 │ def read_binary_json_type(self, t, length): │
│ 347 │ │ large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│ 348 │ │ if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): │
│ ❱ 349 │ │ │ return self.read_binary_json_object(length - 1, large) │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 13008 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ │ t = 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439 │
│ in read_binary_json_object │
│ │
│ 436 │ │ │ │ data = value_type_inlined_lengths[i][2] │
│ 437 │ │ │ else: │
│ 438 │ │ │ │ t = value_type_inlined_lengths[i][0] │
│ ❱ 439 │ │ │ │ data = self.read_binary_json_type(t, length) │
│ 440 │ │ │ out[keys[i]] = data │
│ 441 │ │ │
│ 442 │ │ return out │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ elements = 2 │ │
│ │ i = 0 │ │
│ │ key_offset_lengths = [(18, 6), (24, 6)] │ │
│ │ keys = [b'health', b'status'] │ │
│ │ large = False │ │
│ │ length = 13007 │ │
│ │ out = {} │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket │ │
│ │ object at 0x7f4cd115e060> │ │
│ │ size = 13007 │ │
│ │ t = 0 │ │
│ │ value_type_inlined_lengths = [(0, 30, None), (0, 120, None)] │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349 │
│ in read_binary_json_type │
│ │
│ 346 │ def read_binary_json_type(self, t, length): │
│ 347 │ │ large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│ 348 │ │ if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): │
│ ❱ 349 │ │ │ return self.read_binary_json_object(length - 1, large) │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 13007 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ │ t = 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439 │
│ in read_binary_json_object │
│ │
│ 436 │ │ │ │ data = value_type_inlined_lengths[i][2] │
│ 437 │ │ │ else: │
│ 438 │ │ │ │ t = value_type_inlined_lengths[i][0] │
│ ❱ 439 │ │ │ │ data = self.read_binary_json_type(t, length) │
│ 440 │ │ │ out[keys[i]] = data │
│ 441 │ │ │
│ 442 │ │ return out │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ elements = 1 │ │
│ │ i = 0 │ │
│ │ key_offset_lengths = [(11, 11)] │ │
│ │ keys = [b'temperature'] │ │
│ │ large = False │ │
│ │ length = 13006 │ │
│ │ out = {} │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket │ │
│ │ object at 0x7f4cd115e060> │ │
│ │ size = 90 │ │
│ │ t = 0 │ │
│ │ value_type_inlined_lengths = [(0, 22, None)] │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:349 │
│ in read_binary_json_type │
│ │
│ 346 │ def read_binary_json_type(self, t, length): │
│ 347 │ │ large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY) │
│ 348 │ │ if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): │
│ ❱ 349 │ │ │ return self.read_binary_json_object(length - 1, large) │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 13006 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ │ t = 0 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:439 │
│ in read_binary_json_object │
│ │
│ 436 │ │ │ │ data = value_type_inlined_lengths[i][2] │
│ 437 │ │ │ else: │
│ 438 │ │ │ │ t = value_type_inlined_lengths[i][0] │
│ ❱ 439 │ │ │ │ data = self.read_binary_json_type(t, length) │
│ 440 │ │ │ out[keys[i]] = data │
│ 441 │ │ │
│ 442 │ │ return out │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ elements = 4 │ │
│ │ i = 0 │ │
│ │ key_offset_lengths = [(32, 3), (35, 3), (38, 3), (41, 3)] │ │
│ │ keys = [b'cpu', b'ddr', b'gpu', b'vee'] │ │
│ │ large = False │ │
│ │ length = 13005 │ │
│ │ out = {} │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket │ │
│ │ object at 0x7f4cd115e060> │ │
│ │ size = 68 │ │
│ │ t = 12 │ │
│ │ value_type_inlined_lengths = [ │ │
│ │ │ (12, 44, None), │ │
│ │ │ (12, 50, None), │ │
│ │ │ (12, 56, None), │ │
│ │ │ (12, 62, None) │ │
│ │ ] │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:353 │
│ in read_binary_json_type │
│ │
│ 350 │ │ elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY): │
│ 351 │ │ │ return self.read_binary_json_array(length - 1, large) │
│ 352 │ │ elif t in (JSONB_TYPE_STRING,): │
│ ❱ 353 │ │ │ return self.read_variable_length_string() │
│ 354 │ │ elif t in (JSONB_TYPE_LITERAL,): │
│ 355 │ │ │ value = self.read_uint8() │
│ 356 │ │ │ if value == JSONB_LITERAL_NULL: │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ large = False │ │
│ │ length = 13005 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ │ t = 12 │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.12/site-packages/asyncmy/replication/packets.py:254 │
│ in read_variable_length_string │
│ │
│ 251 │ │ length = 0 │
│ 252 │ │ bits_read = 0 │
│ 253 │ │ while byte & 0x80 != 0: │
│ ❱ 254 │ │ │ byte = struct.pack("!B", self.read(1)) │
│ 255 │ │ │ length = length | ((byte & 0x7F) << bits_read) │
│ 256 │ │ │ bits_read = bits_read + 7 │
│ 257 │ │ return self.read(length) │
│ │
│ ╭───────────────────────────────── locals ─────────────────────────────────╮ │
│ │ bits_read = 0 │ │
│ │ byte = 128 │ │
│ │ length = 0 │ │
│ │ self = <asyncmy.replication.packets.BinLogPacket object at │ │
│ │ 0x7f4cd115e060> │ │
│ ╰──────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────╯
`SHOW VARIABLES LIKE "%bin%";`
──────────────────────────────────────────────────────────────────────────────
| bind_address | * |
| binlog_cache_size | 32768 |
| binlog_checksum | CRC32 |
| binlog_direct_non_transactional_updates | OFF |
| binlog_encryption | OFF |
| binlog_error_action | ABORT_SERVER |
| binlog_expire_logs_auto_purge | ON |
| binlog_expire_logs_seconds | 86400 |
| binlog_format | ROW |
| binlog_group_commit_sync_delay | 0 |
| binlog_group_commit_sync_no_delay_count | 0 |
| binlog_gtid_simple_recovery | ON |
| binlog_max_flush_queue_time | 0 |
| binlog_order_commits | ON |
| binlog_rotate_encryption_master_key_at_startup | OFF |
| binlog_row_event_max_size | 8192 |
| binlog_row_image | FULL |
| binlog_row_metadata | MINIMAL |
| binlog_row_value_options | |
| binlog_rows_query_log_events | OFF |
| binlog_stmt_cache_size | 32768 |
| binlog_transaction_compression | OFF |
| binlog_transaction_compression_level_zstd | 3 |
| binlog_transaction_dependency_history_size | 25000 |
| binlog_transaction_dependency_tracking | WRITESET |
| innodb_api_enable_binlog | OFF |
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/binlog |
| log_bin_index | /var/lib/mysql/binlog.index |
| log_bin_trust_function_creators | OFF |
| log_statements_unsafe_for_binlog | ON |
| max_binlog_cache_size | 18446744073709547520 |
| max_binlog_size | 1073741824 |
| max_binlog_stmt_cache_size | 18446744073709547520 |
| mysqlx_bind_address | * |
| sql_log_bin | ON |
| sync_binlog | 1 |
──────────────────────────────────────────────────────────────────────────────
the reason should be because some column of the database has JSON format data. below is what my data generate, this should let you easy to identify the problem
import pandas as pd from faker import Faker import random import json from sqlalchemy import create_engine, Column, Integer, String, DateTime, JSON, Text from sqlalchemy.orm import sessionmaker, declarative_base import time
db_user = 'root' db_password = 'password' db_host = 'localhost' db_name = 'database'
engine = create_engine(f'mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}')
Base = declarative_base()
class FakeData(Base): tablename = 'conversation'
id = Column(Integer, primary_key=True, autoincrement=True)
participant = Column(JSON)
date = Column(DateTime)
type = Column(String(50))
application = Column(String(50))
content = Column(Text)
date_timestamp = Column(Integer)
attachment = Column(String(255))
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
fake = Faker() fake_zh = Faker('zh_CN')
def generate_fake_data(num_rows):
fake_data = []
phone_roles = ['Caller', 'Callee']
email_roles = ['Sender', 'Receiver']
emails = [fake.email() for _ in range(1000)]
phone_numbers = [fake.phone_number() for _ in range(300)]
for num in range(num_rows):
application = random.choice(['reutersFST', 'reuters', 'bloomberg', 'exchange', 'deskphone'])
if application in ['reutersFST', 'reuters']:
record_type = 'chat'
elif application == 'bloomberg':
record_type = random.choice(['chat', 'email'])
elif application == 'exchange':
record_type = 'email'
elif application == 'deskphone':
record_type = 'phone'
if record_type == 'chat':
participant = [{'role': 'participant', 'email': random.choice(emails)} for _ in range(random.randint(1, 6))]
content = f'{fake.date_time()}\n {fake_zh.paragraph()}\n# {fake.sentence()} {fake.paragraph()}'
elif record_type == 'phone':
participant = [{'role': random.choice(phone_roles), 'email': random.choice(phone_numbers)}]
content = f'{fake.paragraph()}\n{fake_zh.paragraph()}{fake.paragraph()}\n{fake_zh.paragraph()}{fake.paragraph()}\n{fake_zh.paragraph()}'
elif record_type == 'email':
sender = {'role': 'Sender', 'email': random.choice(emails)}
receivers = [{'role': 'Receiver', 'email': random.choice(emails)} for _ in range(random.randint(1, 5))]
participant = [sender] + receivers
content = f'Receive Time: {fake.date_time()}\nSender Time: {fake.date_time()}\nReceive Address: {fake.address()}\nSend Address: {fake.address()}\nSubject: {fake.sentence()}{fake_zh.sentence()}\nBody: {fake.paragraph()}{fake_zh.paragraph()}\nAttachment: {fake.file_path()}'
date = fake.date_time()
date_timestamp = int(time.mktime(date.timetuple()))
fake_row = {
'participant': json.dumps(participant),
'date': date,
'type': record_type,
'application': application,
'content': content,
'date_timestamp': date_timestamp,
'attachment': f'/home/spdblon/{fake.word()}/{fake.date()}/file{random.randint(1, 100)}',
}
fake_data.append(fake_row)
return fake_data
def insert_fake_data(num_rows, batch_size=10000): Session = sessionmaker(bind=engine) session = Session()
for i in range(0, num_rows, batch_size):
batch_data = generate_fake_data(batch_size)
session.bulk_insert_mappings(FakeData, batch_data)
session.commit()
print(f'Inserted {i + batch_size} records')
insert_fake_data(10000)