
Support partitioned writes

Fokko opened this issue 2 years ago • 43 comments

Feature Request / Improvement

Support partitioned writes

So I think we want to tackle the static overwrite first, and then we can compute the predicate for the dynamic overwrite to support that. We can come up with a separate API. I haven't really thought this through, and we can still change this. I think the most important step is the breakdown of the work. There is a lot involved, but luckily we already get the test suite from the full overwrite.

Steps I can see:

  • [x] Extend the summary generation to support partitioned writes (here in Java)
  • [x] Add support for the append files.
    • How are we going to fan out the writing of the data. We have an Arrow table, what is an efficient way to compute the partitions and scale out the work. For example, are we going to sort the table on the partition column and do a full pass through it? Or are we going to compute all the affected partitions, and then scale out?
  • [x] Add support for static overwrites
  • [x] Add support for dynamic overwrites

Other things on my mind:

  • In Iceberg it can be that some files are still on an older partition spec; we should make sure that we handle those correctly based on the spec that we provide.
  • How to handle delete files; it might be that the delete files become unrelated because the affected datafiles are replaced. We could first ignore this.

The good part:

  • In PyIceberg we're first going to ignore fast-appends (this is when you create a new manifest and add it to the manifest list). Instead we'll just take the existing manifest(s) and rewrite them into a single new manifest, which makes it a bit easier to reason about the snapshot (and therefore the snapshot summaries). The reason is that this caused quite a few bugs in Java, and it can always be added at a later moment.

Fokko avatar Dec 12 '23 20:12 Fokko

Hi @Fokko and Iceberg community, @syun64 and I are continuing to work on testing the write capability in the Write support pr. We are excited about it as it will help a lot with our use case.

Our use case also includes overwriting partitions of tables, so I am highly interested in opportunities to contribute to this issue. Would it be alright for me to start working on this issue, based on the Write support pr, if no one else has already begun?

jqin61 avatar Jan 05 '24 22:01 jqin61

Hey @jqin61 Thanks for replying here. I'm not aware of anyone who has already started on this. It would be great if you can take a stab at it 🚀

Fokko avatar Jan 06 '24 15:01 Fokko

In Iceberg it can be that some files are still on an older partition spec; we should make sure that we handle those correctly based on the spec that we provide.

It seems Spark's Iceberg support has the following overwrite behaviors under partition spec evolution:

  • dynamic overwrite: data files written under an old partition spec will not be replaced, even if some of their records match the overwriting data
  • static overwrite with PARTITION values specified: same as above
  • static overwrite without PARTITION values: all data is deleted, regardless of which partition spec the files conform to.

As Fokko mentioned, we need to make sure the implementation uses the latest partition spec_id when overwriting partitions, so that data written under an old partition spec is not touched.

jqin61 avatar Jan 10 '24 18:01 jqin61

How are we going to fan out the writing of the data. We have an Arrow table, what is an efficient way to compute the partitions and scale out the work. For example, are we going to sort the table on the partition column and do a full pass through it? Or are we going to compute all the affected partitions, and then scale out?

It just occurred to me that when Spark writes to Iceberg, it requires the input dataframe to be sorted by the partition value, otherwise an error is raised during writing. Do we want to make the same assumption for PyIceberg?

If not, and we have to use arrow.compute.filter() to extract each partition before serialization, it seems a global sort of the entire table before the filter() is unnecessary, since the filter makes no assumption about how the array is organized?

To extract the partitions by filter(), would it be helpful if we first built an API in PyArrow that does a full scan of the array, bucket-sorts it into partitions, and returns the buckets (partitions) as a list of Arrow arrays? These arrays could then be passed as input to writing jobs executed in a multi-threaded way.
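
For what it's worth, recent PyArrow releases can already produce the set of unique partition tuples without a new API; a minimal sketch for identity partitions (column names are illustrative):

import pyarrow as pa

arrow_table = pa.table({
    "column1": ["A", "B", "A", "C", "B"],
    "column2": [1, 2, 1, 3, 2],
})

# Unique combinations of the partition columns; group_by with no aggregations
# returns one row per distinct key (available in recent PyArrow releases).
unique_partitions = arrow_table.group_by(["column1", "column2"]).aggregate([])
print(unique_partitions.to_pylist())
# e.g. [{'column1': 'A', 'column2': 1}, {'column1': 'B', 'column2': 2}, {'column1': 'C', 'column2': 3}]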

jqin61 avatar Jan 12 '24 17:01 jqin61

I currently see two approaches:

  • First get the unique partitions, and then filter the relevant data for each of the partitions. It is nice that we know the partition upfront, but the downside is that we need to do a pass through the dataframe to get the relevant rows for each partition.
  • Sort the entire dataframe on the partition columns, which is expensive, but we need to do this only once. Once it is sorted, we can just do a single pass through the dataframe. If we want to scale out, we can split up the rows: [0, n/2], [n/2, n].

I'm not sure which is best. I think the first one works better if you have few partitions, and the latter one is more efficient when you have many partitions.

To extract the partitions by filter(), would it be helpful if we first built an API in PyArrow that does a full scan of the array, bucket-sorts it into partitions, and returns the buckets (partitions) as a list of Arrow arrays? These arrays could then be passed as input to writing jobs executed in a multi-threaded way.

Starting with the API is always a great idea. My only concern is that we make sure we don't take copies of the data, since that might blow up the memory quite quickly.
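
As a rough illustration of the copy concern (a sketch; exact numbers depend on the PyArrow version and memory pool): slicing a table shares the underlying buffers, while filtering materializes new arrays.

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "part": ["a"] * 500_000 + ["b"] * 500_000,
    "value": list(range(1_000_000)),
})

before = pa.total_allocated_bytes()
half = table.slice(0, 500_000)  # zero-copy view over the same buffers
print("slice allocated:", pa.total_allocated_bytes() - before)   # ~0 bytes

before = pa.total_allocated_bytes()
only_a = table.filter(pc.equal(table["part"], "a"))  # copies the selected rows into new arrays
print("filter allocated:", pa.total_allocated_bytes() - before)  # roughly the size of partition "a"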

Hope this helps!

Fokko avatar Jan 12 '24 20:01 Fokko

@jqin61 I did some more thinking over the weekend, and I think that the approach you suggested is the most flexible. I forgot about the sort order that we also want to add at some point; then we would need to sort twice 👎

Fokko avatar Jan 15 '24 09:01 Fokko

Based on the existing discussion, there are 3 major possible directions for detecting partitions and writing each partition in a multi-threaded way to maximize I/O. It seems there isn't any approach simple enough that we could implement purely with the existing PyArrow APIs in PyIceberg. I marshalled Fokko's suggestions and listed these approaches for discussion purposes:

Filter Out Partitions. As Fokko suggested, we could filter the table to get the partitions before writing, but we would need an API on the Arrow side to get unique partition values (e.g. extend compute.unique() from array/scalar to table).

partitions: list[dict] = pyarrow.compute.unique(arrow_table)

With it, we could filter the table to get partitions and provide them as inputs to concurrent jobs.

from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

arrow_table = pa.table({
    'column1': ['A', 'B', 'A', 'C', 'B'],
    'column2': [1, 2, 1, 3, 2],
    'column3': ['X', 'Y', 'X', 'Z', 'Y']
})
partition_keys = arrow_table.select(['column1', 'column2'])
# The existing unique() does not support tables; this is the API we would need to add on the Arrow side.
partitions: list[dict] = pc.unique(partition_keys)
# = [
#     {'column1': 'A', 'column2': 1},
#     {'column1': 'B', 'column2': 2},
#     {'column1': 'C', 'column2': 3}
# ]


def filter_and_write_table(partition, index):
    # Create a boolean mask for rows that match this partition's values
    mask = pc.and_(*(pc.equal(arrow_table[col], val) for col, val in partition.items()))

    # Filter the table down to the partition
    filtered_table = arrow_table.filter(mask)

    # Write the filtered table to a Parquet file
    parquet_file = f'output_partition_{index}.parquet'
    pq.write_table(filtered_table, parquet_file)

with ThreadPoolExecutor() as executor:
    for i, partition in enumerate(partitions):
        executor.submit(filter_and_write_table, partition, i)

Sort and Single-direction Writing. As Fokko suggested, we could sort the table first. We then slice the table and do a one-direction scan over each slice to write out partitioned files. Suppose we have an Arrow API that takes a sorted table, scans through it, creates a new file whenever it encounters a row with a new partition value, and raises an error if it encounters a partition value it has already passed, just like how Spark writes to an Iceberg table.

def write_table_partitions(sorted_table, partition_columns, directory_path): ...

Then we could do

partition_columns = ['column1', 'column2']
sorted_table = arrow_table.sort_by([('column1', 'ascending'), ('column2', 'ascending')])
directory_path = '/path/to/output/directory'

# Break down the sorted table into slices with zero-copy (slice_table and slice_options are hypothetical)
slices = slice_table(sorted_table, slice_options)

# Call the API for each slice
with ThreadPoolExecutor() as executor:
    for table_slice in slices:
        executor.submit(write_table_partitions, table_slice, partition_columns, directory_path)

Bucketing. We could create an Arrow API that returns partitioned tables/record batches given a table and a list of partition columns; the algorithm does a full scan of the Arrow table in O(table_length) time, bucket-sorts it, and creates a table/record batch for each bucket:

table_partitions = pyarrow.compute.partition(arrow_table, partition_columns)

We could write each batch:

import os

def write_table_to_parquet(table, directory_path, file_suffix):
    # Construct the full path for the Parquet file
    file_path = os.path.join(directory_path, f'record_batch_{file_suffix}.parquet')

    # Write the table to a Parquet file
    pq.write_table(table, file_path)

with ThreadPoolExecutor() as executor:
    for i, partition_as_table in enumerate(table_partitions):
        executor.submit(write_table_to_parquet, partition_as_table, directory_path, i)

As Fokko pointed out, the filter method will not be efficient if there are many partitions: each filter takes O(table_length) time, and although each thread can filter on its own, on a single node the total work across all jobs is O(table_length * number_of_partitions). Technically we only need a single scan to get all the buckets.

It seems the sort method is not as efficient as the bucketing method, because the relative order of partitions does not matter, so a general sort algorithm on the partition columns might be overkill compared with bucketing.

I feel like all 3 directions require some implementation on Arrow itself (I did not find any approach simple enough that we could implement any of them purely with the existing PyArrow APIs), and I want to get opinions on whether pursuing Arrow-level API utilities sounds like a good direction. Thank you!

Specifically, for the third direction of bucketing and returning materialized tables/batches, since Arrow has dataset.write_dataset() which supports partition-respected writing, I did some reading to see how it partitions and whether we could leverage anything from it.

https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118 is where the partitioning happens. The partition algorithm is a full scan with a bucket sort, leveraging the Grouper class utilities in Arrow's compute component. Specifically:

  1. Grouper.consume() initiates the groups based on the partition columns present: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L422
  2. Grouper.MakeGroupings() builds a ListArray where each list represents a partition and each element in the list is a row_id of the original table: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L886C45-L886C58
  3. Grouper.ApplyGroupings() efficiently converts the grouped representation of row_ids into actual rows: https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L875

Other than being used in dataset writing, Grouper from Arrow's compute component is used to support other exposed compute APIs, such as aggregation functions. At the end of the day, what we want (in order to support PyIceberg's partitioned writes) is an API that returns record batches/tables based on an input table and an input partition scheme, so maybe we could expose such a new API under compute, leveraging Grouper.

jqin61 avatar Jan 19 '24 23:01 jqin61

@jqin61 just wondering if we can use this directly https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html

asheeshgarg avatar Jan 23 '24 13:01 asheeshgarg

@jqin61 just wondering if we can use this directly https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html

Thank you Ashish! I overlooked it. As you mention, we could just use write_dataset() with the partitioning and basename_template arguments to write out the partitioned data files as Iceberg needs.
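
For reference, a minimal sketch of such a write_dataset() call (paths and column names here are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds

arrow_table = pa.table({"key": ["001", "001", "002"], "value": [10, 20, 30]})

ds.write_dataset(
    arrow_table,
    base_dir="partitioned_data",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([pa.field("key", pa.string())]), flavor="hive"),
    basename_template="part-{i}.parquet",  # controls the data file names within each partition dir
)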

jqin61 avatar Jan 23 '24 20:01 jqin61

@Fokko @jqin61 I am also interested in moving this forward, as we also deal with a lot of writes involving partitions. Happy to collaborate on this. For write_dataset() we might need to look into whether we need to add field metadata (field-id etc.) to the Parquet files when committing data files.

asheeshgarg avatar Jan 23 '24 20:01 asheeshgarg

@Fokko @jqin61 I am also interested in moving this forward, as we also deal with a lot of writes involving partitions. Happy to collaborate on this. For write_dataset() we might need to look into whether we need to add field metadata (field-id etc.) to the Parquet files when committing data files.

Yes - I think we learned this from our earlier attempts: https://github.com/apache/iceberg-python/pull/41/files/1398a2fb01341087a1334482db84a193843a2362#r1427302782

As @jqin61 pointed out in a previous PR, adding these to the schema should output parquet files with the correct field_id.
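
A minimal sketch of what that looks like on the PyArrow side (field names and IDs here are illustrative; schema_to_pyarrow attaches this metadata for you):

import pyarrow as pa

# Attach Iceberg field IDs as Parquet field-id metadata so the written files
# can later be committed as Iceberg DataFiles with matching column IDs.
schema_with_field_ids = pa.schema([
    pa.field("key", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"1"}),
    pa.field("value_1", pa.int64(), nullable=True, metadata={b"PARQUET:field_id": b"2"}),
])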

sungwy avatar Jan 23 '24 20:01 sungwy

@Fokko @jqin61 Today I tried a basic example of a partitioned write:

import pyarrow as pa
import pyarrow.dataset
from pyarrow import parquet as pq

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

data = {'key': ['001', '001', '002', '002'],
        'value_1': [10, 20, 100, 200],
        'value_2': ['a', 'b', 'a', 'b']}
my_partitioning = pa.dataset.partitioning(pa.schema([pa.field("key", pa.string())]), flavor='hive')
TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="key", field_type=StringType(), required=False),
    NestedField(field_id=2, name="value_1", field_type=StringType(), required=False),
    NestedField(field_id=3, name="value_2", field_type=StringType(), required=False),
)
schema = schema_to_pyarrow(TABLE_SCHEMA)
patbl = pa.Table.from_pydict(data)
pq.write_to_dataset(patbl, 'partitioned_data', partitioning=my_partitioning, schema=schema)

If I don't use a schema in the write it works fine. But if I pass the schema created by

schema = schema_to_pyarrow(TABLE_SCHEMA)

It fails with

ArrowTypeError: Item has schema
key: string
value_1: int64
value_2: string
which does not match expected schema
key: string
  -- field metadata --
  PARQUET:field_id: '1'
value_1: string
  -- field metadata --
  PARQUET:field_id: '2'
value_2: string
  -- field metadata --
  PARQUET:field_id: '3'

I also tried the parquet write the way we are doing currently:

writer = pq.ParquetWriter("test", schema=schema, version="1.0") 
writer.write_table(patbl)
ValueError: Table schema does not match schema used to create file: 
table:
key: string
value_1: int64
value_2: string vs. 
file:
key: string
  -- field metadata --
  PARQUET:field_id: '1'
value_1: string
  -- field metadata --
  PARQUET:field_id: '2'
value_2: string
  -- field metadata --
  PARQUET:field_id: '3'

Do we do any other transformation of the schema before we write in the current write support?

asheeshgarg avatar Jan 24 '24 21:01 asheeshgarg

Hey @jqin61

Thanks for the elaborate post, and sorry for my slow reply. I did want to take the time to write a good answer.

Probably the following statement needs another map step:

partitions: list[dict] = pyarrow.compute.unique(arrow_table)

The above is true for an identity partition, but often we truncate a field, or take the month, day or hour from it, and use that as the partition. Another example is the bucket partition, where we hash the field and determine into which bucket it will fall.
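
A rough sketch of such a map step for a time-based partition. Note that pc.month gives the calendar month, whereas Iceberg's month transform counts months since 1970-01 and the bucket transform uses a Murmur3 hash, so the real implementation needs Iceberg-specific transform logic:

import datetime

import pyarrow as pa
import pyarrow.compute as pc

arrow_table = pa.table({
    "ts": pa.array([datetime.datetime(2023, 1, 5), datetime.datetime(2023, 2, 7)], type=pa.timestamp("us")),
    "value": [1, 2],
})

# Derive the transformed partition value first, then compute the unique partitions on it.
with_partition = arrow_table.append_column("ts_month", pc.month(arrow_table["ts"]))
unique_partitions = with_partition.group_by(["ts_month"]).aggregate([])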

With regard to utilizing the Arrow primitives that are already there: I think that's a great idea, we just have to make sure that they are flexible enough for Iceberg. There are a couple of questions that pop into my mind:

  • Can we support all of Iceberg's partition strategies, such as bucketing, truncating, etc.?
  • Are we able to extract the metrics similar to what we do for non-partitioned writes?

@asheeshgarg Thanks for giving it a try. Looking at the schema, there is a discrepancy: the test data that you generate has value_1 as an int64, and the table expects a string. I think the error is correct here.

Fokko avatar Jan 25 '24 14:01 Fokko

@Fokko thanks for pointing out the mismatch. After modifying the datatype it worked.
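
For completeness, one way to make the earlier example consistent is to declare value_1 as a long in the Iceberg schema so it matches the int64 test data (converting the test data to strings works just as well); a sketch:

from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="key", field_type=StringType(), required=False),
    NestedField(field_id=2, name="value_1", field_type=LongType(), required=False),  # was StringType
    NestedField(field_id=3, name="value_2", field_type=StringType(), required=False),
)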

asheeshgarg avatar Jan 25 '24 18:01 asheeshgarg

@Fokko Thank you! These 2 points of supporting hidden partitioning and extracting metrics efficiently during writing are very insightful!

Regarding pyarrow.dataset.write_dataset(): its behavior removes the partition columns from the written-out Parquet files. I think this is the deal breaker for using write_dataset(), so either we extend pyarrow.dataset.write_dataset() or we fall back to the Arrow API direction.

Some findings from chasing a solution with dataset.write_dataset():

  1. pyarrow.dataset.partitioning() only supports static values in the column, so on the Iceberg side we might need to add a transformed column (this column is dropped and encoded into the directory path when write_dataset() uses hive partitioning).
  2. write_dataset() takes a custom callable that collects file paths and file metadata as files are written; this metadata could be leveraged when we create the DataFile objects (see the sketch after the snippet):
visited_paths = []
metadata_list = []
def file_visitor(written_file):
    visited_paths.append(written_file.path)
    metadata_list.append(written_file.metadata)
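
A sketch of wiring that callable into write_dataset() (paths and column names are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds

arrow_table = pa.table({"key": ["001", "001", "002"], "value": [10, 20, 30]})

visited_paths = []
metadata_list = []

def file_visitor(written_file):
    visited_paths.append(written_file.path)      # file path, needed for the DataFile entry
    metadata_list.append(written_file.metadata)  # Parquet FileMetaData, usable for stats

ds.write_dataset(
    arrow_table,
    base_dir="partitioned_data",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([pa.field("key", pa.string())]), flavor="hive"),
    file_visitor=file_visitor,
)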

jqin61 avatar Jan 26 '24 17:01 jqin61

Right, as @jqin61 mentioned, if we only had to support transformed partitions, we could have employed a hack of adding the partition column to the dataset, which gets consumed by the write_dataset API when we pass the column in pyarrow.dataset.partitioning.

But we can't apply the same hack with Identity Partitions, where the HIVE partition scheme on the file path shares the same name as the partition column that needs to be persisted into the data file. Arrow does not allow two columns to share the same name, and this hack will lead to an exception on write_dataset.

So it sounds like we might be running out of options in using the existing APIs...

If we are in agreement that we need a new PyArrow API to optimally bucket sort the partitions and produce partitioned pyarrow tables or record batches to pass into WriteTask, do we see any value in introducing a simpler PyIceberg feature in the interim, where write_file can support partitioned tables as long as the provided arrow_table only has a single partition of data?

I think introducing this first would have two upsides:

  1. We decouple the work of supporting writes to a partitioned table (like handling partitions in file paths on write, adding partition metadata to manifests) from the work of optimally sorting and bucketing an arrow table into target partitions
  2. If a user really needs to break down their in-memory pyarrow table into partitions, they can do so using existing methods to filter on the partition column and produce a new pyarrow.Table. This isn't optimal, especially if they have many partitions within the in-memory table, and is precisely the reason why @jqin61 is investigating the different options for bucket sorting by partition within Arrow/Arrow Datasets.

sungwy avatar Jan 26 '24 18:01 sungwy

Maybe another approach we could take if we want to use existing PyArrow functions is:

  1. table.sort_by (all partitions)
  2. figure out the row index for each permutation of partition groups by taking another pass through the table
  3. Use table.slice(index, length) with indexes we generated above to write out the tables using List[WriteTask] in write_file

If there was an existing PyArrow API that gave us the outcome of (1) + (2) in one pass, it would have been the most optimal, but it seems like there isn't... so I think taking just one more pass to find the indices is maybe not the worst idea.

We could also argue that (1) should be a requirement that we check on the provided PyArrow table, rather than running the sort within the PyIceberg API.

Please let me know your thoughts!

sungwy avatar Jan 26 '24 19:01 sungwy

@jqin61 I have also seen this behavior: pyarrow.dataset.write_dataset() removes the partition columns from the written-out parquet files. @syun64 the approach above looks reasonable to me.

It would have been ideal if the partitioned write could be done directly using the Arrow dataset API, with metadata-based hidden partitioning handled by the PyIceberg API, but we would need to do a good amount of lifting for that. I haven't seen support for bucket partitioning either.

I think we can add the write directly using the PyArrow API as suggested above.

asheeshgarg avatar Jan 26 '24 20:01 asheeshgarg

@Fokko @syun64 another option I can think of is to use Polars; a simple example is below, with hashing, partitioning, and sorting within each partition. All the partitioning is handled by the Rust layer in Polars, and we write Parquet based on the Arrow table returned. Not sure if we want to add it as a dependency? We can also easily do custom transforms, like the hour transform we have in Iceberg.

import pyarrow as pa
import pyarrow.compute as pc
import polars as pl

t = pa.table({'strings': ["A", "A", "B", "A"], 'ints': [2, 1, 3, 4]})
df = pl.from_arrow(t)

N = 2
tables = (
    df.with_columns([
        (pl.col("strings").hash() % N).alias("partition_id")
    ]).partition_by("partition_id")
)

for tbl in tables:
    print(tbl.to_arrow().sort_by("ints"))

asheeshgarg avatar Jan 26 '24 21:01 asheeshgarg

@jqin61 and I discussed this offline, and I just wanted to follow up on possible options for step (2). If we wanted to use existing PyArrow functions, I think we could use a two-pass algorithm to figure out the row index of each permutation of partition groups on a partition-sorted pyarrow table:

import pyarrow as pa
import pyarrow.dataset
import pyarrow.compute

# pyarrow table, which is already sorted by partitions 'year' and 'animals'
pylist = [{'year': 2021, 'animals': 'Bear'}, {'year': 2021, 'animals': 'Bear'}, {'year': 2021, 'animals': 'Centipede'}, {'year': 2022, 'animals': 'Cat'}, {'year': 2022, 'animals': 'Flamingo'},{'year': 2023, 'animals': 'Dog'}]
table = pa.Table.from_pylist(pylist)

# assert that the table is sorted by checking sort_indices are in order
pa.compute.sort_indices(table, sort_keys=[('year', "ascending"), ("animals", "ascending")])
<pyarrow.lib.UInt64Array object at 0x7fe9b0f4c340>
[
  0,
  1,
  2,
  3,
  4,
  5
]

# then sort the same list of partitions in opposite order, and check the indices to figure out the offsets and lengths of each partition group. If a larger number comes before a smaller index, that's the starting offset of the partition group. the number of consecutive number of indices that are in correct ascending order is the length of that partition group.
pa.compute.sort_indices(table, sort_keys=[('year', "descending"), ("animals", "descending")])
<pyarrow.lib.UInt64Array object at 0x7fe9b0f3ff40>
[
  5,
  4,
  3,
  2,
  0,
  1
]

# from above, we get the following partition groups as (offset, length) pairs:
partition_slices = [(0, 2), (2, 1), (3, 1), (4, 1), (5, 1)]

for offset, length in partition_slices:
    write_file(iceberg_table, iter([WriteTask(write_uuid, next(counter), table.slice(offset, length))]))

Then, how do we handle transformed partitions? Going back to the previous idea, I think we could create intermediate helper columns to generate the transformed partition values in order to use them for sorting. We can keep track of these columns and ensure that we drop them after we use the above algorithm to split up the table into partition slices.

Regardless of whether we choose to support sorting within the PyIceberg write API or have it as a requirement, maybe we can create a helper function that takes the PartitionSpec of the Iceberg table and the pyarrow table and makes sure that the table is sorted by partition using the above method.
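
A rough sketch of the helper-column idea; the helper name is arbitrary, and the string-prefix transform below only stands in for Iceberg's real truncate transform:

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({"name": ["berlin", "boston", "amsterdam"], "value": [1, 2, 3]})

HELPER = "_partition_name_trunc"

# 1. Add the transformed partition value (a truncate[2]-style prefix) as a temporary column
with_helper = table.append_column(HELPER, pc.utf8_slice_codeunits(table["name"], start=0, stop=2))

# 2. Sort on the helper column and compute the (offset, length) partition slices as above
sorted_table = with_helper.sort_by([(HELPER, "ascending")])

# 3. Drop the helper column before writing the slices
result = sorted_table.drop([HELPER])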

sungwy avatar Jan 26 '24 21:01 sungwy

The design document on data file writes that was discussed during the monthly sync summarizes all of the approaches discussed above.

sungwy avatar Jan 31 '24 17:01 sungwy

I have an incoming PR with working code samples that conform to the design above and cover the identity transform + append as the first step of supporting partitioned writes. During implementation, I found that if the partition column has nulls the code will break. The issue is the same as in the existing write path, where append() or overwrite() would break for any arrow table with a column consisting only of nulls, so I opened Issue#348 to track it separately.

jqin61 avatar Feb 01 '24 16:02 jqin61

Opened draft PR with working code samples (it supports partitioned append with identity transform for now): https://github.com/apache/iceberg-python/pull/353

jqin61 avatar Feb 02 '24 05:02 jqin61

Updates for monthly sync:

  1. Working on dynamic overwrite, which gets unblocked by partial deletes: https://github.com/apache/iceberg-python/pull/569
  2. For transform functions, we could convert the arrow column to a Python list and feed that to the transform function to generate transformed pyarrow columns for grouping partitions with the existing algorithm. But there are efficiency concerns, since the transform functions can only take Python data types and we have to convert from arrow to Python and back to arrow (see the sketch below). Also, the types in arrow and iceberg are quite different, and sometimes we need to call utility functions; for example, a timestamp is converted to a datetime in Python, and we have to call an existing utility function to convert it to micros (int) before feeding it into the transform function. Another option is to create an Arrow UDF for the partition transforms, which might parallelize better.
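
A sketch of the round trip described in point 2; day_transform below is a toy stand-in for the real transforms in pyiceberg.transforms:

import datetime

import pyarrow as pa

EPOCH = datetime.datetime(1970, 1, 1)

def day_transform(micros: int) -> int:
    # toy stand-in: days since epoch from microseconds since epoch
    return micros // 86_400_000_000

ts_col = pa.array([datetime.datetime(2024, 4, 30, 12, 0)], type=pa.timestamp("us"))

# Arrow -> Python datetimes -> micros -> transform -> back to Arrow
python_values = ts_col.to_pylist()
micros = [int((dt - EPOCH).total_seconds() * 1_000_000) for dt in python_values]
transformed = pa.array([day_transform(m) for m in micros], type=pa.int32())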

jqin61 avatar Apr 30 '24 00:04 jqin61

Idea from @Fokko - support day/month/year transforms first

sungwy avatar Apr 30 '24 16:04 sungwy

Idea from @Fokko - support day/month/year transforms first

You can also try using the transforms that Daft has already implemented. Full list of transforms:

There were a lot of intricacies in these transforms that we had to make sure to get exactly right, so as to be compatible with the existing Java implementations. Especially wrt hashing.

Should be zero-copy conversions between arrow and Daft as well (cheap!):

import pyarrow as pa
from daft import Series

pyarrow_array = pa.array(list(range(10000)))

# Should be very cheap! Under the hood just uses the same arrow buffers
daft_series = Series.from_arrow(pyarrow_array)

print(daft_series)
╭──────────────╮
│ arrow_series │
│ ---          │
│ Int64        │
╞══════════════╡
│ 0            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 3            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ …            │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9995         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9996         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9997         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9998         │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999         │
╰──────────────╯

partitioned = daft_series.partitioning.iceberg_bucket(32)
print(partitioned)

╭─────────────────────╮
│ arrow_series_bucket │
│ ---                 │
│ Int32               │
╞═════════════════════╡
│ 28                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 20                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 19                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 6                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ …                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 10                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 28                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 7                   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 13                  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 28                  │
╰─────────────────────╯

# Convert back to arrow
partitioned_arrow_arr = partitioned.to_arrow()

jaychia avatar Apr 30 '24 16:04 jaychia

Hi, do we have any way to write to a partitioned table so far?

deepika094 avatar Jun 06 '24 09:06 deepika094

Curious as well where you guys are standing on partitioned writes.

ppasquet avatar Jun 09 '24 04:06 ppasquet

Hey everyone, support for partitioned writes is coming along pretty nicely. We're missing some of the transforms, such as the bucket transform. Most of the work is on the main branch waiting for a release.

If you want to check it out, you can also install it from source: https://py.iceberg.apache.org/contributing/
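
For those asking for a minimal example, here is a rough, unofficial sketch of what a partitioned append could look like with a build from main; it assumes an already-configured catalog named "default", and all identifiers are illustrative:

import datetime

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import NestedField, StringType, TimestampType

catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=1, name="event_ts", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="payload", field_type=StringType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="event_ts_day")
)

table = catalog.create_table("db.events", schema=schema, partition_spec=spec)
table.append(pa.table({
    "event_ts": pa.array([datetime.datetime(2024, 6, 1, 12, 0)], type=pa.timestamp("us")),
    "payload": pa.array(["hello"], type=pa.string()),
}))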

Fokko avatar Jun 09 '24 10:06 Fokko

@Fokko I installed from source, but I'm hitting this error.

Does anyone have a minimal example of writing to a partitioned table?

mike-luabase avatar Jun 09 '24 13:06 mike-luabase