zstd.decompress not threadsafe for shared objects

Open mlbileschi opened this issue 1 year ago • 2 comments

Describe the bug Hi. When I share a single zstandard.ZstdCompressor() object across the workers of a concurrent.futures.ThreadPoolExecutor, I see nondeterministic behavior when using that object to compress.

I expected calls to compressor.compress(bytes) to share no state, so I was surprised that running them from multiple threads produced nondeterministic output.

To Reproduce

!pip install zstandard # this fetched zstandard-0.23.0
import zstandard
import concurrent.futures
import time

n = 10000
compressor = zstandard.ZstdCompressor()
decompressor = zstandard.ZstdDecompressor()

def compress(b):
  return compressor.compress(b)
def decompress(b):
  return decompressor.decompress(b)

compressed_values = []

# Compress in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
  futures = [executor.submit(compress, str(i).encode()) for i in range(n)]

  for future in concurrent.futures.as_completed(futures):
    compressed_values.append(future.result())

# Decompress in parallel
decompressed_values = []
with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
  futures = [executor.submit(decompress, o) for o in compressed_values]

  for future in concurrent.futures.as_completed(futures):
    decompressed_values.append(future.result())
decompressed_values

Screenshots and charts I see 4 different behaviors: a "happy path" and 3 errors. Sometimes the code outputs the decompressed values, and sometimes it raises one of three errors. The first:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-28-4a3cb429fab4> in <cell line: 17>()
     19 
     20   for future in concurrent.futures.as_completed(futures):
---> 21     compressed_values.append(future.result())
     22 
     23 decompressed_values = []

3 frames
<ipython-input-28-4a3cb429fab4> in compress(b)
      9 
     10 def compress(b):
---> 11   return compressor.compress(b)
     12 def decompress(b):
     13   return decompressor.decompress(b)

ZstdError: cannot compress: Operation not authorized at current processing stage

Another error:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-30-4a3cb429fab4> in <cell line: 24>()
     28   # Get results as they become available
     29   for future in concurrent.futures.as_completed(futures):
---> 30     decompressed_values.append(future.result())
     31 decompressed_values

3 frames
<ipython-input-30-4a3cb429fab4> in decompress(b)
     11   return compressor.compress(b)
     12 def decompress(b):
---> 13   return decompressor.decompress(b)
     14 
     15 compressed_values = []

ZstdError: decompression error: Unknown frame descriptor

The final error I'm seeing:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-35-4a3cb429fab4> in <cell line: 24>()
     28   # Get results as they become available
     29   for future in concurrent.futures.as_completed(futures):
---> 30     decompressed_values.append(future.result())
     31 decompressed_values

3 frames
<ipython-input-35-4a3cb429fab4> in decompress(b)
     11   return compressor.compress(b)
     12 def decompress(b):
---> 13   return decompressor.decompress(b)
     14 
     15 compressed_values = []

ZstdError: decompression error: Data corruption detected

Desktop: Running !pip install zstandard fetched zstandard-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata on a free Colab kernel at colab.research.google.com.

Additional context This may be a user error, but I found it surprising nonetheless.

mlbileschi, Dec 02 '24 14:12

Decompression also fails for me:

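import io
import zstandard

# Step 2: Decompress the Zstd file (file_path is set earlier in the program)
with open(file_path, 'rb') as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as zstd_stream:
        # Read the whole decompressed stream into memory
        tar_data = io.BytesIO(zstd_stream.read())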

This gives the following error:

concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Command exited with non-zero status 1

This error also cannot be caught with try/except; the Python process simply dies.

kazimsarikaya, Dec 19 '24 22:12

See #243 which attempts to fix this by raising an error if someone attempts to share compressors or decompressors.

ngoldbaum, Jan 28 '25 18:01
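To illustrate the general idea of failing loudly instead of silently corrupting state, here is a user-level sketch. It is not the implementation from #243, whose details are not shown in this thread; the ExclusiveUse class and its call method are hypothetical names invented for this example. The wrapper holds a non-blocking lock around each call and raises if a second thread enters while the first is still running.

```python
import threading


class ExclusiveUse:
    """Hypothetical guard: raise if two threads use the wrapped object at once."""

    def __init__(self, obj):
        self._obj = obj
        self._lock = threading.Lock()

    def call(self, method_name, *args, **kwargs):
        # Non-blocking acquire: if another thread is inside a call, fail fast
        # rather than let both threads touch the wrapped object concurrently.
        if not self._lock.acquire(blocking=False):
            raise RuntimeError("object is already in use by another thread")
        try:
            return getattr(self._obj, method_name)(*args, **kwargs)
        finally:
            self._lock.release()
```

Under these assumptions, a shared compressor would be used as ExclusiveUse(zstandard.ZstdCompressor()).call("compress", data), and overlapping calls would surface as a RuntimeError rather than as a ZstdError or corrupted output.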

The docs at https://github.com/indygreg/python-zstandard/blob/main/docs/api_usage.rst#thread-and-object-reuse-safety call out that ZstdCompressor and ZstdDecompressor instances cannot be used across multiple threads safely. And the rendered API docs at e.g. https://python-zstandard.readthedocs.io/en/latest/compressor.html#zstdcompressor call this out as well.

This behavior may get enforced as part of implementing free-threaded support. Time will tell.

The original example code:

compressor = zstandard.ZstdCompressor()
decompressor = zstandard.ZstdDecompressor()

def compress(b):
  return compressor.compress(b)
def decompress(b):
  return decompressor.decompress(b)

is clearly in violation of this stated API contract. So I don't consider this a bug.

indygreg, Aug 17 '25 03:08