delta-rs
Schema evolution `mergeSchema` support
Description
Originally asked on Stack Overflow: how to add a new column when writing to a Delta table.
Use Case
One of my Delta tables has 4 columns. The new incoming data has one additional column, so 5 columns in total. I am hoping to use `write_deltalake` to write the new data in `mode="append"` to the same Delta table.
In Spark, it is possible to write new data with more columns by using the `mergeSchema` option, based on this doc.
It would be great to support the schema evolution `mergeSchema` option for `write_deltalake`. Thanks! 😃
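For reference, this is roughly what it looks like in Spark today (a minimal PySpark sketch; the table path and column names are made-up placeholders, and it assumes a Spark session with delta-spark configured):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes delta-spark is already configured

# The existing table has columns a, b, c, date; the new batch carries an extra column "e".
new_df = spark.createDataFrame(
    [(1, "x", 2.0, "2024-01-01", "extra")],
    ["a", "b", "c", "date", "e"],
)

(
    new_df.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # let Delta evolve the table schema on append
    .save("/tmp/my_table")          # placeholder path
)
```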
I am looking for the exact same functionality in delta-rs! From the current docs, it seems impossible to add or drop columns on a Delta table without impacting existing data. I hope there is a good suggestion or solution in delta-rs. Thanks!
I'm currently investigating switching from my Databricks job to something more lightweight, and this seems like a rather big roadblock given that schema evolution is one of the core functionalities of Delta tables. We are currently doing it with
`spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")`
which of course is not applicable here :)
+1 on this.
I found a way to hack around this limitation by running some code like this, which appends a new `MetaData` action to the Delta log:
let delta_fields = vec![...my expected fields...];
let mut delta_schema = table.get_schema().expect("No schema").clone();

// HACK: Enable schema evolution by committing a MetaData action to the Delta log
// whenever the expected fields differ from the current table schema.
if delta_schema.get_fields().len() < delta_fields.len()
    || delta_schema.get_fields().iter().enumerate().any(|(i, f)| *f != delta_fields[i])
{
    println!("New columns added");
    let metadata = DeltaTableMetaData::new(
        None,
        None,
        None,
        SchemaTypeStruct::new(delta_fields),
        vec!["date".to_string()],
        HashMap::new(),
    );
    let meta = action::MetaData::try_from(metadata)?;
    let actions = vec![Action::metaData(meta)];
    let storage = table.object_store();
    commit(storage.as_ref(), &actions, &table.state).await?;
    table.load().await?;
    delta_schema = table.get_schema().expect("No schema").clone();
}
// Crate-private methods copied from delta-rs rust/src/operations/transaction/mod.rs
fn get_commit_bytes(actions: &Vec<Action>) -> Result<bytes::Bytes, TransactionError> {
    let mut extra_info = serde_json::Map::<String, Value>::new();
    let mut commit_info: CommitInfo = Default::default();
    commit_info.timestamp = Some(Utc::now().timestamp_millis());
    extra_info.insert(
        "clientVersion".to_string(),
        Value::String(format!("delta-rs.{}", crate_version())),
    );
    commit_info.info = extra_info;
    Ok(bytes::Bytes::from(log_entry_from_actions(
        actions
            .iter()
            .chain(std::iter::once(&Action::commitInfo(commit_info))),
    )?))
}
fn log_entry_from_actions<'a>(
    actions: impl IntoIterator<Item = &'a Action>,
) -> Result<String, TransactionError> {
    let mut jsons = Vec::<String>::new();
    for action in actions {
        let json = serde_json::to_string(action)
            .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
        jsons.push(json);
    }
    Ok(jsons.join("\n"))
}
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Low-level transaction API. Creates a temporary commit file. Once created,
/// the transaction object could be dropped and the actual commit could be executed
/// with `DeltaTable.try_commit_transaction`.
pub(crate) async fn prepare_commit<'a>(
    storage: &dyn ObjectStore,
    actions: &Vec<Action>,
) -> Result<Path, TransactionError> {
    // Serialize all actions that are part of this log entry.
    let log_entry = get_commit_bytes(actions)?;

    // Write the delta log entry as a temporary file to storage. For the actual commit,
    // the temporary file is moved (atomic rename) into the delta log folder by `commit`.
    let token = uuid::Uuid::new_v4().to_string();
    let file_name = format!("_commit_{token}.json.tmp");
    let path = Path::from_iter([DELTA_LOG_FOLDER, &file_name]);
    storage.put(&path, log_entry).await?;

    Ok(path)
}
/// Commit a transaction. This is a low-level transaction API; unlike the upstream
/// crate code, this copy does not retry, so the caller should handle retries itself.
///
/// Will error early if a concurrent transaction has already been committed
/// and conflicts with this transaction.
pub async fn commit(
    storage: &dyn ObjectStore,
    actions: &Vec<Action>,
    read_snapshot: &DeltaTableState,
) -> Result<i64> {
    let tmp_commit = prepare_commit(storage, actions).await?;
    let version = read_snapshot.version() + 1;
    try_commit_transaction(storage, &tmp_commit, version)
        .await
        .map_err(|e| e.into())
}
fn commit_uri_from_version(version: i64) -> Path {
    // e.g. version 5 -> "_delta_log/00000000000000000005.json"
    let version = format!("{version:020}.json");
    Path::from(DELTA_LOG_FOLDER).child(version.as_str())
}
/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is a low-level transaction API. If the user does not want to maintain the commit
/// loop, then `DeltaTransaction.commit` should be used instead, as it wraps
/// `try_commit_transaction` with retry logic.
async fn try_commit_transaction(
    storage: &dyn ObjectStore,
    tmp_commit: &Path,
    version: i64,
) -> Result<i64> {
    // Move the temporary commit file into the delta log directory,
    // relying on the storage backend to fail if the file already exists.
    storage
        .rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
        .await
        .map_err(|err| anyhow!("Failed to commit {:?}", err))?;
    Ok(version)
}
Is there any news on this topic, or on a fork that implements this improvement?
Hi, this would be great to have.
We have been discussing in Polars #11983 that having merge out of the box in `write_delta()` would be awesome, now that release v0.12 supports MERGE. If schema evolution is also added to `TableMerger()`, this integration would be a bit more straightforward.
BR, E
Hi,
We would like this as well. It would be great to have.
Hello,
I would be grateful if the option to change the schema when merging were implemented.
Are there any updates on this? Is this being worked on by anyone?
@BarakStout no one is working on this afaik
Hi, due to this missing feature we are not able to leverage delta-rs. It would be great if we could have schema evolution on existing tables without overwriting data.
@srsharathreddy-92 this is currently being worked on by @rtyler :)
I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.
> I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.
Hi, is someone working on adding this to the Python bindings?
I am excited to start using this feature :)
> I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.
I'm not too deep in the code, but I guess that's not too hard to do. If you give me some hints, I can give it a try. This would unblock very interesting use cases for us.
@aersam the schema evolution logic got added to the RecordBatchWriter, which sits here: crates/core/src/writer/record_batch.rs
However, we want this logic to be added here as well: crates/core/src/operations/write.rs
So you need to check how much of the schema evolution logic can be reused and copied/modified into the write operation. You should likely follow a similar pattern: allow the mergeSchema parameter to be set, merge the table schema and the Arrow RecordBatch schema, and then create columns with null values for the fields that are in the set difference of the merged schema.
Hope that helps you get started :)
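For anyone following along, here is a rough pyarrow sketch of that pattern (the helper name `merge_and_pad` and the toy schemas are my own, purely for illustration; the real implementation would live in Rust inside the write operation):

```python
import pyarrow as pa

def merge_and_pad(table_schema: pa.Schema, batch: pa.RecordBatch) -> pa.RecordBatch:
    """Union the table schema with the batch schema, then fill any merged
    field the batch does not carry with a null column of the right type."""
    merged = pa.unify_schemas([table_schema, batch.schema])
    arrays = []
    for field in merged:
        idx = batch.schema.get_field_index(field.name)
        if idx != -1:
            arrays.append(batch.column(idx))
        else:
            arrays.append(pa.nulls(batch.num_rows, type=field.type))
    return pa.RecordBatch.from_arrays(arrays, schema=merged)

# Toy example: the table knows "a" and "b"; the incoming batch adds "c" and lacks "b".
table_schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
batch = pa.record_batch({"a": [1, 2], "c": [0.5, 1.5]})
print(merge_and_pad(table_schema, batch).schema)  # a: int64, b: string, c: double
```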
> I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.
>
> Hi, is someone working on adding this to the Python bindings? I am excited to start using this feature :)
It can only be exposed to Python once it's added to the write operation, which is currently not the case yet.
> @aersam the schema evolution logic got added to the RecordBatchWriter, which sits here: crates/core/src/writer/record_batch.rs
> However, we want this logic to be added here as well: crates/core/src/operations/write.rs
> So you need to check how much of the schema evolution logic can be reused and copied/modified into the write operation. You should likely follow a similar pattern: allow the mergeSchema parameter to be set, merge the table schema and the Arrow RecordBatch schema, and then create columns with null values for the fields that are in the set difference of the merged schema.
> Hope that helps you get started :)
I'm trying to do that, but a bit simpler. We know the schema of the given data anyway, so I can do it upfront, I guess? Also, I try to preserve the partitioning columns; I'd expect the new schema to contain the same partition columns. I should probably test that assumption, or even just take the partitioning columns as given. About the null columns: I don't think we even need them; according to the spec, any missing column should be treated as null.
This also aligns with my expectation of how append is supposed to work with new columns: old files are untouched and just don't contain all columns, so new files could do the same, though it's maybe not very nice.
My PR is in a very early stage, with no testing done yet, not even manually. However, if you want to have a look to prevent me from doing something completely wrong, feel free to review.
Hi, I wanted to follow up on this thread. Given that there is now support for merging schemas via `write_deltalake` with the Rust engine, is there any way to just update the schema without needing to call `write_deltalake`? The use case is adding a column to a Delta table without necessarily writing new data at the same time, which would be really useful.
I tried doing:
write_deltalake(
    table,
    new_data,
    mode="append",
    schema_mode="merge",
    engine="rust",
)
where `new_data` is an empty pyarrow table containing just the new column I want to add, but then I get `Generic error: No data source supplied to write command`, presumably because the data is empty.
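For context, the empty table I passed looked roughly like this (the column name and type here are placeholders):

```python
import pyarrow as pa

# Zero-row table carrying only the column to be added (name/type are placeholders).
new_data = pa.table({"new_col": pa.array([], type=pa.string())})
```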
Are there any workarounds to achieve this desired behavior?
cc @rtyler @aersam
PR #2289 would fix the error; I hope I can finish it soon.
@aersam maybe we should hold off on that one until all the timeout issues are addressed
Oh, there are timeout issues? 🙂 which ones are you talking about?
@aersam yeah unfortunately, here are the related issues:
https://github.com/delta-io/delta-rs/issues/2403
https://github.com/delta-io/delta-rs/issues/2301
OK, it would be interesting to see whether those are even resolved by the PR, since writing starts much earlier and far less RAM is required.
@aersam maybe yeah