RFC: 0-copy serialization / deserialization

Open vlovich opened this issue 7 months ago • 14 comments

I'm building an application that has a bunch of large (megabytes) probabilistic filters. However, I don't want to pay the bincode serialization / deserialization cost as it adds up quite a bit (currently a massive bottleneck in my application). I'd like to be able to read the raw filter into memory & then create a filter that references it. Serde seems incompatible with this approach (but please correct me if that's not the case).

There are two parts to the filters, from what I've observed:

  1. A fixed-size descriptor that's typically only 2-3 u64 words (e.g. segment length, segment count length, etc.).
  2. The fingerprints array.

For serialization I'm hoping this interface is not objectionable:

pub trait DmaSerializable {
    /// The serialized length of the descriptor. Very small and safe to allocate on-stack if needed.
    const DESCRIPTOR_LEN: usize;

    /// Copies the small fixed-length descriptor part of the filter to an output buffer.
    fn dma_copy_descriptor_to(&self, out: &mut [u8]);

    /// Obtains the raw byte slice of the fingerprints to serialize to disk.
    fn dma_fingerprints(&self) -> &[u8];
}

The application code should have the necessary information to dump the filter to disk without any serialization (just a simple memcpy at worst).
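
As a strawman, dumping a filter would then be something like this (dump_filter and the write path are just illustrative, not part of the proposal):

use std::fs::File;
use std::io::Write;

// Illustrative only: write the descriptor followed by the raw fingerprints.
// Assumes a filter type implementing the proposed DmaSerializable trait.
fn dump_filter<F: DmaSerializable>(filter: &F, file: &mut File) -> std::io::Result<()> {
    let mut descriptor = vec![0u8; F::DESCRIPTOR_LEN];
    filter.dma_copy_descriptor_to(&mut descriptor);
    file.write_all(&descriptor)?;
    // The fingerprints are already a contiguous byte slice; no encoding step.
    file.write_all(filter.dma_fingerprints())
}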

For deserialization things get trickier, and there are a few ways to handle it. I think the "cleanest" way is a new Ref type (e.g. BinaryFuse8Ref<'a>) that can be constructed via from_dma_parts, which takes pointers to the descriptor & the fingerprints and manages access to them internally. I'm not sure about the cost of trying to avoid copying out of the descriptor vs eagerly destructuring it. My initial hunch would be to keep it simple and eagerly destructure.
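
For illustration, the eager-destructuring variant might look roughly like this (the field set mirrors the descriptor fields mentioned above; the little-endian byte layout is just an assumption for the sketch):

pub struct BinaryFuse8Ref<'a> {
    seed: u64,
    segment_length: u32,
    segment_length_mask: u32,
    segment_count_length: u32,
    fingerprints: &'a [u8],
}

impl<'a> BinaryFuse8Ref<'a> {
    /// Eagerly destructure the small descriptor; borrow the fingerprints as-is.
    pub fn from_dma_parts(descriptor: &[u8], fingerprints: &'a [u8]) -> Self {
        let u32_at = |i: usize| u32::from_le_bytes(descriptor[i..i + 4].try_into().unwrap());
        Self {
            seed: u64::from_le_bytes(descriptor[0..8].try_into().unwrap()),
            segment_length: u32_at(8),
            segment_length_mask: u32_at(12),
            segment_count_length: u32_at(16),
            fingerprints,
        }
    }
}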

@ayazhafiz thoughts?

vlovich avatar Nov 02 '23 23:11 vlovich

Thanks for writing this up, Vitali. I think this is reasonable. I do have a few notes:

  • I think this should be doable with both bincode and serde. Both expose "borrowing" deserializers, which I understand to be 0-copy from the underlying byte array (besides the stack copy for the fixed-size header/descriptor set you mention). I looked into this briefly, and it looks like serde also has a companion serde_bytes crate - my understanding is that if we have a struct like

    #[derive(Deserialize)]
    struct Filter<'a> {
      segment...: u32,
      #[serde(with = "serde_bytes")]
      fingerprints: &'a [u8],
    }
    

    then the segment* fields will be a trivial stack copy, and then fingerprints is immediately stored as a pointer into the remaining byte array from which the filter is being deserialized. So, effectively a small fixed-size memcpy and a pointer addition to perform the deserialization.

    What do you think of this approach? Would it satisfy your needs? Regarding the name, I think a *Ref<'a> suffix is reasonable. In hindsight, I regret making fingerprints a public member.

  • Separately, I am not opposed to a DMA interface, but I wonder if we can make it simpler. I think ideally we would have a "dma_serialize" and a "dma_deserialize" function for filters that do the direct transmutation to or from a byte array, without having to segregate the fingerprints from the fixed-size descriptor. Then deserialization is simply an mmap and a call to dma_deserialize, which, as before, only does a stack copy and a pointer addition. I realize that for serialization this interface is not enough if you want to allocate the space up front, so an additional function like dma_serialization_size that returns the size of the descriptor plus the number of fingerprint bytes would also be useful.
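
For concreteness, that simpler surface might look like this (a minimal sketch; the names and the descriptor-then-fingerprints layout are assumptions, not a settled design):

pub trait DmaFilter<'a>: Sized {
    /// Total bytes needed to serialize: descriptor length + fingerprint bytes.
    fn dma_serialization_size(&self) -> usize;

    /// Write the descriptor followed by the raw fingerprints into `out`,
    /// which must be at least `dma_serialization_size()` bytes.
    fn dma_serialize(&self, out: &mut [u8]);

    /// Stack-copy the descriptor and borrow the fingerprint tail: one small
    /// memcpy plus a pointer offset, e.g. straight off an mmap'd region.
    fn dma_deserialize(bytes: &'a [u8]) -> Self;
}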

WDYT?

ayazhafiz avatar Nov 03 '23 03:11 ayazhafiz

How would you implement dma_serialization_size in a serde world?

vlovich avatar Nov 03 '23 18:11 vlovich

Also, I'm having trouble on two fronts. The first is that this fails to compile:

/// Like [`BinaryFuse16`] except represents a reference to a filter.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "bincode", derive(Encode, Decode))]
#[derive(Debug, Clone)]
pub struct BinaryFuse16Ref<'a> {
    seed: u64,
    segment_length: u32,
    segment_length_mask: u32,
    segment_count_length: u32,
    /// The fingerprints for the filter
    #[cfg_attr(feature = "serde", serde(borrow))]
    fingerprints: &'a [u16],
}

results in

error[E0277]: the trait bound `&'a [u16]: Deserialize<'_>` is not satisfied
    --> src/bfuse16.rs:134:19
     |
134  |     fingerprints: &'a [u16],
     |                   ^^^^^^^^^ the trait `Deserialize<'_>` is not implemented for `&'a [u16]`

I also am not super clear on how to actually write a test to validate this.

vlovich avatar Nov 03 '23 18:11 vlovich

Thinking about it some more, I suspect splitting up the descriptor and the fingerprints might be interesting in that you could serialize them separately. That way you could have a tightly packed array of serialized descriptors alongside an array of fingerprints, and lazily access the fingerprints. In a DMA-style design it's probably more efficient to tightly pack the serialized descriptors, because you get read-ahead of the descriptors (which might have locality), whereas in a packed serialization your read-ahead pulls in parts of the filter (which is accessed randomly & thus sees 0 locality).

vlovich avatar Nov 03 '23 18:11 vlovich

Just acknowledging that I’ve seen the above comments and that I will reply in the next 48 hours.

ayazhafiz avatar Nov 05 '23 00:11 ayazhafiz

How would you implement dma_serialization_size in a serde world?

I think you would still expose a serialization_size function on the filter that returns the byte size, and preallocate a buffer of that size before serializing.

Also, I'm having trouble on 2 fronts. The first is that this fails to compile:

I think the problem is serde has no zero-copy deserialization of u16 slices, so we must decode to a pointer of u8s first and then transmute. Something like https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=769fd8086f420484e8d26a076f18cd1e seems to work.
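
For reference, a variant that sidesteps the transmute entirely is to keep the field as &[u8] and decode each u16 on access (a sketch; the little-endian choice is an assumption):

use serde::Deserialize;

// A borrowed &[u8] field is what serde_bytes supports zero-copy; decoding
// each u16 lazily avoids both the transmute and any alignment requirement
// on the input buffer.
#[derive(Deserialize)]
struct BinaryFuse16Ref<'a> {
    seed: u64,
    segment_length: u32,
    segment_length_mask: u32,
    segment_count_length: u32,
    #[serde(with = "serde_bytes")]
    fingerprints: &'a [u8],
}

impl<'a> BinaryFuse16Ref<'a> {
    #[inline]
    fn fingerprint(&self, i: usize) -> u16 {
        u16::from_le_bytes([self.fingerprints[2 * i], self.fingerprints[2 * i + 1]])
    }
}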

I also am not super clear on how to actually write a test to validate this.

What are you wishing to validate? The correctness of the ser/de code?

splitting up the descriptor and the fingerprints for serialization might be interesting in that you could serialize the descriptor and fingerprints separately. That way you could have a tightly packed array of serialized descriptors and an array of fingerprints & lazily access the fingerprints.

That certainly sounds interesting, but I'm curious what the concrete use cases are. Unfortunately, I do not think you can do much with the descriptor words on their own. I suspect that you do not actually want to access those two parts of a filter independently, and that the overwhelming use case is to serialize/deserialize an entire filter in one go, in which case separation doesn't help the memory cache.

ayazhafiz avatar Nov 05 '23 18:11 ayazhafiz

so we must decode to a pointer of u8s first and then transmute

I'm worried about alignment issues in such an approach.

What are you wishing to validate? The correctness of the ser/de code?

Yeah, like how do I write a test to validate this.

suspect that you do not actually want to access those two parts of a filter independently, and that the overwhelming use case is to serialize/deserialize an entire filter in one go

I don't disagree that the "common" approach most people will take is to serialize everything in one go. I think there's a lot of value to splitting them up & that's really relevant if you're looking at 0-copy DMA in the first place.

Imagine an LSM database like RocksDB. You have a filter for each level & maybe even sub-filters for each block within the level (duplicate filter data, but partitioned - a space/time tradeoff common in this space). Let's take a typical database that has ~50 GiB worth of key data to be covered by filters, with an average 40-byte key size (IIRC that's not atypical in the papers I've seen). That's ~1B keys, which is best case ~1.5 GiB of RAM dedicated to just storing the fingerprint data (assuming 9 bits per key on average). And that's best case - in practice it would be a lot more, because the block filters & level filters cover the same space, there's a non-trivial amount of overhead from partitioning across filters at that scale, and you want some portion of the key space covered by larger filters to reduce false positives. That's both a lot of RAM & a lot of I/O for a data structure that's sparsely accessed - basically "useless" memory usage (think of it as memory amplification taking away from memory you want to use to cache actual user data).

If your DB had 1M filters, only ~24 MiB of data is needed for the descriptor part of all those filters (24 bytes per BinaryFuse filter * 1M). The descriptor is needed for every lookup and determines the location of the fingerprint blocks you'll access, because containment is:

let hash = mix($key, $self.seed);
let mut f = fingerprint!(hash) as $fpty;
let (h0, h1, h2) = hash_of_hash(hash, $self.segment_length, $self.segment_length_mask, $self.segment_count_length);

which then looks up offsets h0/h1/h2 within the fingerprint data & combines them with f.

However, if you have the descriptor & fingerprint data intermixed, that 24 MiB of data representing 1M tables becomes 4 GiB of I/O just to access the descriptors and determine the positions to look up within the fingerprint data, because each descriptor costs 4 KiB of I/O to extract 24 bytes of data (on some drive topologies this can even be as high as 64 KiB, e.g. for QLC disks, which are starting to be found in data centers). So instead of ~5ms to load the descriptors, which can be done easily on opening the DB, you're looking at 1s best case, and only if it's all contiguous (200x overhead) - and it's unlikely something you can do upfront. Even NVMe drives have a cost for random access reads, because IOP capacity is another aspect to keep in mind, and 1 million operations is a non-trivial fraction of total IOP capacity.

Additionally, I can keep the descriptor co-located with other metadata for the "thing being accessed", which means there's actually no incremental cost - it's part of an I/O block I'm pulling in anyway (i.e. the block could hold the descriptor + a pointer to where the fingerprint data lives). The descriptor is then free to access (no I/O amplification) & can be cached essentially forever (no memory amplification & no memory copy).

Similarly, with some additional changes to the API to add an async containment which looks like:

async fn contains_lazy<F>(descriptor: &[u8], f: F) -> bool where F: FnOnce(usize, usize, usize) -> (u8, u8, u8)

then an arbitrarily large filter could be checked for containment using only 3 disk reads (12 KiB on TLC, 192 KiB on QLC), which is a substantial reduction: ~2 microseconds of I/O if random blocks of the filter aren't in the cache, vs needing to pull in the entire filter when it's not resident. It also lets you utilize memory more efficiently: if you do the lazy containment and have 100 MiB of cache to work with, that 100 MiB is shared across all filters based on access patterns, whereas if it stores entire filters, you've got 50 filters & most of their memory goes unused.
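
A rough sketch of the shape (treating mix, fingerprint, and hash_of_hash as the xorf internals quoted earlier; the Descriptor struct, the async plumbing, and the offset types are my assumptions, while the XOR combine mirrors the binary fuse containment check):

use std::future::Future;

// Hypothetical descriptor struct holding the fields discussed above.
pub struct Descriptor {
    pub seed: u64,
    pub segment_length: u32,
    pub segment_length_mask: u32,
    pub segment_count_length: u32,
}

// Sketch: derive the three fingerprint offsets from the descriptor alone,
// let the caller fetch those 3 bytes (e.g. 3 random disk reads), then
// combine, mirroring the containment check quoted earlier.
pub async fn contains_lazy<F, Fut>(desc: &Descriptor, key: u64, fetch: F) -> bool
where
    F: FnOnce(usize, usize, usize) -> Fut,
    Fut: Future<Output = (u8, u8, u8)>,
{
    let hash = mix(key, desc.seed);
    let expected = fingerprint(hash) as u8;
    let (h0, h1, h2) = hash_of_hash(
        hash,
        desc.segment_length,
        desc.segment_length_mask,
        desc.segment_count_length,
    );
    let (f0, f1, f2) = fetch(h0 as usize, h1 as usize, h2 as usize).await;
    expected == (f0 ^ f1 ^ f2)
}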

It's fine if you think this is a bad fit for the typical user of this library. I'm fine having my own fork.

Side note about QLC: the spec sheets for them claim support for 4 KiB reads/writes, but the IOP rate sinks like a stone, so you really want to make sure you're doing natively aligned operations & keeping data as close together as possible to minimize read amplification.

vlovich avatar Nov 11 '23 20:11 vlovich

I'm worried about alignment issues in such an approach.

This is an issue any API will face, no? I think it must be mandated that the user ensures the provided opaque byte array is properly aligned before deserialization, regardless of the deserialization approach (given that the deserialization is zero-copy).

Yeah, like how do I write a test to validate this.

I think some idempotency tests, both in the filter behavior and the serialized bytes, are sufficient.
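
Concretely, a round-trip test along these lines (assuming the *Ref API sketched earlier; from_dma_parts and the DmaSerializable methods are still hypothetical):

#[test]
fn dma_round_trip() {
    use xorf::{BinaryFuse16, Filter};

    let keys: Vec<u64> = (0..10_000).collect();
    let filter = BinaryFuse16::try_from(keys.as_slice()).unwrap();

    // Serialize the two parts separately.
    let mut descriptor = vec![0u8; BinaryFuse16::DESCRIPTOR_LEN];
    filter.dma_copy_descriptor_to(&mut descriptor);
    let fingerprints = filter.dma_fingerprints().to_vec();

    // Behavioral idempotency: the borrowed view answers like the original.
    let view = BinaryFuse16Ref::from_dma_parts(&descriptor, &fingerprints);
    for key in &keys {
        assert!(view.contains(key));
    }

    // Byte idempotency: re-serializing produces identical bytes.
    let mut descriptor2 = vec![0u8; BinaryFuse16::DESCRIPTOR_LEN];
    view.dma_copy_descriptor_to(&mut descriptor2);
    assert_eq!(descriptor, descriptor2);
    assert_eq!(view.dma_fingerprints(), &fingerprints[..]);
}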

Your argument is reasonable. I have no issue with supporting the disjoint header + filter ser/de you suggest. Maybe a convenience method for the case where you want to ser/de it all in one go would also be nice, and that can directly call the disjoint methods since the size of the header is well known.

Apologies for the delay here, I'm sorry if you've lost interest.

ayazhafiz avatar Dec 10 '23 03:12 ayazhafiz

Hello @vlovich @ayazhafiz !!

I'm also building an app that relies heavily on probabilistic structures and has some gigabytes of filters in files on disk.

I want to use binary fuse filters but also realise that the serialisation/deserialisation is pretty bad for perf, so I want to explore zero-copy reading of these filters (also using a DMA approach, with glommio).

For this API I think the header stuff can be serialised in any format since it is very small (though for zero-copy you can use capnproto or rkyv). But the data part needs to live in a separate section on the file system so the headers can be loaded separately, like @vlovich mentioned. Also, I think this data part can be raw u8 binary, because we essentially want to make a bit-by-bit comparison of the fingerprint against it. We can make the fingerprint a [u8; N], where N is specified by the fingerprint length (this can be done with u16::to_be_bytes or similar methods), so we won't have to worry about alignment at all - just load the bytes and compare.
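
A tiny illustration of that comparison (assuming fingerprints were written with u16::to_be_bytes):

// No alignment requirement: only individual bytes are read and compared.
fn fingerprint_matches(data: &[u8], index: usize, fingerprint: u16) -> bool {
    data[index * 2..index * 2 + 2] == fingerprint.to_be_bytes()
}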

I'll build a PR with this API. Curious to hear what you think about storing the data as raw binary, and about rkyv/capnproto for the headers.

ozgrakkurt avatar Dec 12 '23 02:12 ozgrakkurt

Thinking more about this, we can have a third function, get_fingerprint_indices(&self, key: &[u8]) -> (u64, u64, u64, u64), that only returns where the fingerprints should be located, so the downstream application can load them however it wants. We can also leave the fingerprints array empty in this case, so storage stays separate.

ozgrakkurt avatar Dec 12 '23 03:12 ozgrakkurt

@ozgrakkurt did you want to take a stab at putting up a pr implementing it?

vlovich avatar Dec 15 '23 08:12 vlovich

@ozgrakkurt did you want to take a stab at putting up a pr implementing it?

Thinking more about it, I'll just use a blocked bloom filter, since I can first run a bucket-index query for all the elements I want to test, fetch only those buckets (32 bytes each), and run the query that way when loading a subset of the data. A blocked bloom filter beats any other filter in speed, and the impl I have is very suitable for this: https://github.com/ozgrakkurt/sbbf-rs - so I plan to use it combined with glommio's read_many method.

So I can have 64 bits per key or something like that and get insane value, since I'm only loading a tiny subset of the filter bits.

ozgrakkurt avatar Dec 15 '23 09:12 ozgrakkurt

@vlovich can use something like this (I didn't test it but it should work fine):

use std::collections::BTreeMap;

use futures::StreamExt;
use glommio::io::{ImmutableFile, MergedBufferLimit, ReadAmplificationLimit};
use anyhow::{Context, Result, anyhow};

#[inline(always)]
pub fn bucket_idx(hash: u64, num_buckets: u32) -> u32 {
    fastrange_rs::fastrange_32(hash.rotate_left(32) as u32, num_buckets)
}

/// Checks if a filter located in file at [offset..offset+num_buckets*32) contains any of the given hashes
pub async fn contains_any_hash(file: &ImmutableFile, offset: u64, num_buckets: u32, hashes: impl Iterator<Item=u64>) -> Result<bool> {
    let mut bucket_indices: BTreeMap<u32, Vec<u64>> = BTreeMap::default();

    for hash in hashes {
        bucket_indices.entry(bucket_idx(hash, num_buckets)).or_default().push(hash);
    }

    let reads = bucket_indices.keys().map(|&bucket_idx| {
        let start = offset + u64::from(bucket_idx) * 32;
        (start, 32)
    });
    let reads = futures::stream::iter(reads);

    let mut reads = file.read_many(
        reads,
        // Don't want reads bigger than 1K
        MergedBufferLimit::Custom(1024),
        // A single read is 32 bytes, so 32x read amplification caps a merged read at 1 KiB
        ReadAmplificationLimit::Custom(32),
    );

    // Scratch filter sized to hold a single 32-byte bucket
    let mut filter = sbbf_rs_safe::Filter::new(32 * 8, 1);

    // Drain the whole stream; `for buf in reads.next().await` would only
    // process the first read, since it iterates over an Option
    while let Some(buf) = reads.next().await {
        let ((bucket_offset, _), buf) = buf.map_err(|e| anyhow!("{}", e)).context("read a bucket from disk")?;
        let bucket_idx = ((bucket_offset - offset) / 32) as u32;

        let hashes = bucket_indices.get(&bucket_idx).unwrap();

        filter.as_bytes_mut().copy_from_slice(&buf);

        for &hash in hashes.iter() {
            if filter.contains_hash(hash) {
                return Ok(true);
            }
        }
    }

    Ok(false)
}

ozgrakkurt avatar Dec 29 '23 20:12 ozgrakkurt

@vlovich I just got this into the code and it makes a huge difference. Thanks for great idea!

When scanning all indices in the db, it made things about 5x as fast as loading everything from disk. It is almost as fast as pinning everything in memory, which is insane (pinning is only 0.5x faster).

ozgrakkurt avatar Mar 16 '24 18:03 ozgrakkurt