piker
piker copied to clipboard
`marketstore`: gRPC snappy compression crash?
Been seeing this a lot lately on our std tdsb queries:
Jan 13 17:00:40 (brokerd.ib[b5900c], 5413, piker.data.feed.tsdb_backfill.<locals>.back_load_from_tsdb)) [ERROR] piker.data.marketstore marketstore.py:459 Unknown mkts QUERY error: Params(tbk=mnq.cme.20230317.ib/1Sec/OHLCV, start=None, end=2023-01-13T20:04:08+00:00, limit=800000, limit_from_start=None, columns=None)
('RPC failed with status UNKNOWN (2): snappy: corrupt input',)
Traceback (most recent call last):
File "/home/goodboy/repos/piker/piker/data/marketstore.py", line 456, in read_ohlcv
result = await client.query(params)
File "/home/goodboy/.virtualenvs/piker310/src/anyio-marketstore/src/anyio_marketstore/__init__.py", line 168, in query
reply = await self.stub.Query(reqs)
File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/purerpc/wrappers.py", line 93, in __call__
return await extract_message_from_singleton_stream(stream)
File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/purerpc/wrappers.py", line 17, in extract_message_from_singleton_stream
raise_status(event.status)
File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/purerpc/grpclib/exceptions.py", line 120, in raise_status
raise UnknownError(status.status_message)
purerpc.grpclib.exceptions.UnknownError: RPC failed with status UNKNOWN (2): snappy: corrupt input
It unfortunately prevents the prior history from being loaded..
Finally found where it's probably from...
a golang compression lib:
- repo: https://github.com/klauspost/compress
- the error msg def: https://github.com/klauspost/compress/blob/290f4cfacb3eff892555a491e3eeb569a48665e7/zstd/snappy.go#L54
- all the probably spots this might be raises :joy:
- https://github.com/klauspost/compress/blob/290f4cfacb3eff892555a491e3eeb569a48665e7/zstd/snappy.go#L130
~No idea what's up, and we should probably report it upstream, but maybe this is incentive to toy with a new tsdb already 😉 ~
MEGA LOLZ / updatez
After fighting with this again for 2 hours today, i've more or less found the likely source of the issue:
-
marketstore's grpc server by default uses so calledsnappy"variable compression" which you can disable with something like this in the yaml config:disable_variable_compression: true -
without the ^ only certain symbols will fail to load large queries (on the range of our limit (800k datums), no clue yet what the commonality is, but maybe something to do with parsing in wtv
gRPCrelatedsnappycompression lib the golang server side code is using.. -
disabling this feature with the above config entry will further result in the following RPC error (on an attempt to load the full history after a tsdb wipe and full
pikerdrestart):Unknown mkts QUERY error: Params(tbk=dogeusdt.binance/1Min/OHLCV, start=None, end=2023-04-18T03:20:00+00:00, limit=800000, limit_from_start=None, columns=None) ('RPC failed with status UNKNOWN (2): EOF',) Traceback (most recent call last): File "/home/goodboy/repos/piker/piker/service/marketstore.py", line 475, in read_ohlcv result = await client.query(params) File "/home/goodboy/repos/anyio-marketstore/src/anyio_marketstore/__init__.py", line 168, in query reply = await self.stub.Query(reqs) File "/home/goodboy/repos/purerpc/src/purerpc/wrappers.py", line 91, in __call__ return await extract_message_from_singleton_stream(stream) File "/home/goodboy/repos/purerpc/src/purerpc/wrappers.py", line 16, in extract_message_from_singleton_stream raise_status(event.status) File "/home/goodboy/repos/purerpc/src/purerpc/grpclib/exceptions.py", line 120, in raise_status raise UnknownError(status.status_message) purerpc.grpclib.exceptions.UnknownError: RPC failed with status UNKNOWN (2): EOF -
trying to use any form of
snappycompression from ouranyio-marketstoreclient (by setting it manually insidepurerpc) won't work either and instead results in wire-protocol errors from ouranyio-marketstoreclient :joy:.Traceback (most recent call last): File "/home/goodboy/repos/tractor/tractor/_runtime.py", line 212, in _invoke res = await coro File "/home/goodboy/repos/piker/piker/data/feed.py", line 481, in open_feed_bus await bus.nursery.start( File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 1085, in start async with open_nursery() as old_nursery: File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 850, in __aexit__ raise combined_error_from_nursery File "/home/goodboy/repos/piker/piker/data/feed.py", line 336, in allocate_persistent_feed ) = await bus.nursery.start( File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 1085, in start async with open_nursery() as old_nursery: File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 850, in __aexit__ raise combined_error_from_nursery File "/home/goodboy/repos/piker/piker/data/history.py", line 714, in manage_history await bus.nursery.start( File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 1085, in start async with open_nursery() as old_nursery: File "/home/goodboy/.virtualenvs/piker310/lib/python3.10/site-packages/trio/_core/_run.py", line 850, in __aexit__ raise combined_error_from_nursery File "/home/goodboy/repos/piker/piker/data/history.py", line 401, in tsdb_backfill tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( File "/home/goodboy/repos/piker/piker/service/marketstore.py", line 421, in load hist = await self.read_ohlcv( File "/home/goodboy/repos/piker/piker/service/marketstore.py", line 453, in read_ohlcv syms = await client.list_symbols() File "/home/goodboy/repos/anyio-marketstore/src/anyio_marketstore/__init__.py", line 106, in list_symbols resp = await self.stub.ListSymbols( File "/home/goodboy/repos/purerpc/src/purerpc/wrappers.py", line 91, in __call__ return await extract_message_from_singleton_stream(stream) File "/home/goodboy/repos/purerpc/src/purerpc/wrappers.py", line 16, in extract_message_from_singleton_stream raise_status(event.status) File "/home/goodboy/repos/purerpc/src/purerpc/grpclib/exceptions.py", line 142, in raise_status raise InternalError(status.status_message) purerpc.grpclib.exceptions.InternalError: RPC failed with status INTERNAL: grpc: error unmarshalling request: proto: cannot parse invalid wire-format data -
pushed up a commit showing how to make this change in cf1b0b5e9c9e, it only kinda works depending on backend; generally speaking i don't think the
marketstoredevs test with this option disabled :joy::- for ex. for binance on
xmrusdt.binanceit results in the result from two bullets above - with something like
mnq.cme.ibyou get bad (really -ve?) data pushed to the 1m OHLC series?
- for ex. for binance on
What eLsE cAn wE dO!?
In summary, i think i've completely lost faith (and/or confirmed my
biases) that using anything from google is a hot mess of dumpster
and markestore service side design has to be one of the rickity-est
i've seen.
At this point I think it's a better expenditure of time to rotate to
another tsdb system and/or consider something like polars and a modern
filesystem layout/config + apache arrow files...
However, there are few more things to try since apparently i just love taking it in the face with sunk-cost-fallacy...
- [ ] try out using the std (non-async) client to do the large ts query
part that seems to be failing for certain symbols.. though i think we
should try to sidestep the grpc transport to see if we can just pull
that history whatsoever..
- https://github.com/alpacahq/pymarketstore#client
- [x] try only using
snappy-comp on thepurerpc.grpclib.buffer.MessageReadBuffer(which i think we'll have to definitely patch in support for..
I can't really seem to find anything useful related to this issue and I don't think it actually has anything to do with marketstore?
For eg. web searching ("RPC failed with status UNKNOWN (2): snappy: corrupt input") yields these mostly unhelpful findings:
- https://github.com/prometheus/prometheus/issues/9474
- https://github.com/Telefonica/prometheus-kafka-adapter/issues/40
- [x] @guilledk maybe we can disable compression somehow in
anyio-marketstoreto try and workaround this?- https://github.com/grpc/grpc/blob/master/doc/compression.md
- https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#compressed-flag
- https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#message-encoding
- [x] worth a shot probably trying to disable the snappy compression: https://github.com/python-trio/purerpc/blob/5faa35f2cdf5b36d1398a1aa6a9e681c5344060f/src/purerpc/grpclib/buffers.py#L84
- [x] @guilledk maybe we can disable compression somehow in
UPDATE see above where i did dive in and regret it..
lul, here's a dump of all the urls i was using before actually just hacking all the repos manually..
- https://github.com/python-trio/purerpc/blob/c490b0eb988b9938210623cbed3b8e6b52c1f5c5/src/purerpc/client.py
- https://github.com/python-trio/purerpc/blob/c490b0eb988b9938210623cbed3b8e6b52c1f5c5/src/purerpc/grpclib/config.py
- https://github.com/python-trio/purerpc/blob/5faa35f2cdf5b36d1398a1aa6a9e681c5344060f/src/purerpc/grpclib/buffers.py#L84
- https://github.com/python-trio/purerpc