
[ChunkStore] Decompression error on concurrent read & write

Open ddebie opened this issue 5 years ago • 34 comments

Arctic Version

1.70.0

Arctic Store

ChunkStore

Platform and version

CentOS 7.2

Description of problem and/or code sample that reproduces the issue

I'm getting an error when I have two threads simultaneously reading from and writing to one symbol. The reading thread will periodically throw an LZ4BlockError:

LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 24049

Here is the code to reproduce it:

import multiprocessing as mp

from arctic import Arctic

# LIB_NAME and create_data are defined elsewhere in my script.
def get_library():
    return Arctic('some_host:1234').get_library(LIB_NAME)

def write_loop():
    library = get_library()
    while True:
        data = create_data(100)  # Creates 100 rows of random data
        library.append('SYMBOL', data)
        
def read_loop():
    library = get_library()
    while True:
        df = library.read('SYMBOL')

proc = mp.Process(target=write_loop)
proc.start()

try:
    read_loop()
finally:
    proc.terminate()

From a quick check, it seems that the data being passed to decompress(_str) in _compression.py is not valid lz4 - could the block metadata and the data be out of sync?

ddebie avatar Nov 12 '18 02:11 ddebie

I'm pretty sure none of the libraries are thread-safe. There are multiple documents that make up the stored data, and you can't read and write them at the same time (though you can write new data to ChunkStore and, at the same time, read back older data that is not part of the chunk being written).

correct me if I'm wrong @jamesblackburn

bmoscon avatar Nov 12 '18 04:11 bmoscon

This seems like something Arctic should be handling, or at least documenting what clients might observe, along with documented ways to resolve it - e.g. retrying on a class of errors (similar to SQL optimistic lock failures), or configuration options such as isolation levels.
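
For example, a client-side retry along these lines can paper over the transient failures today (a rough sketch only; read_with_retry and the LZ4BlockError import path are assumptions, not part of Arctic's API):

import time

from lz4.block import LZ4BlockError  # import path assumed; adjust to your lz4 version

def read_with_retry(library, symbol, retries=5, backoff=0.1):
    """Retry reads that fail mid-write, much like retrying on an optimistic-lock failure."""
    for attempt in range(retries):
        try:
            return library.read(symbol)
        except LZ4BlockError:
            # The writer may have chunk data and metadata momentarily out of sync;
            # back off briefly and try again.
            time.sleep(backoff * (attempt + 1))
    raise RuntimeError('read of %s failed after %d retries' % (symbol, retries))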

yschimke avatar Nov 12 '18 09:11 yschimke

Thanks for your responses, guys.

I've looked into this a bit more and from what I can tell, it's due to writes not being atomic - there is a short period of time where the metadata document is out of sync with the data. So it seems that the problem exists even when running from a single thread.

Even with a single thread, if an application dies while writing, it's possible to cause permanent data corruption (and I've been able to reproduce this pretty consistently). If I run just the writing code (below), and then randomly kill the process, eventually the symbol will become corrupted and unreadable.

def write_loop():
    library = get_library()
    while True:
        data = create_data(100)  # Creates 100 rows of random data
        library.append('SYMBOL', data)

Is there any way around this? It seems to me that without writes being atomic, permanent data corruption is possible.

ddebie avatar Nov 12 '18 23:11 ddebie

I'm well aware of the issue you're seeing, but it's not easily solved in Arctic. It is more easily solved by the application using Arctic, though something might be able to be done in Arctic itself. Multi-document atomic writes are supported by Mongo, but only in version 4.0+.

bmoscon avatar Nov 13 '18 02:11 bmoscon

How would you recommend solving this in the application using arctic? As I understand it, if the application dies or even if the box goes down at the wrong time, the data will be corrupted. How can the application avoid this?

Also, are there currently any plans for Arctic to utilize the new multi doc atomic writes in mongo in the future?

ddebie avatar Nov 14 '18 01:11 ddebie

Either don't read while writing, or make sure you are not reading from the chunk being written to. If your chunk size were hourly, you could make sure you were never reading from the current hour (for example). Obviously there is nothing you can do to prevent power outages or the like.

There are no plans for Arctic to support multi-document atomic writes at the moment.
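
For example, with an hourly chunk size, something like this keeps readers away from the chunk a writer may be touching (a sketch only, assuming a date-chunked symbol where read() accepts a DateRange via chunk_range; read_settled_chunks is an illustrative name, not Arctic API):

from datetime import datetime, timedelta

from arctic.date import DateRange

def read_settled_chunks(library, symbol):
    # Everything strictly before the current hour belongs to chunks no writer should be touching.
    current_hour = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    return library.read(symbol, chunk_range=DateRange(None, current_hour - timedelta(microseconds=1)))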

bmoscon avatar Nov 16 '18 23:11 bmoscon

@bmoscon Can we leave this open to track what is a valid enhancement request?

yschimke avatar Nov 17 '18 06:11 yschimke

of course, at some point we'll want to do this

bmoscon avatar Nov 17 '18 12:11 bmoscon

> Either don't read while writing, or make sure you are not reading from the chunk being written to. If your chunk size were hourly, you could make sure you were never reading from the current hour (for example). Obviously there is nothing you can do to prevent power outages or the like.
>
> There are no plans for Arctic to support multi-document atomic writes at the moment.

The issue exists even when you're not reading/writing concurrently though. Even if I do just a single write, it is possible to completely corrupt the data so that it's no longer readable. In my view such an issue is pretty critical, and with it I'm not sure how Arctic can be safely used in a production environment where data integrity matters. We have been considering switching our tick databases to Arctic and have liked what we've seen so far, but this would definitely be a blocker to us.

ddebie avatar Nov 18 '18 23:11 ddebie

I assume you're referring to a power outage? The only two collections that matter for ChunkStore are the symbol collection and the data collection. The symbol document doesn't really have anything of importance that isn't written once when it's created (chunk size, column data, etc.). After that is written, if you are not reading and writing concurrently, you really shouldn't have any issues, even with a power outage, as Mongo has supported single-document atomic writes since 3.2, I believe. The only information in the symbol document that is updated afterwards is things like row counts, number of chunks, etc., but those are only for statistical purposes.

bmoscon avatar Nov 18 '18 23:11 bmoscon

Yep, I'm referring to anything that could kill a process while it is writing to Arctic. It could be a power outage, a hardware failure, or even just killing the Python process. This example reproduces the data corruption for me:

import pandas as pd
from arctic import Arctic

library = Arctic('some_host:1234').get_library('compression_test')

while True:
    data = create_data(100) # Create dataframe with 100 rows random data
    library.append('SYMBOL', data)

So I first initialize the library, then run the above code and repeatedly stop the process with Ctrl+Z. Eventually the data becomes corrupted (for me it takes ~10 stops or so before corruption, but that will depend on your system/hardware). Here is a sample output from running the above code:

[65]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[66]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[67]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[68]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
^Z
[69]+  Stopped                 python arctic-bug2.py
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
Traceback (most recent call last):
  File "arctic-bug2.py", line 24, in <module>
    library.append('SYMBOL', data)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 538, in append
    self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 447, in __update
    df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 301, in read
    return deser(chunks[symbol[0]], **kwargs) if skip_filter else chunker.filter(deser(chunks[symbol[0]], **kwargs), chunk_range)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 219, in deserialize
    df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=not index)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 147, in objify
    d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
  File "/usr/local/lib/python2.7/site-packages/arctic/_compression.py", line 135, in decompress
    return lz4_decompress(_str)
_block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 1774823
[root@20f6c80ae2a5 tmp]# python arctic-bug2.py 
Traceback (most recent call last):
  File "arctic-bug2.py", line 24, in <module>
    library.append('SYMBOL', data)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 538, in append
    self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 447, in __update
    df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
  File "/usr/local/lib/python2.7/site-packages/arctic/chunkstore/chunkstore.py", line 301, in read
    return deser(chunks[symbol[0]], **kwargs) if skip_filter else chunker.filter(deser(chunks[symbol[0]], **kwargs), chunk_range)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 219, in deserialize
    df = pd.concat([self.converter.objify(d, columns) for d in data], ignore_index=not index)
  File "/usr/local/lib/python2.7/site-packages/arctic/serialization/numpy_arrays.py", line 147, in objify
    d = decompress(doc[DATA][doc[METADATA][LENGTHS][col][0]: doc[METADATA][LENGTHS][col][1] + 1])
  File "/usr/local/lib/python2.7/site-packages/arctic/_compression.py", line 135, in decompress
    return lz4_decompress(_str)
_block.LZ4BlockError: Decompression failed: corrupt input or insufficient space in destination buffer. Error code: 1774823

At this point it's fully corrupted, and you can no longer read or write the symbol.

ddebie avatar Nov 21 '18 03:11 ddebie

OK, that should not happen. I can look into that and fix it. I'll let you know.

bmoscon avatar Nov 21 '18 13:11 bmoscon

@ddebie also try VersionStore (the default) which is what we use in production here. It shouldn’t be possible to corrupt the symbol with ctrl-C etc.

jamesblackburn avatar Nov 22 '18 03:11 jamesblackburn

@jamesblackburn I found the issue. Bulk operations are not atomic in MongoDB. The individual operations are, but if the process dies in the middle, Mongo will not roll back the operations that already completed. Not sure how to handle this, other than writing a record to internal metadata before the bulk write. On Arctic start-up we could then check whether that metadata matches what was actually written, and remove invalid/corrupt chunks if necessary?

bmoscon avatar Nov 22 '18 13:11 bmoscon

Hi, are there any plans to fix this? It happens to me from time to time when using ChunkStore, and to recover I have to delete all the data for the symbol, not just the last chunk. Are there any known workarounds that don't require deleting the whole symbol?

streamtv85 avatar Dec 27 '18 16:12 streamtv85

Yes, I do plan on fixing it. You can delete the last chunk, but you'll also need to update the metadata for the symbol as well; otherwise it will think there is data for a chunk that doesn't exist.

bmoscon avatar Dec 27 '18 19:12 bmoscon

The problem is that when trying to delete the last chunk I get the same LZ4BlockError, but I don't get the error when deleting the whole symbol. The easiest way to recover the data seemed to me to be:

  • copy all chunks but the last one to the new symbol
  • delete the broken symbol
  • rename the new symbol to put it in place of the deleted one

The only problem is that it may take a long time for large sets of data. Is there a more efficient way to recover the broken symbol?
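
In code, that copy/rename recovery might look roughly like this (a sketch only; it leans on ChunkStore's get_chunk_ranges/rename/delete methods, and the date handling of the returned ranges is an assumption, so rehearse it on a throwaway library first):

import pandas as pd

from arctic.date import DateRange

def rebuild_symbol(library, symbol, tmp_symbol):
    # Copy every chunk except the last (presumed corrupt) one into a temporary symbol.
    ranges = list(library.get_chunk_ranges(symbol))
    first = True
    for start, end in ranges[:-1]:
        start_ts = pd.Timestamp(start.decode() if isinstance(start, bytes) else start)
        end_ts = pd.Timestamp(end.decode() if isinstance(end, bytes) else end)
        chunk = library.read(symbol, chunk_range=DateRange(start_ts, end_ts))
        if first:
            library.write(tmp_symbol, chunk)
            first = False
        else:
            library.append(tmp_symbol, chunk)
    library.delete(symbol)              # drop the broken symbol entirely
    library.rename(tmp_symbol, symbol)  # move the rebuilt copy into its place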

streamtv85 avatar Jan 04 '19 22:01 streamtv85

Try using delete(self, symbol, chunk_range=None) with chunk_range set to the appropriate range.
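
For example (the date below is purely illustrative; point the range at the chunk you actually need to drop):

from datetime import datetime

from arctic.date import DateRange

bad_day = datetime(2018, 11, 21)  # hypothetical date of the corrupt chunk
library.delete('SYMBOL', chunk_range=DateRange(bad_day, bad_day))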

bmoscon avatar Jan 04 '19 23:01 bmoscon

Hi guys, thanks for opening up this work. I am also interested in using ArcticDB to handle market data for our work, and I'm wondering whether there is a time frame for when this issue could be fixed? It would make selling Arctic to my boss a much easier job.

cheers

gambitvk avatar Feb 11 '19 00:02 gambitvk

@shashank88 Do you think testing with forward pointers will make any difference here? Does it make the single-writer, concurrent-readers mode any safer?

yschimke avatar Feb 11 '19 07:02 yschimke

I don't think the forward pointers would do anything. The issue is that data is being written to two collections in two separate transactions (or, in some cases, you might have multiple updates to a collection in separate transactions). If you kill the process in the middle of this, bad things can happen.

bmoscon avatar Feb 14 '19 00:02 bmoscon

Yeah, I don't think forward pointers will help solve this, based on what @bmoscon said. I haven't dug into this issue until now; I'll take a look.

shashank88 avatar Feb 20 '19 22:02 shashank88

Sorry, I am a bit new to Mongo; just a quick thought based on experience with other DBMSs: we would normally use a transaction to group anything that needs to be atomic, and it looks like MongoDB does provide such a facility. Could this be used to solve this problem?

Thanks Guys, keen to see this get fixed!!

https://docs.mongodb.com/manual/core/transactions/

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

employeesCollection = session.getDatabase("hr").employees;
eventsCollection = session.getDatabase("reporting").events;

// Start a transaction
session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );

// Operations inside the transaction
try {
    employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
    eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
} catch (error) {
    // Abort transaction on error
    session.abortTransaction();
    throw error;
}

// Commit the transaction using write concern set at transaction start
session.commitTransaction();

session.endSession();

gambitvk avatar Feb 22 '19 00:02 gambitvk

  1. Mongo transactions are only supported in 4.0+. Adding this would require dropping support for all previous versions. Not sure that's doable.
  2. It requires a replica set. No other setup is supported. Eventually they'll support transactions on sharded clusters, but they do not currently. The requirement for a specific configuration is likely a dealbreaker. (A quick way to check both constraints is sketched below.)
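
A quick pymongo probe along these lines can check both constraints against a deployment (nothing Arctic-specific here; replSetGetStatus raises an error when the server is not running as part of a replica set):

from pymongo import MongoClient
from pymongo.errors import OperationFailure

client = MongoClient('some_host:1234')
print(client.server_info()['version'])        # transactions need a 4.0+ server
try:
    client.admin.command('replSetGetStatus')  # fails unless the server is in a replica set
    print('replica set: yes')
except OperationFailure:
    print('replica set: no')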

bmoscon avatar Feb 22 '19 00:02 bmoscon

I think there may be a bit of confusion here regarding the different store types. VersionStore (the default store type) and TickStore should not experience this issue.

In VersionStore we deliberately write chunks (which are immutable) before writing the metadata documents i.e. publishing the data.

We have tests for incomplete writes and concurrent reads and writes: https://github.com/manahl/arctic/blob/master/tests/integration/test_arctic_multithreading.py#L47

In VersionStore there is an edge case with large sharded clusters: if a mongod crashes, your Arctic library only has single-mongod consistency (w=1), and a data rollback occurs, then the latest version can have the wrong number of chunks. In the case of sharded clusters you should use majority write concern to guarantee resilience.
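
For example, one way to get majority write concern is to hand Arctic a pymongo client configured with w='majority' (a sketch, assuming your Arctic version accepts a client instance in place of a host string):

from pymongo import MongoClient

from arctic import Arctic

client = MongoClient('mongodb://some_host:1234/', w='majority')  # acknowledge writes on a majority of nodes
store = Arctic(client)
library = store.get_library('compression_test')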

jamesblackburn avatar Feb 22 '19 08:02 jamesblackburn

Sorry, I thought this was already clear, but maybe it wasn't. This 100% only affects ChunkStore. I'm working on a fix now and should have it done within a week or two. I'll likely only fix the data corruption issue (i.e. you write, and the write dies in the middle); writing and reading concurrently is much harder to fix and I'll address that later.

bmoscon avatar Feb 24 '19 21:02 bmoscon

@bmoscon confusion probably comes from me, sorry.

yschimke avatar Feb 25 '19 08:02 yschimke

@yschimke no worries - wasn't blaming anyone. The title now says [ChunkStore], so I think we should be a-ok :D

bmoscon avatar Feb 26 '19 00:02 bmoscon

OK, I have a fix that looks like it will work; I'm doing some more testing. Basically it creates its own transaction record, does the write, and then removes the transaction record. If the transaction record is present during a read or a write, it will raise an error, and the symbol can be self-corrected (i.e. the partially written data removed).
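
Outside Arctic itself, the same marker idea can be sketched with plain pymongo (collection and field names here are illustrative only, not what the actual fix uses):

from pymongo import MongoClient

db = MongoClient('some_host:1234')['compression_test']
markers = db['write_markers']  # hypothetical collection tracking in-flight writes

def guarded_append(library, symbol, data):
    if markers.find_one({'symbol': symbol}):
        raise RuntimeError('a previous write to %s did not complete; clean up first' % symbol)
    markers.insert_one({'symbol': symbol})
    library.append(symbol, data)             # the multi-document write that can die half-way
    markers.delete_one({'symbol': symbol})   # only reached if the write completed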

bmoscon avatar May 18 '19 20:05 bmoscon

Did you commit the fix? Is it in another branch? I have this issue in production and I can help with more testing.

michael-stefanski avatar Jun 11 '19 12:06 michael-stefanski