IxJS icon indicating copy to clipboard operation
IxJS copied to clipboard

pipe() should work recursively, and the free standing pipe() should work inside the pipe chain method

Open buzzware opened this issue 4 years ago • 4 comments

pipe() can be used as a chain method with a series of operators from([1,2,3]).pipe(map(x=>x*3),filter(x=>x%2==0)); but I would like to dynamically build my pipeline before using it. To do that I would like to have operator() methods that return one or more operators. It would make sense then if returning a pipe of operators would qualify as returning an operator.

function operator1() { return pipe(map(x=>x*3),filter(x=>x%2==0)); // this doesn't work, because pipe() can't be used inside pipe below }

function operator2() { return map(x=>x+100); }

from([1,2,3]).pipe(operator1(),operator2());

If this can't be supported, then it should throw an error. Currently it behaves in unexpected ways.

buzzware avatar Jun 08 '21 14:06 buzzware

Does this not work?

function operator1(source) {
  return source.pipe(map(x => x * 3), filter(x => x % 2 == 0));
}

function operator2(source) {
  return map(x => x + 100)(source);
}

from([1,2,3]).pipe(operator1, operator2);

trxcllnt avatar Jun 08 '21 16:06 trxcllnt

@trxcllnt yes that does work. I suppose I could either

  1. just use your operator1 style, and still use pipe for a single operator
  2. have the abstract operator function return either op() or [opA(),opB()] and use lodash flatten(castArray(operator())) the array before calling pipe(...operators).

I don't find your operator2 style intuitive. But if I was to adopt that, I would expect pipe(map(x => x + 100))(source) to work, but it has to be pipe(source,map(x => x + 100))

If you're not going to have chaining operators on the source object, then being able to use standalone pipe() recursively within chaining pipe() would make a smooth experience.

For now, this achieves what I need :

const { from, pipe } = require('ix/asynciterable');
const { map,filter } = require('ix/asynciterable/operators');
const _ = require('lodash');

function joinOps(...ops) {
  return _.flatten(ops);
}

(async function() {
  try {

    function operator1() {
      return [
        map(x => x * 3),
        filter(x => x % 2 != 0)
      ];
    }

    function operator2() {
      return map(x => x + 100);
    }

    function logger(data) {
      console.log(data);
      return data;
    }

    try{
      let operators = [operator1,operator2];
      operators = _(operators).map(op => op()).flatten().value();
      let result = await from([1,2,3]).pipe(...operators).forEach(logger);
      console.log('done');
    }
    catch(err) {
      console.error('pipeline failed with error:', err);
    }

    console.log('finished');
  } catch(e) {
    console.log(e);
  } finally {
    process.exit();
  }
})();

buzzware avatar Jun 09 '21 04:06 buzzware

The style I described is how all the Ix operators are implemented; functions which return functions that take a source iterable and return a result iterable. pipe() is a specialization of left-fold over a list of functions with the signature (source: TSource) => TResult.

It sounds like you want a curried version of pipe, which is valid but straightforward to write yourself:

// could also be:
// const compose = (...ops) => (source) => source.pipe(...ops);
const compose = (...ops) => (source) => ops.reduce((source, op) => op(source), source);

function operator1() {
  return compose(map(x => x * 3), filter(x => x % 2 == 0));
}

function operator2() {
  return map(x => x + 100);
}

from([1,2,3]).pipe(operator1(), operator2());

trxcllnt avatar Jun 09 '21 15:06 trxcllnt

Thanks for that, I haven't had enough functional experience to write that quickly. Yes, your compose() is what I was expecting from the standalone pipe(). This can't be a special need - could you add something like compose to the library?

buzzware avatar Jun 10 '21 00:06 buzzware