Reading data chunk by chunk
I have a running crate that extracts data from a file into ndarray, later exposed with PyO3 for big data, but I would like to migrate it to arrow to benefit from its performance and ecosystem. The data blocks read are row-based, and I currently read several columns in parallel into already-allocated ndarrays using rayon, from a chunk of data in memory (not the whole data block, to avoid consuming too much memory). Types are mainly primitives, but also utf8, complex, and multi-dimensional arrays of primitives or complex. File bytes can be little or big endian.
Can I have your advice on the best method to read this data with arrow2? A few thoughts and directions I have considered so far:

- I noticed there is `Buffer` to create a `PrimitiveArray`, but compared to the official arrow crate there is no `from_bytes()` implementation, and `PrimitiveArray` does not expose a `Vec`-like API over `Buffer`. How do I feed data chunk by chunk into a `Buffer`? Is there a way to input bytes along with a `DataType`?
- It seems I could use `MutablePrimitiveArray` like a `Vec` with a defined capacity, but I am not sure it is performant. I am also a bit afraid of the cost of converting to `PrimitiveArray`.
- Is it still acceptable to simply keep my ndarray implementation and convert everything to arrow2 at the end? I did not see any zero-copy path from ndarray.
Thanks for reaching out and for the explanation, very interesting!
You understood it absolutely right: `MutablePrimitiveArray` and the like are useful to add optional item by optional item; `Buffer`, `Bitmap` and `Vec` are for bulk operations over values and validity.
I am assuming that the files are a custom format (i.e. not parquet) and that you know how to read them. In that case, the most performant way to create a primitive array is to read the values into a `Vec<T>` and use `PrimitiveArray::new(DataType::..., values.into(), None)`. To read into a `Vec<T>` efficiently, you can use:
```rust
use std::convert::TryInto;
use std::io::Read;

use arrow2::error::Result;
use arrow2::types::NativeType;

/// `true` iff the machine we run on is little endian.
fn is_native_little_endian() -> bool {
    cfg!(target_endian = "little")
}

fn read_uncompressed_buffer<T: NativeType, R: Read>(
    reader: &mut R,
    length: usize,
    is_little_endian: bool,
) -> Result<Vec<T>> {
    // it is undefined behavior to call read_exact on un-initialized memory,
    // https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
    // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
    let mut buffer = vec![T::default(); length];

    if is_native_little_endian() == is_little_endian {
        // fast case where we can just copy the contents as-is
        let slice = bytemuck::cast_slice_mut(&mut buffer);
        reader.read_exact(slice)?;
    } else {
        read_swapped(reader, length, &mut buffer, is_little_endian)?;
    }
    Ok(buffer)
}

fn read_swapped<T: NativeType, R: Read>(
    reader: &mut R,
    length: usize,
    buffer: &mut Vec<T>,
    is_little_endian: bool,
) -> Result<()> {
    // slow case where we must reverse the bytes of every value
    let mut slice = vec![0u8; length * std::mem::size_of::<T>()];
    reader.read_exact(&mut slice)?;

    let chunks = slice.chunks_exact(std::mem::size_of::<T>());
    if !is_little_endian {
        // machine is little endian, file is big endian
        buffer
            .as_mut_slice()
            .iter_mut()
            .zip(chunks)
            .try_for_each(|(slot, chunk)| {
                let a: T::Bytes = match chunk.try_into() {
                    Ok(a) => a,
                    Err(_) => unreachable!(),
                };
                *slot = T::from_be_bytes(a);
                Result::Ok(())
            })?;
    } else {
        // machine is big endian, file is little endian
        buffer
            .as_mut_slice()
            .iter_mut()
            .zip(chunks)
            .try_for_each(|(slot, chunk)| {
                let a: T::Bytes = match chunk.try_into() {
                    Ok(a) => a,
                    Err(_) => unreachable!(),
                };
                *slot = T::from_le_bytes(a);
                Result::Ok(())
            })?;
    }
    Ok(())
}
```
`Vec<T>` to `Buffer<T>` is O(1), since `Buffer` is basically an `Arc<Vec<T>>`.

In other words, read the data into a `Vec<T>` as you do in Rust and create an array via `PrimitiveArray::new(DataType::..., values.into(), None)`. CPU-wise this operation is essentially a zeroed allocation + a read into that region.
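For instance, a minimal sketch of that path using `read_uncompressed_buffer` from above (the bytes and the `Int32` type are just placeholders):

```rust
use std::io::Cursor;

use arrow2::array::PrimitiveArray;
use arrow2::datatypes::DataType;

fn main() {
    // hypothetical on-disk chunk: three little-endian i32 values
    let mut reader = Cursor::new(vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);
    let values: Vec<i32> = read_uncompressed_buffer(&mut reader, 3, true).unwrap();

    // Vec<i32> -> Buffer<i32> is O(1); None means no validity bitmap (no nulls)
    let array = PrimitiveArray::new(DataType::Int32, values.into(), None);
    assert_eq!(array.values().as_slice(), &[1, 2, 3]);
}
```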
Does it make sense?
For `Utf8Array` (say `[a, bb, ccc]`) you need to create two buffers:

- `values: Vec<u8>` with `[a, b, b, c, c, c]`
- `offsets: Vec<i32>` with `[0, 1, 3, 6]` (the first is always 0; the first element of values has a length of 1 = 1-0, the second element (`bb`) has a length of 2 = 3-1, the last (`ccc`) has a length of 3 = 6-3)

and then use `Utf8Array::<i32>::new(DataType::Utf8, offsets.into(), values.into(), None)`. The creation of these vectors depends on the details of how the strings are stored on disk. Lengths are measured in bytes, and the values between two offsets must be valid utf8 (we check). Use `try_new` to error on invalid data (e.g. invalid utf8), or use the unsafe `new_unchecked` / `try_new_unchecked` to skip some of the checks.
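Concretely, for the `[a, bb, ccc]` example (a sketch; the exact constructor signature varies across arrow2 versions, this assumes one where both offsets and values convert via `.into()`):

```rust
use arrow2::array::Utf8Array;
use arrow2::datatypes::DataType;

fn main() {
    let values: Vec<u8> = b"abbccc".to_vec(); // "a" + "bb" + "ccc" concatenated
    let offsets: Vec<i32> = vec![0, 1, 3, 6]; // monotonically increasing, starts at 0

    let array = Utf8Array::<i32>::new(DataType::Utf8, offsets.into(), values.into(), None);
    assert_eq!(array.value(1), "bb");
}
```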
Thanks for your answer :) To clarify my intent: the data blocks I want to read are consecutive records/rows, typically like `[record1, record2, record3]` with `record = [channel1: u8, channel2: u64, channel3: f64]`. And I read the data block by chunks: `chunk1 = [record1..recordn]`, `chunk2 = [recordn+1..recordz]`, etc. Example snippet for reading u16:
```rust
// data is an ndarray already allocated; data_chunk is a Vec<u8>
// containing n records of length record_length
if swap_needed {
    for (i, record) in data_chunk.chunks(record_length).enumerate() {
        let value = &record[pos_byte_beg..pos_byte_beg + std::mem::size_of::<u16>()];
        data[i + previous_index] =
            u16::from_be_bytes(value.try_into().expect("Could not read be u16"));
    }
} else {
    for (i, record) in data_chunk.chunks(record_length).enumerate() {
        let value = &record[pos_byte_beg..pos_byte_beg + std::mem::size_of::<u16>()];
        data[i + previous_index] =
            u16::from_le_bytes(value.try_into().expect("Could not read le u16"));
    }
}
```
But based on your advice I will focus on reading into `Vec<T>`, to more easily save into `PrimitiveArray` for 1D arrays. I will have to study the API more to then convert into complex and n-dimensional arrays.
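For example, I expect the u16 case above to become something like this sketch, where `column` is a `Vec<u16>` that persists across chunks and replaces `data`/`previous_index`:

```rust
use arrow2::array::PrimitiveArray;
use arrow2::datatypes::DataType;

// column: Vec<u16> is allocated once with the total number of records;
// pushing per chunk replaces indexing with previous_index
for record in data_chunk.chunks(record_length) {
    let bytes = &record[pos_byte_beg..pos_byte_beg + std::mem::size_of::<u16>()];
    column.push(if swap_needed {
        u16::from_be_bytes(bytes.try_into().expect("Could not read be u16"))
    } else {
        u16::from_le_bytes(bytes.try_into().expect("Could not read le u16"))
    });
}
// after the last chunk: O(1) conversion into an arrow2 array
let array = PrimitiveArray::new(DataType::UInt16, column.into(), None);
```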
A useful way of thinking about arrow2 is:

- `Buffer<T> ~= Arc<Vec<T>>`
- `Bitmap ~= Arc<Vec<u8>>`
- `PrimitiveArray<T> ~= (Buffer<T>, Bitmap)`

That is why we can build a `PrimitiveArray` from just a `Vec<T>`.
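A small sketch of those equivalences (the `Float64` type and the values are arbitrary):

```rust
use arrow2::array::{Array, PrimitiveArray};
use arrow2::bitmap::Bitmap;
use arrow2::buffer::Buffer;
use arrow2::datatypes::DataType;

fn main() {
    let values: Buffer<f64> = vec![1.0, 2.0, 3.0].into(); // ~ Arc<Vec<f64>>: O(1), no copy
    let validity = Bitmap::from_trusted_len_iter(vec![true, false, true].into_iter()); // bit-packed
    let array = PrimitiveArray::new(DataType::Float64, values, Some(validity));
    assert!(array.is_null(1)); // is_null comes from the Array trait
}
```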
The format is row/record based. In that case we always need to perform a transposition of the data, as you are showing, which means that we can't load multiple values in one go as I was describing.
One way we do transpositions in this case is to declare "mutable columns" and push to them item by item per record. This allows us to traverse all records only once. We do something similar for CSV, see here. That is essentially:
```rust
// pseudocode: one pass over the rows, one mutable column per field
let mut all: Vec<Box<dyn Array>> = vec![];
for (i, column) in columns.enumerate() {
    let mut data = vec![];
    for row in rows {
        data.push(row[i]);
    }
    let c: Box<dyn Array> = finalize(data);
    all.push(c);
}
```
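A self-contained version of that sketch, assuming hypothetical records of `(u16, f64)` and plain `Vec`s as the mutable columns:

```rust
use arrow2::array::{Array, PrimitiveArray};
use arrow2::datatypes::DataType;

fn main() {
    // hypothetical row-based records: (channel1: u16, channel2: f64)
    let rows: Vec<(u16, f64)> = vec![(1, 1.0), (2, 2.0), (3, 3.0)];

    // one mutable column per channel; a single pass over the records
    let mut col1: Vec<u16> = Vec::with_capacity(rows.len());
    let mut col2: Vec<f64> = Vec::with_capacity(rows.len());
    for (a, b) in &rows {
        col1.push(*a);
        col2.push(*b);
    }

    // "finalize": O(1) conversion of each column into an arrow2 array
    let all: Vec<Box<dyn Array>> = vec![
        Box::new(PrimitiveArray::new(DataType::UInt16, col1.into(), None)),
        Box::new(PrimitiveArray::new(DataType::Float64, col2.into(), None)),
    ];
    assert_eq!(all.len(), 2);
}
```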
Thanks.
I changed my approach to use `Vec<T>` for basically all columnar data (1D arrays), including complex, which I then convert into an Arrow `PrimitiveArray`, `FixedSizeListArray` (for complex), `Utf8Array` (for strings), `BinaryArray` (for variable-length binary) and `FixedSizeBinary` (for fixed-length binary). Most of them use `Buffer::from_slice()`, so I expect little performance impact.
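For the complex channels, the conversion is along these lines (a sketch; the interleaved re/im layout and the `"item"` field name are my own choices, assuming a recent arrow2 where array values are `Box<dyn Array>`):

```rust
use arrow2::array::{Array, FixedSizeListArray, PrimitiveArray};
use arrow2::datatypes::{DataType, Field};

fn main() {
    // two complex numbers (1+2i) and (3+4i), stored as interleaved re/im pairs
    let inner = PrimitiveArray::new(DataType::Float64, vec![1.0, 2.0, 3.0, 4.0].into(), None);
    let dtype =
        DataType::FixedSizeList(Box::new(Field::new("item", DataType::Float64, false)), 2);
    let complex = FixedSizeListArray::new(dtype, Box::new(inner), None);
    assert_eq!(complex.len(), 2);
}
```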
The only thing I am not so sure about is `Bitmap` creation. I can have boolean arrays (`Vec<bool>`) and I am not sure how to efficiently convert them into a validity `Bitmap`.
Also, the file I am reading can contain ndarrays changing by timestamp. I looked at tensors in apache/arrow and several issue tickets; it seems they are not well supported and tested yet, so I suppose you are not implementing them yet. But could you advise which type is most suitable for compatibility with Pandas or Polars? I have seen `FixedSizeBinary` mentioned in Jira, but it seems weird to use binary if the type is already known.
While trying to use `MutableArray`, I am facing issues with missing trait implementations, mainly `PartialEq` and `Default` for `MutableFixedSizeListArray`. I am using it to store `Vec<Option<Complex>>` values.
Nice, that is interesting! I am also trying to improve the API for COW semantics atm!
wrt to Default and PartialEq - no reason, just forgotten - let's add it! Would you like to do it, or would you like me to?
wrt to the `Vec<bool>`: that is a different format than `Bitmap`, which contains a `Vec<u8>` where every element is a bit. One way to convert one to the other is via `Bitmap::from_trusted_len_iter(vec.into_iter())` (it is still a bit expensive).
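For example (a minimal sketch):

```rust
use arrow2::bitmap::Bitmap;

fn main() {
    let mask: Vec<bool> = vec![true, false, true, true];
    // Vec's iterator knows its exact length, which from_trusted_len_iter relies on
    let bitmap = Bitmap::from_trusted_len_iter(mask.into_iter());
    assert_eq!(bitmap.len(), 4);
}
```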
Hi, sorry for the delayed answer. I am not sure I am comfortable enough to implement `Default` and `PartialEq`, but I can try. I am still in the learning phase of Rust.
I am currently integrating arrow2 further into mdfr, but I am facing issues with mutable references for arrays that are not primitive. In relation to #1601, where it is simple to add `get_mut_values()` for `FixedSizeBinary`, it becomes more complicated for arrays that include offsets (`Utf8`, `LargeBinary`; see my arrow2 fork). It is also getting complicated for `FixedSizeListArray`. So COW seems practicable for primitives, but is it also applicable to the other arrays? Did I miss something?