
HIVE Partitions in AWS Athena

sh-rp opened this issue 2 years ago • 8 comments

We want to enable the user to make use of AWS Athena partitions. For this we will most likely need to change the schema a bit and add the option for the user to define partitions on the resources.

Analyzing the discussion on this topic, we can implement this in 3 steps of increasing complexity (3 PRs expected)

support iceberg tables

Here partitioning is available out of the box in the CREATE TABLE statement; partitions are created based on the data in the tables (see the sketch after the checklist below).

  • [x] implement a table adapter for athena that allows to define columns that will participate in the partition
  • [x] generate PARTITIONED BY clause as in https://github.com/dlt-hub/dlt/pull/1349 and https://github.com/dlt-hub/dlt/issues/555#issuecomment-2057000641
  • [x] when generating table schemas, warn if partition is requested but the table format is not iceberg
  • [x] standard tests and docs
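
A minimal sketch of what the resource-level partition definition could look like. The adapter names (athena_adapter, athena_partition) follow the adapter proposed in #1349 and may differ in the final API; the code below is illustrative, not the merged implementation.

import dlt
from dlt.destinations.adapters import athena_adapter, athena_partition

@dlt.resource(table_format="iceberg")
def events():
    yield {"id": 1, "category": "a", "created_at": "2024-04-15T12:00:00Z"}

# declare partition columns on the resource; on table creation the Athena
# destination would translate these hints into a PARTITIONED BY (...) clause
partitioned_events = athena_adapter(
    events,
    athena_partition.day("created_at"),  # maps to Iceberg's day() transform
    "category",                          # plain column partition
)

pipeline = dlt.pipeline("athena_partition_demo", destination="athena")
pipeline.run(partitioned_events)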

support regular, hive tables

After loading a file, ADD PARTITION must be executed to map the file into a partition. Partitions are created based on metadata that must be available via file layout placeholders, so if a given partition column is declared in the adapter, the same column name must be a placeholder in the file layout (or be present in the placeholders list). See the sketch after the checklist below.

  • [ ] we use ADD PARTITION as in Scenario 2 here https://docs.aws.amazon.com/athena/latest/ug/partitions.html - after adding any new file to the bucket
  • [ ] we need to render the partition column value using path_utils.py, which is used to render file names. See how the filesystem destination does that. We may need to give athena access to rendering file names via staging_config. Note: I'm not sure whether the table must physically have a partition column or whether we can just declare it and fill the data from the layout placeholder; that needs to be tested
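
A minimal sketch of the DDL this step would have to emit, assuming the partition values are rendered from the layout placeholders. The helper below is hypothetical and only illustrates the ALTER TABLE ... ADD PARTITION statement, not dlt's actual job code.

# hypothetical helper: build the Athena DDL that registers the folder a load
# job wrote into as a partition; partition_values would be rendered from the
# same placeholders used by path_utils.py for the file layout
def render_add_partition(
    table: str, partition_values: dict[str, str], bucket_url: str, layout_path: str
) -> str:
    spec = ", ".join(f"{col} = '{val}'" for col, val in partition_values.items())
    location = f"{bucket_url.rstrip('/')}/{layout_path.strip('/')}/"
    return f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION ({spec}) LOCATION '{location}'"

# e.g. for layout "events/year={YYYY}/month={MM}/{load_id}.{file_id}.parquet"
print(render_add_partition(
    "events", {"year": "2023", "month": "11"}, "s3://my-bucket/data", "events/year=2023/month=11"
))
# -> ALTER TABLE events ADD IF NOT EXISTS PARTITION (year = '2023', month = '11') LOCATION 's3://my-bucket/data/events/year=2023/month=11/'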

hive partitioning based on table data

when #1382 is merged we can easily add more table formats (i.e. hive) and use them for partitioning. From our discussion we know that MSCK REPAIR TABLE costs too much for incremental loads, so I do not yet see value in it


this is the old discussion ~Tasks:

  • [ ] Alter schema to have an order of partition keys (possibly replace the "bool" hint with an "int" hint which is null or 0 if this column should not be a partition, and a number if it should which then defines the order?)
  • [ ] Expose partition config on the resource as array of strings which are the column names?
  • [ ] When creating parquet files, respect the partitions and create a file for every partition key combination and upload that to the right path. Pyarrow can do this for us, so we need to change our load jobs to also take folders, not only file paths.
  • [ ] Make the partitions known to Athena: either we can run explicit DDL commands, or it should also be possible to infer the partitions with a repair command; the latter is more costly though, so doing this with our schema makes sense. This means we will have schema migration operations that not only add columns but also partition definitions.

To think about:

  • [ ] How to guard against the user uploading to the same bucket with different partition key settings?
  • [ ] How to migrate existing schemas if we change the type of the partition settings?
  • [ ] How to behave if there are partitions defined, but the yielded resource item does not have a value for this column? Will there always be a "None" path for those cases? Will this be queryable by Athena?
  • [ ] If we use hive style partitions, will the resulting paths be more or less universal, or do we need to use different patterns for different data lake providers?~

sh-rp avatar Aug 14 '23 09:08 sh-rp

@sh-rp let's take a look at the native partitioning that pyarrow has. OFC in that case we'll need to deal with several files in folders, so our jobs mechanism gets more complicated (but why not have jobs that are folders ;> OMG)
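
For reference, a minimal example of pyarrow's native partitioned write - it produces hive-style key=value folders with one file per partition key combination, which is why the load job effectively becomes a folder:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "id": [1, 2, 3],
    "year": [2023, 2023, 2024],
    "month": [11, 12, 1],
    "value": [10.0, 20.0, 30.0],
})

# writes folders such as out/year=2023/month=11/<uuid>.parquet
pq.write_to_dataset(table, root_path="out", partition_cols=["year", "month"])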

in the Weaviate PR we are adding the concept of a destination adapter that injects additional information into the schema, to be interpreted by the loader/normalizer (the data writer also has access to it).

so it all fits together but still looks like a bigger project

rudolfix avatar Aug 14 '23 10:08 rudolfix

Documenting a potential use-case. I'm loading files from S3 + SFTP + API sources to S3 destination.

I'd like to use properties from the source (not data itself) to design the partition layout in the destination. The properties may be resource arguments, or indirectly source metadata like object path and/or file name.

Examples:

  • Single daily object from SFTP
    • sftp://example.com/foo/bar/report_20231114000000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report.csv
  • Multiple daily objects from S3
    • s3://example-bucket/foo/bar/report_20231114000000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report_20231114000000.csv
    • s3://example-bucket/foo/bar/report_20231114005000.csv --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report_20231114005000.csv
  • Single daily object from API call
    • https://example.com/api/report?date=2023-11-14 --> s3://destination-bucket/baz/quz/yyyy=2023/mm=11/dd=14/report.csv

How others have (not) done it:

Implications for this ticket:

  • Partition config should support resource parameters
    • For example, if date (or year, month, date) is a parameter to a resource, that should be available in the partition layout.
  • Partition config should support resource metadata, such as object path or creation date.
    • For example, in order to replicate the partitioning from the source to destination.
  • Consider: Is partitioning a generic issue, or something purely related to filesystem source/destination, see API case above?

TL;DR I'm looking to partition based on metadata, not the data itself.
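
For context, a hive-style layout as in the examples above can be sketched with the filesystem destination layout placeholders (assuming the extended date placeholders discussed later in this thread). Note that the date values here come from the load itself, not from source metadata such as the original object path - which is exactly the gap described in this comment.

import os
import dlt

# configure the filesystem destination layout via dlt's env-var config syntax;
# bucket_url and credentials would also need to be configured for a real run
os.environ["DESTINATION__FILESYSTEM__LAYOUT"] = (
    "{table_name}/yyyy={YYYY}/mm={MM}/dd={DD}/{load_id}.{file_id}.{ext}"
)

pipeline = dlt.pipeline("copy_reports", destination="filesystem")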

gamgi avatar Nov 14 '23 13:11 gamgi

Just wanted to add a thing to think about.

Iceberg tables make use of hidden partitioning and keep track of different partitioning over time. This might be much simpler to implement than the classical hive table.

https://trino.io/blog/2021/05/03/a-gentle-introduction-to-iceberg#hidden-partitions

Considering this issue is about AWS Athena partitions, I do not believe this is out of scope; it might be the "simpler" part of the issue.

Best

MRocholl avatar Feb 14 '24 14:02 MRocholl

(Partly) subsumed by https://github.com/dlt-hub/dlt/issues/930

Pipboyguy avatar Feb 26 '24 16:02 Pipboyguy

In case native iceberg tables are used in Athena, the partition implementation can be delegated to iceberg directly (hidden partitioning), and it's possible to use an implementation similar to what is done in BigQuery, where the PARTITION BY clause is added via SQL. Pretty much as @MRocholl mentioned.

For pure parquet writing, of course, what I mention doesn't work, because partitions are part of the object path of the data written to S3.

@sh-rp @rudolfix @sultaniman as I'm interested in using the Athena/Iceberg destination with partitions, do you see anything against what I proposed above? It's just about adding the right SQL for Iceberg managed tables.
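
An illustrative sketch of what that amounts to on the SQL side - the helper is hypothetical and only shows the DDL the Athena client would need to emit when partition hints are present:

# hypothetical sketch of the generated DDL, not dlt's actual client code
def iceberg_create_table_sql(
    table: str, columns: dict[str, str], partitions: list[str], location: str
) -> str:
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    partitioned_by = f" PARTITIONED BY ({', '.join(partitions)})" if partitions else ""
    return (
        f"CREATE TABLE {table} ({cols}){partitioned_by} "
        f"LOCATION '{location}' TBLPROPERTIES ('table_type' = 'ICEBERG')"
    )

print(iceberg_create_table_sql(
    "events",
    {"id": "bigint", "event_date": "date"},
    ["month(event_date)"],  # Iceberg transform, resolved by hidden partitioning
    "s3://my-bucket/events/",
))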

nicor88 avatar Apr 15 '24 14:04 nicor88

@nicor88 I've been working on extending layout placeholders for filesystem destination, atm we don't yet support syncing or taking date & time from parquet files for partitioning.

sultaniman avatar Apr 15 '24 14:04 sultaniman

@sultaniman Layout placeholders for the filesystem, from what I can see, won't work with Iceberg Athena managed tables.

For example, an iceberg table can be created like this:

CREATE TABLE events (id bigint, event_date date)
  PARTITIONED BY (event_date, bucket(16, id))
  LOCATION 's3://DOC-EXAMPLE-BUCKET/your-folder/'
  TBLPROPERTIES ( 'table_type' = 'ICEBERG' )

The writing to the file system is handled under the hood by Athena itself in the case of an Iceberg table, not by the writer (dlt), because of Iceberg hidden partitioning.

Therefore the only thing to do with Athena Iceberg tables is to specify the partition definition on table creation, or as a post-hook (e.g. using pyiceberg) to modify the partition spec of a table.
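
A hedged sketch of the post-hook variant with pyiceberg (partition spec evolution); the catalog settings are placeholders and the exact API may differ between pyiceberg versions:

# placeholder catalog config; assumes pyiceberg with the Glue catalog extra
from pyiceberg.catalog import load_catalog
from pyiceberg.transforms import DayTransform

catalog = load_catalog("default", **{"type": "glue"})
table = catalog.load_table("my_db.events")

# add a day() partition field on event_date; files written before the change
# keep the previous partition spec (Iceberg partition evolution)
with table.update_spec() as update:
    update.add_field("event_date", DayTransform(), "event_date_day")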

That said, I was planning to propose a PR for this, because it's the only limitation blocking us from using dlt in production.

All our tables are partitioned by ingestion date, for example, to reduce data scans downstream.

Happy to have a chat with more details if the above is not clear.

nicor88 avatar Apr 15 '24 14:04 nicor88

@nicor88 sure, we can have a chat about it. I think once we merge the original PR I will be more than happy to allow iceberg support 🙂

sultaniman avatar Apr 15 '24 14:04 sultaniman