moor icon indicating copy to clipboard operation
moor copied to clipboard

tuple uniqueness constraint check?

Open github-actions[bot] opened this issue 4 months ago • 0 comments

If this is a tuple with a unique constraint, we need to check if there's a newer tuple with the same domain value.

/ commit set in the process.

before releasing the lock.

https://github.com/rdaum/moor/blob/12fc84941773505ff718eea8a0544a6ef3072a92/crates/db/src/rdb/tx/transaction.rs#L321



/// A set of tuples to be committed to the canonical base relations, based on a transaction's
/// working set.
///
/// Changes are staged in forked copies of the affected base relations while the write lock on
/// the canonical relations is held; the forks are swapped into the canonical vector only when
/// the commit succeeds.
pub struct CommitSet<'a> {
    // Commit timestamp: stamped onto staged tuples and onto each relation that gets swapped in.
    ts: u64,
    // Forked copies of the base relations touched by this commit, keyed by relation index.
    // A slot stays empty until the corresponding relation is first modified.
    relations: Box<BitArray<BaseRelation, 64, Bitset64<1>>>,

    // Holds a lock on the base relations, which we'll swap out with the new relations at successful commit
    write_guard: RwLockWriteGuard<'a, Vec<BaseRelation>>,

    // I can't/shouldn't be moved around between threads...
    // (marker fields only; they are initialised via `Default` and never read)
    unsend: PhantomUnsend,
    unsync: PhantomUnsync,
}

impl<'a> CommitSet<'a> {
    /// Creates an empty commit set for commit timestamp `ts`.
    ///
    /// Takes ownership of `write_guard`, the write lock on the canonical base relations. No
    /// relation is forked up front; forks are created on demand as operations are staged.
    pub(crate) fn new(ts: u64, write_guard: RwLockWriteGuard<'a, Vec<BaseRelation>>) -> Self {
        // Starts with no forked relations at all.
        let relations = Box::new(BitArray::new());
        Self {
            ts,
            relations,
            write_guard,
            unsend: Default::default(),
            unsync: Default::default(),
        }
    }

    /// Builds the candidate commit set from a transaction's working set.
    ///
    /// Each tuple operation in `tx_working_set` is validated against the canonical relations
    /// held under the write lock. Operations that pass are applied to a forked copy of the
    /// affected relation; nothing is written to the canonical relations here.
    ///
    /// # Errors
    ///
    /// * `CommitError::UniqueConstraintViolation` -- an insert, or a tombstone whose tuple is
    ///   already gone, targets a unique-domain relation in which a canonical tuple with the
    ///   same domain carries a newer timestamp.
    /// * `CommitError::TupleVersionConflict` -- an insert finds a newer canonical tuple for its
    ///   domain in a relation without the unique constraint, or an update targets a tuple that
    ///   is no longer present in the canonical relation.
    ///
    /// # Panics
    ///
    /// Panics if applying an already-validated operation to the forked relation fails.
    pub(crate) fn prepare(&mut self, tx_working_set: &mut WorkingSet) -> Result<(), CommitError> {
        let commit_ts = self.ts;

        for (_, working_relation) in tx_working_set.relations.iter_mut() {
            let relation_id = working_relation.id;

            // Walk the transaction-local tuples and stage every operation that is safe to
            // commit. This only produces a candidate; the actual swap happens later.
            for tx_tuple in working_relation.tuples_iter_mut() {
                match &mut tx_tuple.op {
                    // Plain values carry no pending change.
                    TxTupleOp::Value(_) => {}
                    TxTupleOp::Insert(inserted) => {
                        // Inserts can only conflict with canonical tuples sharing the same
                        // domain, so look those up and compare timestamps.
                        let canonical = &self.write_guard[relation_id.0];
                        let mut superseded = im::HashSet::new();
                        for existing in canonical.seek_by_domain(inserted.domain()) {
                            if existing.ts() > inserted.ts() {
                                // Somebody committed a newer tuple for this domain after we
                                // read it. Report it as a constraint violation when the
                                // relation enforces uniqueness, otherwise as a version
                                // conflict.
                                return Err(if canonical.unique_domain {
                                    CommitError::UniqueConstraintViolation
                                } else {
                                    CommitError::TupleVersionConflict
                                });
                            }
                            // Older upstream value: ours replaces it.
                            superseded.insert(existing);
                        }

                        // Safe to stage: stamp the tuple and swap it in for whatever it
                        // supersedes in our fork of the relation.
                        inserted.update_timestamp(commit_ts);
                        let target = self.fork(relation_id);
                        for stale in superseded {
                            target.remove_tuple(&stale.id()).unwrap();
                        }
                        target.insert_tuple(inserted.clone()).unwrap();
                    }
                    TxTupleOp::Update {
                        from_tuple,
                        to_tuple,
                    } => {
                        // The tuple being updated must still exist upstream; if it has been
                        // deleted in the meantime, that is by definition a conflict.
                        let still_present =
                            self.write_guard[relation_id.0].has_tuple(&from_tuple.id());
                        if !still_present {
                            return Err(CommitError::TupleVersionConflict);
                        }

                        // TODO tuple uniqueness constraint check?

                        // Stage the replacement in the forked relation.
                        to_tuple.update_timestamp(commit_ts);
                        self.fork(relation_id)
                            .update_tuple(&from_tuple.id(), to_tuple.clone())
                            .unwrap();
                    }
                    TxTupleOp::Tombstone(doomed, _) => {
                        let canonical = &self.write_guard[relation_id.0];

                        if canonical.has_tuple(&doomed.id()) {
                            // Still there upstream: perform the delete in our fork.
                            self.fork(relation_id)
                                .remove_tuple(&doomed.id())
                                .unwrap();
                        } else if canonical.unique_domain {
                            // Already deleted by someone else. With a unique constraint we
                            // still have to make sure no newer tuple has since taken over the
                            // same domain value.
                            let newer_exists = canonical
                                .seek_by_domain(doomed.domain())
                                .into_iter()
                                .any(|other| other.ts() > doomed.ts());
                            if newer_exists {
                                return Err(CommitError::UniqueConstraintViolation);
                            }
                        }
                        // Otherwise: already deleted and no conflict, so nothing to stage.
                    }
                }
            }
        }

        Ok(())
    }

    /// Publishes the prepared commit set into the canonical relations.
    ///
    /// Every forked relation is stamped with the commit timestamp and written over its
    /// canonical counterpart through the held write guard. The commit set is consumed, so the
    /// guard goes away with it once this returns. The visible body has no failure path and
    /// always returns `Ok(())`.
    pub(crate) fn try_commit(mut self) -> Result<(), CommitError> {
        // Validation already happened, so all that is left is to swap each fork into its
        // canonical slot while the write lock is still held.
        let stamp = self.ts;
        for (_, mut forked) in self.relations.take_all() {
            forked.ts = stamp;

            let slot = forked.id.0;
            self.write_guard[slot] = forked;
        }

        Ok(())
    }

    /// Fork the given base relation into the commit set, if it's not already there.
    fn fork(&mut self, relation_id: RelationId) -> &mut BaseRelation {
        if self.relations.get(relation_id.0).is_none() {
            let r = self.write_guard[relation_id.0].clone();
            self.relations.set(relation_id.0, r);
        }
        self.relations.get_mut(relation_id.0).unwrap()

github-actions[bot] avatar Feb 17 '24 00:02 github-actions[bot]