
Schema evolution `mergeSchema` support

Open hongbo-miao opened this issue 1 year ago • 9 comments

Description

Originally asked on Stack Overflow: how to add a new column when writing to a Delta table.

Use Case

One of my Delta tables has 4 columns. The new incoming data has one additional column, so 5 columns in total.

I am hoping to use write_deltalake to write the new data with mode="append" to the same Delta table.

In Spark, it is possible to write new data with more columns by using the mergeSchema option, based on this doc.
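For reference, in PySpark that option is set on the DataFrame writer. A minimal sketch (the DataFrame contents and the table path below are illustrative, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Incoming batch that carries the extra fifth column (hypothetical data)
new_df = spark.createDataFrame([(1, "a", "b", "c", "d")], ["c1", "c2", "c3", "c4", "c5"])

(
    new_df.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # allow the new column to be added to the table schema
    .save("/tmp/my_delta_table")    # hypothetical table location
)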

It would be great to support a schema evolution mergeSchema option for write_deltalake. Thanks! 😃

hongbo-miao avatar May 22 '23 17:05 hongbo-miao

I am looking for the exact same functionality in delta-rs! From the current docs, it seems impossible to add/drop columns on a Delta table without impacting existing data. I hope there is a good suggestion or solution in delta-rs. Thanks!

itsluqiang avatar May 23 '23 14:05 itsluqiang

I'm currently investigating switching from my Databricks job to something more lightweight - this seems like a rather big roadblock, since schema evolution is one of the core features of Delta tables. At the moment we are doing it with

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

which of course is not applicable here :)

Dammi87 avatar Jun 05 '23 10:06 Dammi87

+1 on this.

I found a way to hack around this limitation by running code like the following, which appends a new MetaData action to the delta log:


let delta_fields = vec![/* ...my expected fields... */];
let mut delta_schema = table.get_schema().expect("No schema").clone();
// HACK: Enable schema evolution by adding a MetaData action to the delta log
if delta_schema.get_fields().len() < delta_fields.len()
    || delta_schema.get_fields().iter().enumerate().any(|(i, f)| *f != delta_fields[i])
{
    println!("New columns added");
    let metadata = DeltaTableMetaData::new(
        None,
        None,
        None,
        SchemaTypeStruct::new(delta_fields),
        vec!["date".to_string()],
        HashMap::new(),
    );
    let meta = action::MetaData::try_from(metadata)?;
    let actions = vec![Action::metaData(meta)];
    let storage = table.object_store();
    // Commit the new metadata as its own log entry, then reload the table state
    commit(storage.as_ref(), &actions, &table.state).await?;
    table.load().await?;
    delta_schema = table.get_schema().expect("No schema").clone();
}

/** Crate private methods copied from delta rust/src/operations/transaction/mod.rs */

fn get_commit_bytes(
    actions: &Vec<Action>,
) -> Result<bytes::Bytes, TransactionError> {
    let mut extra_info = serde_json::Map::<String, Value>::new();
    let mut commit_info: CommitInfo = Default::default();
    commit_info.timestamp = Some(Utc::now().timestamp_millis());
    extra_info.insert(
        "clientVersion".to_string(),
        Value::String(format!("delta-rs.{}", crate_version())),
    );
    commit_info.info = extra_info;
    Ok(bytes::Bytes::from(log_entry_from_actions(
        actions
            .iter()
            .chain(std::iter::once(&Action::commitInfo(commit_info))),
    )?))
}

fn log_entry_from_actions<'a>(
    actions: impl IntoIterator<Item = &'a Action>,
) -> Result<String, TransactionError> {
    let mut jsons = Vec::<String>::new();
    for action in actions {
        let json = serde_json::to_string(action)
            .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
        jsons.push(json);
    }
    Ok(jsons.join("\n"))
}

const DELTA_LOG_FOLDER: &str = "_delta_log";

/// Low-level transaction API. Creates a temporary commit file. Once created,
/// the transaction object could be dropped and the actual commit could be executed
/// with `DeltaTable.try_commit_transaction`.
pub(crate) async fn prepare_commit<'a>(
    storage: &dyn ObjectStore,
    actions: &Vec<Action>,
) -> Result<Path, TransactionError> {
    // Serialize all actions that are part of this log entry.
    let log_entry = get_commit_bytes(actions)?;

    // Write delta log entry as temporary file to storage. For the actual commit,
    // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
    let token = uuid::Uuid::new_v4().to_string();
    let file_name = format!("_commit_{token}.json.tmp");
    let path = Path::from_iter([DELTA_LOG_FOLDER, &file_name]);
    storage.put(&path, log_entry).await?;

    Ok(path)
}

/// Commit a transaction. This is a low-level transaction API; this copy makes a
/// single attempt rather than retrying.
///
/// Will error early if a concurrent transaction has already been committed
/// and conflicts with this transaction.
pub async fn commit(
    storage: &dyn ObjectStore,
    actions: &Vec<Action>,
    read_snapshot: &DeltaTableState,
) -> Result<i64> {
    let tmp_commit = prepare_commit(storage, actions).await?;
    let version = read_snapshot.version() + 1;

    try_commit_transaction(storage, &tmp_commit, version).await.map_err(|e| e.into())
}

fn commit_uri_from_version(version: i64) -> Path {
    let version = format!("{version:020}.json");
    Path::from(DELTA_LOG_FOLDER).child(version.as_str())
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is a low-level transaction API. If the user does not want to maintain the commit
/// loop themselves, `DeltaTransaction.commit` should be used instead, as it wraps
/// `try_commit_transaction` with retry logic.
async fn try_commit_transaction(
    storage: &dyn ObjectStore,
    tmp_commit: &Path,
    version: i64,
) -> Result<i64> {
    // move temporary commit file to delta log directory
    // rely on storage to fail if the file already exists
    storage
        .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
        .await
        .map_err(|err| anyhow!("Failed to commit {:?}", err))?;
    Ok(version)
}

ChewingGlass avatar Jun 13 '23 16:06 ChewingGlass

Is there any news on this topic, or on a fork with this improvement?

JhonathanBld avatar Jul 17 '23 17:07 JhonathanBld

Hi, this would be great to have.

We have been discussing in Polars #11983 that having merge out of the box in write_delta() would be awesome now that release v0.12 supports MERGE. If schema evolution is going to be added to TableMerger(), this integration would be a bit more straightforward.

BR E

edgBR avatar Oct 26 '23 08:10 edgBR

Hi,

We would like this as well; it would be great to have.

obiii avatar Oct 26 '23 09:10 obiii

Hello,

I would be grateful if an option to change the schema when merging were implemented.

justkr avatar Oct 26 '23 09:10 justkr

Are there any updates on this? Is this being worked on by anyone?

BarakStout avatar Nov 28 '23 18:11 BarakStout

@BarakStout no one is working on this afaik

ion-elgreco avatar Nov 29 '23 06:11 ion-elgreco

Hi, due to this missing feature we are not able to leverage delta-rs. It would be great if we could have schema evolution on existing tables without overwriting data.

srsharathreddy-92 avatar Jan 10 '24 21:01 srsharathreddy-92

@srsharathreddy-92 this is currently being worked on by @rtyler :)

ion-elgreco avatar Jan 10 '24 21:01 ion-elgreco

I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.

ion-elgreco avatar Feb 05 '24 12:02 ion-elgreco

I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.

Hi, is someone working on adding this to the Python bindings?

I am excited to start utilizing this feature :)

srsharathreddy-92 avatar Feb 06 '24 17:02 srsharathreddy-92

I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.

I'm not too deep into the code, but I guess that's not too hard to do. If you give me some hints I can give it a try. This would unblock very interesting use cases for us.

aersam avatar Feb 21 '24 17:02 aersam

@aersam the schema evolution logic got added into the RecordBatchWriter, which sits here: crates/core/src/writer/record_batch.rs

However, we want this logic to be added here as well: crates/core/src/operations/write.rs

So you need to check how much of the schema evolution logic can be reused and copied/modified into the write operation. You should likely follow a similar pattern: allow a mergeSchema parameter to be set, merge the table schema with the Arrow RecordBatch schema, and then create null-valued columns for the fields that are in the set difference of the merged schema.
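For illustration only, the merge-then-backfill idea expressed in pyarrow terms might look roughly like this (the helper name is made up; the actual delta-rs code is in Rust and may differ):

import pyarrow as pa

def merge_and_backfill(table_schema: pa.Schema, batch: pa.RecordBatch) -> pa.RecordBatch:
    # Union of the existing table schema and the incoming batch schema
    merged = pa.unify_schemas([table_schema, batch.schema])
    arrays = []
    for field in merged:
        idx = batch.schema.get_field_index(field.name)
        if idx != -1:
            # Cast in case schema unification promoted the field type
            arrays.append(batch.column(idx).cast(field.type))
        else:
            # Column exists in the merged schema but not in this batch:
            # back-fill it with nulls so the written data matches the merged schema
            arrays.append(pa.nulls(batch.num_rows, type=field.type))
    return pa.RecordBatch.from_arrays(arrays, schema=merged)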

Hope that helps you get started :)

ion-elgreco avatar Feb 24 '24 09:02 ion-elgreco

I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.

Hi, is someone working on adding this to the Python bindings?

I am excited to start utilizing this feature :)

It can only be exposed to Python once it's added to the write operation, which is not yet the case.

ion-elgreco avatar Feb 24 '24 10:02 ion-elgreco

@aersam the schema evolution logic got added into the RecordBatchWriter, which sits here: crates/core/src/writer/record_batch.rs

However, we want this logic to be added here as well: crates/core/src/operations/write.rs

So you need to check how much of the schema evolution logic can be reused and copied/modified into the write operation. You should likely follow a similar pattern: allow a mergeSchema parameter to be set, merge the table schema with the Arrow RecordBatch schema, and then create null-valued columns for the fields that are in the set difference of the merged schema.

Hope that helps you get started :)

I'm trying to do that, but a bit simpler. We know the schema of the given data anyway, so I can do it upfront, I guess? Also, I try to preserve the partitioning columns; I'd expect the new schema to contain the same partition columns. I should probably test that assumption, or even just take the partitioning columns as given. About the null columns: I don't think we even need them; according to the spec, any missing column should be treated as null. This also aligns with my expectation of how append is supposed to work with new columns: old files are untouched and just don't contain all columns. So new files could do the same, though it's maybe not very nice.
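As a small illustration of that read-side behavior (a sketch, assuming a deltalake version where schema_mode="merge" works with the rust engine; the table path and data are made up), appending a batch with an extra column and reading the table back surfaces nulls for the old rows:

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

path = "/tmp/evolving_table"  # hypothetical table location

# Initial table with two columns
write_deltalake(path, pa.table({"id": [1, 2], "value": [10, 20]}))

# Append a batch that carries an extra "comment" column
write_deltalake(
    path,
    pa.table({"id": [3], "value": [30], "comment": ["new column"]}),
    mode="append",
    schema_mode="merge",
    engine="rust",
)

# Older files never contained "comment"; those rows come back as null
print(DeltaTable(path).to_pyarrow_table())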

My PR is in a very early stage: no testing done yet, not even manually. However, if you want to have a look to keep me from doing something completely wrong, feel free to review.

aersam avatar Feb 28 '24 16:02 aersam

Hi, I wanted to follow up on this thread - given that there is now support for merging schemas via write_deltalake with the rust engine, is there any way to just update the schema without needing to call write_deltalake? The use case is adding a column to a Delta table without necessarily writing new data at the same time; that would be really useful.

I tried doing:

write_deltalake(
    table,
    new_data,
    mode="append",
    schema_mode="merge",
    engine="rust",
)

where new_data is an empty pyarrow table containing only the new column I want to add, but then I get Generic error: No data source supplied to write command., presumably because the data is empty.

Are there any workarounds to achieve this desired behavior?

cc @rtyler @aersam

echai58 avatar Apr 10 '24 15:04 echai58

PR #2289 would fix the error; I hope I can finish it soon.

aersam avatar Apr 10 '24 17:04 aersam

@aersam maybe we should hold off on that one until all the timeout issues are addressed.

ion-elgreco avatar Apr 10 '24 17:04 ion-elgreco

Oh, there are timeout issues? 🙂 Which ones are you talking about?

aersam avatar Apr 10 '24 17:04 aersam

@aersam yeah unfortunately, here are the related issues:

https://github.com/delta-io/delta-rs/issues/2403

https://github.com/delta-io/delta-rs/issues/2301

ion-elgreco avatar Apr 10 '24 18:04 ion-elgreco

Ok, it would be interesting to see whether those are even resolved by the PR, as writing starts much earlier and far less RAM is required.

aersam avatar Apr 10 '24 18:04 aersam

@aersam maybe yeah

ion-elgreco avatar Apr 10 '24 18:04 ion-elgreco