
Is zstd splittable in hadoop/spark/etc?

Open trixpan opened this issue 7 years ago • 57 comments

I know the 4mc container adds splittability to zstd, but is the standard by itself able to do so without the help of 3rd party implementations?

Note: I am aware zstd is still not officially implemented in Hadoop, so this is more of a theoretical question.

trixpan avatar Sep 28 '16 13:09 trixpan

We have a plan to re-use and extend the skip format, based on skippable frames, implemented in pzstd for multi-threaded decompression. 4mc's @carlomedas is aware of this context. We may spend some time in the future to properly define it and make it part of the official format.

Cyan4973 avatar Sep 28 '16 13:09 Cyan4973

Hi @trixpan, I talked to 4mc's author. It would be possible to write metadata in zstd skippable frames and make the whole file splittable by Hadoop (it would need a special InputFormat).

But the idea would be essentially the same as 4mc's.

advancedxy avatar Sep 28 '16 13:09 advancedxy

@advancedxy I saw 4mc previously.

I think people's concern about 4mc is the limited community (although I must confess compression algorithms don't strike me as pieces of code with hundreds of contributors)

trixpan avatar Feb 10 '17 00:02 trixpan

Is the format pzstd uses for splittable frames official? If it is, then I can update the hadoop project to utilize anything compressed with zstd-cli and then copied into hadoop. Have you guys decided on or finalized the format of pzstd files? Thank you

ghost avatar Feb 27 '17 23:02 ghost

We don't have yet a splittable format.

This is an active topic, we are working on it, and expect to make progress throughout March towards this objective. Btw, it's probably the best time to be involved in this process, as we will work on this specification with several projects interested in a splittable format. So don't hesitate if you want to be part of it.

The final format will likely be different from the current pzstd one. Not that there is any problem with the pzstd format, it's just that we can do better. The pzstd format will likely be discontinued, though that's not a concern for existing compressed data: the pzstd format has been designed to be compatible with the zstd decoder, and it is perfectly decodable by any zstd compliant decoder. That will remain true. We are merely going to use different ways to generate splittable and multi-threaded data.

Cyan4973 avatar Feb 27 '17 23:02 Cyan4973

@Cyan4973 thanks for the update, I think having zstd splittable in hadoop would be very attractive to a lot of users. I would like to be involved if possible discussing the proposed splittable format. Thank you.

ghost avatar Feb 27 '17 23:02 ghost

cc @iburinoc

Cyan4973 avatar Feb 27 '17 23:02 Cyan4973

I definitely agree with this approach: 4mc is nice but we need this as mainstream in zstd. A splittable zstd, as a matter of fact, is just leveraging the already fully scalable compression format to make sure it can be processed/emitted in blocks with the related indexes.

carlomedas avatar Mar 02 '17 09:03 carlomedas

Agreed. Please involve me with discussion and implementation if possible.

advancedxy avatar Mar 02 '17 16:03 advancedxy

Here's an overview of our current thoughts on how to implement this, any feedback is appreciated!

Splittable/Seekable Format

A splittable and/or seekable Zstandard format could be useful in a variety of scenarios, by allowing decompression/processing to be parallelized or by allowing small subsections to be decompressed without having to decompress the entire preceding file.

The proposed general format of a splittable Zstandard file would consist of a sequence of Zstandard compressed frames, plus a skippable frame containing a "jump table". This jump table would contain the information allowing decompressors to seek directly to the frame they want to decompress, rather than having to process the entire file leading up to it. This design is based on the format used in @carlomedas's 4mc project, which currently adds splittability to LZ4 and Zstd for use in Hadoop.

Given this outline, there are a few choices left to be made:

Jump Table Position

Where in the file should the jump table be placed? There are two obvious choices here:

  • Beginning of the file:
    • Pros:
      • Decompressors don't need to seek anywhere to find the jump table, they can read it immediately
    • Cons:
      • We need to know how many chunks we're going to write before we start anything
      • We need to be able to seek back afterwards and write the jump table
        • Very difficult to use with streaming
      • Can't append new data
  • End of file:
    • Pros:
      • Can write the jump table after processing all data, so it works with streaming
      • Can append more data easily, just overwrite the jump table (or leave it as a skipped frame), and write a new one at the new end
    • Cons:
      • More complicated for the decompressor, they must seek to the end of the file to determine where to go
  • Multiple jump tables: This style could be similar to the beginning-of-file one, except with an additional field that indicates the jump offset to find the next jump table. This works with appending data; however, it still requires either seeking back to write, or compressing chunks in memory and writing them out later on, so it's still challenging to use in streaming mode, and it's very complex for the decompressor.

The end of file approach seems best as I'm not aware of any scenarios where the seek to end of file requirement, for reads, is an issue, but if there are such scenarios then this topic is open.

Jump Table Format

The format of the Jump Table would be a Zstandard skippable frame, with some header (or footer, depending on the position of the jump table) containing information such as the number of chunks and some flags, and then an array containing the data in the jump table itself.

Potentially useful data to have in the table is, for each frame:

  • compressed size
  • uncompressed size
  • a checksum for the frame
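As a sketch, the jump table could be serialized as a Zstandard skippable frame (skippable-frame magic numbers are 0x184D2A50-0x184D2A5F, followed by a 4-byte little-endian frame size, per the Zstandard format); the specific magic value and the 12-byte entry layout below are purely illustrative, not a proposal:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical magic within the skippable-frame range; not the final one. */
#define SKIPPABLE_MAGIC 0x184D2A5EU

/* Illustrative entry layout combining the three candidate fields above. */
typedef struct {
    uint32_t compressedSize;    /* or an offset, depending on the chosen option */
    uint32_t decompressedSize;
    uint32_t checksum;          /* optional, e.g. low 4 bytes of XXH64 */
} SeekTableEntry;

/* Write a 32-bit value little-endian, as Zstandard does throughout. */
static void writeLE32(uint8_t* dst, uint32_t v) {
    dst[0] = (uint8_t)v;         dst[1] = (uint8_t)(v >> 8);
    dst[2] = (uint8_t)(v >> 16); dst[3] = (uint8_t)(v >> 24);
}

/* Serialize the table as a skippable frame: magic, content size, entries.
 * Returns the number of bytes written. */
static size_t writeSeekTable(uint8_t* dst, const SeekTableEntry* entries,
                             uint32_t numEntries) {
    uint32_t contentSize = numEntries * 12;
    writeLE32(dst, SKIPPABLE_MAGIC);
    writeLE32(dst + 4, contentSize);
    for (uint32_t i = 0; i < numEntries; i++) {
        writeLE32(dst + 8 + 12*i,     entries[i].compressedSize);
        writeLE32(dst + 8 + 12*i + 4, entries[i].decompressedSize);
        writeLE32(dst + 8 + 12*i + 8, entries[i].checksum);
    }
    return 8 + contentSize;
}
```

Because the wrapper is a legal skippable frame, any compliant zstd decoder that doesn't understand the table simply skips it.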

Compressed Size

There are two main options for storing the compressed size: do we store an offset, or do we store the size?

  • Offset:
    • Pros:
      • For seekability, we don't need to process every preceding frame in the table
      • This method could also be used to avoid having entries in the jump table for skippable frames, although then a method like ZSTD_findCompressedSize would be needed to get the actual compressed size of a frame
    • Cons:
      • To find the actual compressed size, we also need to read the next entry in the table
      • Size constraints: If we store the offset, for a large file (> 4GB) we need more than 4 bytes for each offset for each entry (likely 8 bytes in this case). Do we then always use 8 bytes? Or perhaps make that a flag?
  • Size:
    • Pros:
      • We can limit frame size to 4 GB while still having files larger than that, and so we only need 4 bytes per field
        • Although we could still leave this open as an option, if larger chunks are desired
    • Cons:
      • Finding the offset to seek to requires processing all previous entries in the jump table

Thoughts: we could leave this as an option to be chosen by a flag, although I personally prefer the offset option as it provides good behaviour in both cases, and the size issue only comes into play when dealing with files that are already large.

Uncompressed Size

Here we have the same offset/size problem as with compressed size, so that section more or less applies to both. One additional point to add is that for seeking to the middle of a file, the offset method allows for binary search to be used, while the size method requires a linear scan. It's unlikely that the jump table would be large enough for this to be an issue however.
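The binary-search behaviour of the offset method can be sketched as follows (hypothetical helper; it assumes the table is expanded to decompressed start offsets with a final sentinel holding the total size):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* offsets[i] = decompressed offset at which frame i starts;
 * offsets[numFrames] = total decompressed size (sentinel).
 * Returns the index of the frame containing decompressed position `pos`
 * (caller must ensure pos < offsets[numFrames]). */
static size_t frameForPosition(const uint64_t* offsets, size_t numFrames,
                               uint64_t pos) {
    size_t lo = 0, hi = numFrames; /* invariant: offsets[lo] <= pos < offsets[hi] */
    while (hi - lo > 1) {
        size_t mid = lo + (hi - lo) / 2;
        if (offsets[mid] <= pos) lo = mid;
        else                     hi = mid;
    }
    return lo;
}
```

With the size method, building this offsets array first requires the linear scan mentioned above; once built, lookups are O(log n) either way.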

Another point with uncompressed size is that it's likely that in many cases, each frame has the same uncompressed size. If this is a case that would occur, then it could be worth adding a flag/field that allows the jump table to set a single uncompressed size for all frames, allowing a decompressor to immediately determine which frames it needs to decompress based on the offset and frame size.
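With a constant uncompressed frame size, the lookup reduces to a division; a trivial sketch:

```c
#include <assert.h>
#include <stdint.h>

/* If all frames share one decompressed size, the frame index and the
 * position within that frame follow directly, with no table walk. */
static void locateInConstantFrames(uint64_t pos, uint64_t frameSize,
                                   uint64_t* frameIdx, uint64_t* offsetInFrame) {
    *frameIdx = pos / frameSize;
    *offsetInFrame = pos % frameSize;
}
```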

I think the optimal solution for uncompressed size would be to resolve the size/offset issue with the same choice as for the compressed size, for consistency, and then allow for a flag in the header to indicate that all frames have the same uncompressed size instead of storing it with each entry.

Checksum

It might be desirable to have a checksum accompany each frame so we can verify frames. Zstandard already has a per-frame checksum over uncompressed data, however the layout of this format thus far would allow for it to trivially be extended to allow frames compressed with other algorithms, e.g. gz or xz.

So the questions to be answered here are:

Do we include a checksum? I think this should be a configurable option in the format.

Which algorithm should be used? For consistency with Zstandard, the least significant 4 bytes of XXH64 make sense.

Should it be over the compressed or uncompressed data? Uncompressed data: allows us to verify both the compressed data and the decompression process. Compressed data: allows us to check the data before we attempt decompression.

I think uncompressed data makes more sense, especially if we're mixing various compression algorithms, and since if we have to read the compressed data anyway, we might as well decompress it while we're at it. This is definitely worth discussing more, however.
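However the checksum question is resolved, deriving and comparing the stored 4 bytes would look something like this (xxhash itself is not reimplemented here; `hash64` stands in for the value XXH64() would return over the frame data):

```c
#include <assert.h>
#include <stdint.h>

/* The stored per-frame checksum would be the least significant 4 bytes of
 * XXH64, mirroring Zstandard's own content checksum. */
static uint32_t frameChecksum(uint64_t hash64) {
    return (uint32_t)(hash64 & 0xFFFFFFFFu);
}

/* Verification at decompression time: recompute the hash over the frame's
 * (un)compressed data and compare against the table entry. */
static int verifyFrame(uint64_t hash64, uint32_t storedChecksum) {
    return frameChecksum(hash64) == storedChecksum;
}
```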

Dictionary Compression

One way to recoup the compression-ratio losses of splitting the input is to use dictionaries. While currently training dictionaries on data is quite slow, and so might not be suited to doing inline, a possible future improvement on this is to train a dictionary on the frames and using them with each one, and then including this dictionary somewhere in the compressed file.

Some obvious impacts this usage could have are:

  • If we have a special frame containing the dictionary at the start of the file, the jump table can't make the assumption that the first frame starts at offset 0, which complicates the size method.
  • If dictionaries aren't necessarily stored at the beginning of the file but simply stored inline, we need to be able to determine whether a frame needs a certain dictionary for decoding, and if so, where to find it. A possible solution here is to include some sort of dictionary field in the jump table, e.g. the offset at which to find the dictionary for decompression (these could then be cached and used as DDict's for efficient decompression of multiple frames).

We should make sure to put some thought into these and other possible related situations to ensure that the format doesn't hamper future expansion.

sean-purcell avatar Mar 15 '17 17:03 sean-purcell

I definitely like this.

Here's my 2 comments/preferences:

  • Jump Table Positions - End of File - One more pro: Hadoop/Spark/other-big-data compression codecs will be able to work with this flawlessly: HDFS output streams only work in append mode.

  • Jump table format / Compressed Size / By Size: I would even remove the 4 GB per-frame limit from the 'Cons', as it's a very generous one: when you want the file to be splittable, even 64MB is too big a frame size IMHO.

carlomedas avatar Mar 15 '17 17:03 carlomedas

My 2 cents:

  • Jump Table Positions: End of File, works well with append-only files. And I really like the idea of skipping the jump table and adding a new one.
  • Jump Table Format:
    • Compressed Size/Uncompressed Size: Size. The meta (footer or header) should be negligible to process. I prefer no flags, for simplicity
    • Checksum: XXH64 of uncompressed data.

advancedxy avatar Mar 16 '17 07:03 advancedxy

I'm not sure no flags is the best option; it leaves very little room to expand.

Potential flags I can think of currently:

  • Ability to have a constant block decompressed size. This could be useful in seekable files as they wouldn't need variable size blocks, and could immediately determine the block number they need. Admittedly this doesn't work very well with skippable frames, unless the offset method is used.
  • Ability to turn off checksum. This isn't as useful with large files, but with smaller files it could be preferable. Also, since the Zstd frame format already has a checksum of the uncompressed data, we'd probably want to be able to turn one of them off. If you can turn off the one in the splittable header, then a regular (non-splittable aware) decompressor can still verify the checksums.
  • Some sort of dict ID specification/jump table: a possible use case would be to encode a few dictionaries inline at the start (or elsewhere), and then have the compressor decide on the fly which to use for each chunk. We'd need some way for the decompressor to find these, possibly by having the option to add a dictionary jump table?

Since the implementation will be in the library itself, we can handle some complexity there, and we want to make sure we have the flexibility to handle all use-cases that could benefit from this.
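The flag ideas above could be packed into a one-byte descriptor in the jump table header; the bit assignments here are purely hypothetical, just to illustrate the room for expansion:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical descriptor bits; the final spec may differ entirely. */
enum {
    JT_FLAG_CHECKSUM        = 1 << 0, /* per-frame checksums present        */
    JT_FLAG_CONST_FRAMESIZE = 1 << 1, /* single decompressed size for all   */
    JT_FLAG_DICT_TABLE      = 1 << 2, /* dictionary jump table present      */
    /* remaining bits reserved for future use */
};

static int hasFlag(uint8_t descriptor, uint8_t flag) {
    return (descriptor & flag) != 0;
}
```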

sean-purcell avatar Mar 17 '17 18:03 sean-purcell

Since the implementation will be in the library itself, we can handle some complexity there, and we want to make sure we have the flexibility to handle all use-cases that could benefit from this.

On second thought, it's indeed better to use flags to handle various use cases.

advancedxy avatar Mar 19 '17 15:03 advancedxy

I'm not aware of any scenarios where the seek to end of file requirement, for reads, is an issue, but if there are such scenarios then this topic is open.

One scenario where seeking is not possible is when reading from a pipe. Granted, then, also, no frame can be skipped. But a performance improvement can be achieved by not processing frames one is not interested in.

As there are distinct advantages and limitations to having index frames prepended or appended, it might be useful to provide both.

It is already possible to make a distributed index by prepending, using pzstd skippable frames and Zstandard frame headers' Frame_Content_Size.

For appending, I think that a similar simple format would be best. Other extensions such as checksums and dictionaries would then be better implemented in separate frames.

Senemu avatar Mar 30 '17 11:03 Senemu

I've created a first round implementation.

The format description is here, comments/feedback is appreciated.

As for the API, the planned compression API looks like this:

/*===== Seekable compressor management =====*/
ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void);
size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs);

/*===== Seekable compression functions =====*/
size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, int checksumFlag, unsigned maxFrameSize);
size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
size_t ZSTD_seekable_endFrame(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);
size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);

which looks similar to the regular zstd streaming API.

On the decompression side it's a bit more complex, since a decompressor needs to be able to seek.

There are a few cases I can think of here that we could provide an API for:

  • File is entirely in memory, similar to the ZSTD_decompress API:
    ZSTD_seekable_decompressRange(void* dst, size_t dstSize, void* src, size_t srcSize, size_t offset, size_t length);
    
  • File is on disk, in which we could provide a FILE* oriented API
    ZSTD_seekable_init(ZSTD_seekable* zds, FILE* src);
    ZSTD_seekable_decompress(ZSTD_seekable* zds, void* dst, size_t dstSize, unsigned long long offset); /*< There's a bit of a naming clash here with the in-memory API above */
    
    This API could also be extended to allow more advanced usage when the user can't provide a FILE* object, for example with a callback-based API like this:
    typedef void(*ZSTD_seekable_read)(void* opaque, void* buffer, size_t n);
    typedef void(*ZSTD_seekable_seek)(void* opaque, unsigned long long offset); /* Careful with files > 4 GiB and fseek() */
    typedef struct {
      void* opaque;
      ZSTD_seekable_read read;
      ZSTD_seekable_seek seek;
    } ZSTD_seekable_customFile;
    ZSTD_seekable_init_advanced(ZSTD_seekable* zds, ZSTD_seekable_customFile src);
    
    One concern here is that this model may be hard for users that are creating zstd bindings to other languages. Would this API be usable or are C callbacks too hard to interact with from other languages? Feedback here appreciated. An alternative way to provide functionality similar to this API would be to have a special return code from ZSTD_decompressStream that indicates the client needs to seek to a different position, like what's currently in the proposal.
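For illustration, here is a minimal FILE*-backed implementation of the proposed callback struct (the typedefs are copied from the proposal above and may change; error handling is elided, so treat this as a sketch rather than binding-ready code):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Mirrors the proposed typedefs so this sketch is self-contained. */
typedef void (*ZSTD_seekable_read)(void* opaque, void* buffer, size_t n);
typedef void (*ZSTD_seekable_seek)(void* opaque, unsigned long long offset);
typedef struct {
    void* opaque;
    ZSTD_seekable_read read;
    ZSTD_seekable_seek seek;
} ZSTD_seekable_customFile;

/* FILE*-backed callbacks a binding author might provide. */
static void fileRead(void* opaque, void* buffer, size_t n) {
    size_t r = fread(buffer, 1, n, (FILE*)opaque);
    (void)r; /* short reads / errors not handled in this sketch */
}
static void fileSeek(void* opaque, unsigned long long offset) {
    /* fseeko/_fseeki64 would be needed for files > 2 GiB; plain fseek
     * shown for brevity */
    fseek((FILE*)opaque, (long)offset, SEEK_SET);
}

static ZSTD_seekable_customFile wrapFile(FILE* f) {
    ZSTD_seekable_customFile cf = { f, fileRead, fileSeek };
    return cf;
}
```

For bindings, most languages (Python ctypes/cffi, Java JNI, Go cgo, Rust) can produce C-compatible function pointers, though the special-return-code alternative avoids the callback question entirely.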

Other relevant notes here:

  • It could be useful to allow clients direct access to the contents of the seek table, such as chunk offsets etc, so that they can break up and distribute the frames directly.
    ZSTD_seekable_getFrameCompressedOffset(ZSTD_seekable* zds, unsigned chunkIdx);
    ZSTD_seekable_getFrameDecompressedOffset(ZSTD_seekable* zds, unsigned chunkIdx);
    ZSTD_seekable_convertPosToChunk(ZSTD_seekable* zds, unsigned long long decompressedPosition);
    
  • For range decompression operations, is it easier for the client to provide the input in (offset, length) form, or in (rangeStart, rangeEnd) form?

sean-purcell avatar Apr 14 '17 18:04 sean-purcell

folks would you mind if I ask you to clarify what is current status of this?

Reason I ask is because I got quite confused trying to use git commits to understand what is going on... 😃

eg. despite multi-threading now being part of the main tool, pzstd is still included as part of the contrib source directory, and while pzstd refers to the Skippable format, the RFC related to this ticket suggests the Seekable format.

I am confused. 😃

trixpan avatar Sep 07 '17 02:09 trixpan

The proposed format is implemented in this demo : https://github.com/iburinoc/zstd/tree/splittable/contrib/seekable_format So far, it's the only code which applies it.

pzstd is an unrelated project focused on multithreading for faster processing speed. pzstd has been interesting for experimenting with multiple appended frames, which is one mechanism used in the specification. But that's all they have in common, which means it's very little.

Cyan4973 avatar Sep 12 '17 00:09 Cyan4973

@iburinoc Direct lookup of the seek table is going to be critical to the Hadoop InputFormat case, correct? Since some byte range spanning one or more chunks is going to be distributed as an InputSplit to different hosts.

adamkennedy avatar Oct 30 '17 23:10 adamkennedy

@adamkennedy The seek table itself won't be split up though, and it's not hard to traverse the seek table to convert it into a direct lookup to determine what data is where in the source data.

sean-purcell avatar Oct 31 '17 15:10 sean-purcell

I solved a similar problem - breaking large zstd/lz4/lzma files for parallel processing - by extending the zstd/lz4 framing format. I break input data into large chunks (recommending 1MB), with each record residing fully inside a chunk (records are not broken between chunks). Each chunk is compressed independently (lz4 frame, zstd frame, or lzma (no frame)), prepended by a transport skippable frame, and written sequentially. The transport frame contains the size of compressed data, the size of uncompressed data, the type of compression (I support lz4, zstd, lzma, and a file can mix compressions), and custom data tags. The transport frame is protected by a CRC.

I can do random binary search in the resulting format by file offset from the beginning, or by custom tags (as long as tags are monotonically increasing, for example timestamps). Works fine on sub-1TB files. Splitting a file is done by breaking it into even parts (one part per processing thread), and then searching for the next transport frame. To make sure I found a correct transport frame, I check a couple more that follow. I have a multithreaded utility that compresses/decompresses/changes compression, a C++ library, Java bindings etc. Works like a charm.
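The resynchronization step described above can be sketched as follows; the magic value and header layout are hypothetical stand-ins for the actual transport frame (which also carries uncompressed size, compression type, tags and a CRC):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Example skippable-frame-style magic; not the real transport magic. */
#define TRANSPORT_MAGIC 0x184D2A5DU

static uint32_t readLE32(const uint8_t* p) {
    return (uint32_t)p[0] | ((uint32_t)p[1] << 8)
         | ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

/* Header sketch: 4-byte magic, then 4-byte compressed size of the chunk
 * that follows.  Scans forward from an arbitrary split offset `from` and
 * confirms a candidate by checking that the frame after it also starts
 * with the magic (reducing false positives from payload bytes).
 * Returns the offset of the first verified frame, or (size_t)-1. */
static size_t findNextFrame(const uint8_t* buf, size_t len, size_t from) {
    for (size_t pos = from; pos + 8 <= len; pos++) {
        if (readLE32(buf + pos) != TRANSPORT_MAGIC) continue;
        size_t next = pos + 8 + readLE32(buf + pos + 4);
        if (next == len || (next + 4 <= len && readLE32(buf + next) == TRANSPORT_MAGIC))
            return pos;
    }
    return (size_t)-1;
}
```

A real implementation would verify more following frames (as described above) and the CRC before trusting the match.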

scherepanov avatar Nov 01 '17 17:11 scherepanov

It would be great if the seekable format plays well with the tabix index format (https://samtools.github.io/hts-specs/tabix.pdf ) widely used in bioinformatics; this would speed up a bunch of workflows by up to ~2-3x over BGZF. (I'd be happy to write some tabix indexing and seeking code, if the basic library functions are all ready.)

chrchang avatar Dec 15 '17 19:12 chrchang

@chrchang : Would you mind testing the experimental splittable format API for the tabix use case?

The goal would be to know if this API is suitable enough for such usage, and if not or partially, how to make it evolve to better fit the need.

Cyan4973 avatar Dec 15 '17 19:12 Cyan4973

Experimental splittable API was not enough for me. I implemented my own extended version of splittable API. Here are reasons and some details:

  1. My data have many formats, but all of them are record-oriented. A record must be processed as a single unit. Record size is much smaller than a compression block (my record sizes are 40 bytes - 2KB, blocks are 128-256-512KB or 1MB). Many records fit into a single compression block.
  2. Splitting a record between two compression blocks makes processing much harder than it needs to be. I require that a record fits fully inside a compression block. That greatly simplifies everything.
  3. Pure searching by arbitrary offset makes little sense when you have records - there is no guarantee that some record will not be partial. I need to get data from record boundaries only. If data starts in the middle of a record, it is invalid.
  4. I do provide search by offset, but data is returned from the next record's beginning. That guarantees that the data is valid.
  5. I support a record attribute, an 8-byte integer that has to be monotonically increasing. In my case it is time. I support search on the attribute. Again, I return data from a record's beginning, always valid.

The experimental splittable API is of no use for me. I search for transport frames, read a block, and inside the block do a data scan. That guarantees that I always have valid record boundaries. The splittable API returns data from a rather arbitrary offset - you have to guess record boundaries.

As an additional feature, my API transparently supports zstd, lz4 and lzma compression, and uncompressed data. A file can have several compression formats inside. Each compression block has its own compression format, and the API hides the compression type (or absence of compression) from the client. The API supports search in compressed formats by offset and by attribute, and in uncompressed format by attribute (and by offset, to provide data from record boundaries).

I implemented the API with a lot of multithreading, and I am not hiding the parallel nature of the compressor/decompressor. That allows for more direct integration with multithreaded clients. I communicate with clients via blocks of data, corresponding to compression blocks in the file. The splittable API produces a single data stream; to parallelize it you have to make some effort. I do not want to say that the splittable API is bad. I am just telling why it did not work for me.

It was interesting to hear about the tabix format. I think I implemented a small subset of the tabix format. Full support would probably be time consuming.

scherepanov avatar Dec 15 '17 21:12 scherepanov

Okay, after taking a look at the current seekable API, it appears to be adequate for a basic single-threaded tabix implementation, but it's missing parallel-compression and decompression-stream support which would really make seekable .zst files a joy to work with.

It looks like parallel compression wouldn't take much work to add: just create a version of ZSTD_seekable_compressStream that calls ZSTD_compress_generic?

Possible decompression stream interface (which I've previously written for BGZF files): during decompressor initialization, you pass an additional uint64_t* containing a sequence of uint64 [decompressed start pos, decompressed end pos) intervals (not necessarily in increasing-position order, and not necessarily disjoint), as well as the number of such intervals. (So if there are N intervals, the uint64_t* points to a length-2N array.) A compile-time constant can be defined to denote the decompressed file size, so N=1 with a [0, DECOMPRESSED_FILE_SIZE) interval would request a normal decompression stream. Ideally, initialization succeeds on a non-seekable zstd file when the intervals are disjoint and in increasing order. A nice property of this interface is that it supports parallel decompression, though that can be left out of the initial implementation.
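The interval-validity check described above could be sketched like this, assuming the flat [start, end) uint64 array layout (the function name and exact rules are hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Intervals are [start, end) pairs packed into a flat array of length 2*n.
 * On a non-seekable stream, initialization would only succeed when the
 * intervals are non-empty, disjoint, and in increasing order. */
static int intervalsStreamable(const uint64_t* iv, size_t n) {
    uint64_t prevEnd = 0;
    for (size_t i = 0; i < n; i++) {
        uint64_t start = iv[2*i], end = iv[2*i + 1];
        if (start >= end)   return 0; /* empty or inverted interval */
        if (start < prevEnd) return 0; /* overlap or out of order   */
        prevEnd = end;
    }
    return 1;
}
```

On a seekable file the check would be skipped entirely, since out-of-order and overlapping intervals are then serviceable.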

chrchang avatar Dec 15 '17 21:12 chrchang

It looks like parallel compression wouldn't take much work to add: just create a version of ZSTD_seekable_compressStream that calls ZSTD_compress_generic?

I agree, that would probably work.

Cyan4973 avatar Dec 15 '17 21:12 Cyan4973

Just checking in on the progress of this issue. Is there anything I can do to assist?

We are already using the existing C API (with custom dictionary) inside a system that runs on about 10,000 machines, and if we can get splittability in and Hadoop integration over the line, we have a couple of use cases in the 100PB range we'd love to use it for. We've also had discussions with one of the major Hadoop vendors about assisting in the final steps as well.

adamkennedy avatar Feb 17 '18 23:02 adamkennedy

Let's have a talk next week on this topic. We are missing "real-world" use cases, that's why it's hard to feel confident about the feature yet, which is required to move it from "experimental" status to production grade. Little details matter, and only production experience can give such hindsight.

So, if you have availability @adamkennedy, I'll be happy to discuss the topic and plan some next steps.

Cyan4973 avatar Feb 18 '18 01:02 Cyan4973

I'd be happy to. Are you at FB HQ? I just happened to speak to the team using the custom dictionary stuff today as well, and I think they'd love to have a chat about that at some point too.

adamkennedy avatar Feb 22 '18 02:02 adamkennedy

Yes, I'm located at Menlo Park now. We can indeed plan for a face-to-face meeting too if we can make the logistics work.

Cyan4973 avatar Feb 22 '18 02:02 Cyan4973