database-stream-processor icon indicating copy to clipboard operation
database-stream-processor copied to clipboard

Circuit without cycles causes a DBSP panic

Open mihaibudiu opened this issue 1 year ago • 3 comments

Here is the Rust code that triggers the problem. The circuit has no cycles; it is simply source -> integral -> differential -> sink. The error is: `panicked at 'called Result::unwrap() on an Err value: CyclicCircuit { node_id: GlobalNodeId([NodeId(4)]) }', src/lib.rs:2713:8`

/// Builds the circuit `source -> integrate -> differentiate -> inspect` for
/// `CREATE VIEW V AS SELECT * FROM T` and returns a closure that drives it.
///
/// Each call of the returned closure stores its argument as the current
/// contents fed to the source for table `T`, runs one step of the circuit,
/// and returns a one-element tuple holding a clone of the value most recently
/// written by the `inspect` callback for view `V`.
///
/// # Panics
///
/// Panics if building the circuit fails, or (inside the returned closure) if
/// a step fails, because both results are unwrapped.
fn circuit32() -> impl FnMut(OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>) -> (OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>, ) {
    // Row layout shared by table T and view V:
    // (#0: COL1 INTEGER, #1: COL2 DOUBLE, #2: COL3 BOOLEAN, #3: COL4 VARCHAR, #4: COL5 INTEGER, #5: COL6 DOUBLE)
    type Relation = OrdZSet<Tuple6<i32, F64, bool, String, Option<i32>, Option<F64>>, Weight>;

    // Shared cell for table T: the returned closure writes into it through
    // `input_handle`, and the generator clones its contents when invoked.
    let input_cell = Rc::new(RefCell::new(Relation::default()));
    let input_handle = Rc::clone(&input_cell);
    let source = Generator::new(move || input_cell.borrow().clone());

    // Shared cell for view V: the inspect callback writes into it, and the
    // returned closure reads it back through `output_handle`.
    let output_cell = Rc::new(RefCell::new(Relation::default()));
    let output_handle = Rc::clone(&output_cell);

    let root = dbsp::RootCircuit::build(|circuit| {
        // DBSPSourceOperator 5013
        // CREATE TABLE T (
        // COL1 INT NOT NULL, COL2 DOUBLE NOT NULL, COL3 BOOLEAN NOT NULL, COL4 VARCHAR NOT NULL, COL5 INT, COL6 DOUBLE)
        let table_t = circuit.add_source(source);

        // DBSPIntegralOperator 5022
        let integrated: Stream<_, Relation> = table_t.integrate();

        // DBSPDifferentialOperator 5023
        let differentiated: Stream<_, Relation> = integrated.differentiate();

        // DBSPSinkOperator 5024
        // CREATE VIEW V AS SELECT * FROM T
        differentiated.inspect(move |batch| {
            *output_cell.borrow_mut() = batch.clone();
        });
    })
    .unwrap();

    move |table_contents| {
        *input_handle.borrow_mut() = table_contents;
        root.0.step().unwrap();
        let view_contents = output_handle.borrow().clone();
        (view_contents,)
    }
}

mihaibudiu avatar May 02 '23 07:05 mihaibudiu