
Some ideas for window functions

Open qazqwe1596357 opened this issue 7 years ago • 8 comments

  • The base idea is to add a Router to each Task and split the Task into virtual WinTasks. After the Task decodes a Row, the Router sends the Row to the right WinTask
  • The network layer stays the same for batch and window computing
  • TransFuncs (Map / Join / Partition ...) mostly keep their current definitions, but Step / Row may gain some fields used in window mode
  • Add a SelectWinFields TransFunc to select the window fields
    • Before SelectWinFields, only Map-like TransFuncs are allowed; anything else panics with an error at plan time
    • After SelectWinFields, every TransFunc runs in window mode
    • Whether a Step runs in batch or window mode is determined at plan time
  • Add an AppendWinFields TransFunc to copy the window fields into the values
    • By default a TransFunc does not need to see the window fields in the values
    • After SelectWinFields, a TransFunc is written as if it were processing plain rows; it only sees the window fields if AppendWinFields is used
  • Instruction:Function:
    • Change the Instruction:Function interface
      • FROM: func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error
      • TO: func(readers []RowReader, writers []RowWriter, stats *pb.InstructionStat) error
      • As far as I can see, every Instruction:Function already decodes Rows from the stream itself, so this adds no extra cost
    • Add some concepts
      • RowReader: interface{ ReadRow() (*Row, error) }
      • RowWriter: interface{ WriteRow(row *Row) error }
      • RowReader / RowWriter are wrappers around the raw Row. They may hide the WinField, and they use a channel in window mode
      • WinTask: the virtual Task goroutines that process the Rows of one window
      • Router: routes each Row by its "WinField" to the right WinTask, and manages the WinTasks
  • Batch Mode Data Stream
    • Data is sent to the Task chan; the RowReader then reads the chan and decodes Rows for the single Instruction:Function instance
    • RowReader / RowWriter process the data directly, as gleam does today, but decoding to Rows becomes a shared operation inside RowReader
  • Window Mode Data Stream
    • Data is sent to the Task chan and decoded to Rows (a shared operation that runs in the Task while reading the Instruction chans, before RowReader)
    • The Router then looks at the WinField, strips it (or uses a WinValue field in the Row struct instead), and sends the Row with only its valid values to the WinTask instance's channel. The WinTask reads it through its RowReader
      • The WinTask does not need to see the WinField in the Row values
      • If some step needs to see it, use a dedicated step to process the raw Rows, or add it to the Row values
    • Use RowWriter.WriteRow(row) to encode and write rows. The RowWriter appends the WinField bound to its WinTask and writes to the Task output stream
    • When the RowReader of an Instruction:Function returns an EOF error, or the WinTask times out, the WinTask exits. This is the same as in batch mode
    • The logic inside Instruction:Function is exactly the same in batch and window mode
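
The last point above, that an Instruction:Function behaves the same in both modes once it only sees RowReader / RowWriter, can be sketched as follows. This is a minimal illustration and not gleam code: `Row` is reduced to its values, the stats argument is dropped, and `sliceReader`, `chanReader`, `collectWriter` and `sumFirstColumn` are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"io"
)

// Row is a simplified stand-in for gleam's util.Row (only V is used here).
type Row struct{ V []interface{} }

// RowReader and RowWriter follow the interfaces proposed in this issue.
type RowReader interface{ ReadRow() (*Row, error) }
type RowWriter interface{ WriteRow(row *Row) error }

// sliceReader is a hypothetical batch-style reader over already decoded rows.
type sliceReader struct {
	rows []*Row
	next int
}

func (r *sliceReader) ReadRow() (*Row, error) {
	if r.next >= len(r.rows) {
		return nil, io.EOF
	}
	row := r.rows[r.next]
	r.next++
	return row, nil
}

// chanReader is a hypothetical window-style reader fed through a channel.
type chanReader struct{ rows chan *Row }

func (r *chanReader) ReadRow() (*Row, error) {
	row, ok := <-r.rows
	if !ok {
		return nil, io.EOF // a closed channel means the window is finished
	}
	return row, nil
}

// collectWriter is a hypothetical writer that keeps rows in memory.
type collectWriter struct{ rows []*Row }

func (w *collectWriter) WriteRow(row *Row) error {
	w.rows = append(w.rows, row)
	return nil
}

// sumFirstColumn plays the role of an Instruction Function. It only sees
// RowReader and RowWriter, so it cannot tell batch mode from window mode.
func sumFirstColumn(readers []RowReader, writers []RowWriter) error {
	total := 0
	for {
		row, err := readers[0].ReadRow()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		total += row.V[0].(int)
	}
	return writers[0].WriteRow(&Row{V: []interface{}{total}})
}

// run feeds one reader through sumFirstColumn and returns the single result.
func run(in RowReader) int {
	out := &collectWriter{}
	if err := sumFirstColumn([]RowReader{in}, []RowWriter{out}); err != nil {
		panic(err)
	}
	return out.rows[0].V[0].(int)
}

func runBatch(values []int) int {
	in := &sliceReader{}
	for _, v := range values {
		in.rows = append(in.rows, &Row{V: []interface{}{v}})
	}
	return run(in)
}

func runWindow(values []int) int {
	in := &chanReader{rows: make(chan *Row, len(values))}
	for _, v := range values {
		in.rows <- &Row{V: []interface{}{v}}
	}
	close(in.rows)
	return run(in)
}

func main() {
	fmt.Println(runBatch([]int{1, 2, 3}), runWindow([]int{1, 2, 3}))
}
```

Both calls go through the same `sumFirstColumn`; only the reader behind the interface differs.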

I plan to do some development on window functions as described above. Does that match your ideas? Does it conflict with your plans? Is there anything I should pay attention to?

qazqwe1596357 avatar Jan 20 '18 14:01 qazqwe1596357

Can I join gleam-dev on Slack?

qazqwe1596357 avatar Jan 20 '18 14:01 qazqwe1596357

If you plan to do the window functions, what are your thoughts on the underlying data structures?

I need your email to add you to Slack.

chrislusf avatar Jan 20 '18 18:01 chrislusf

I will add some key data structures later.

qazqwe1596357 avatar Jan 20 '18 18:01 qazqwe1596357

Key idea for windowing functions

1-Selecting the window field and using it in a user application

// selectWindowField is defined like a mapper: it derives the window field from a row
func selectWindowField(row []interface{}) int64 {
    timestampField := row[2].(int64)
    // round the timestamp down to a multiple of 10, so each 10-second span is one window
    // (a plain "timestampField % 10" would only give the remainder 0..9)
    windowField := timestampField - timestampField%10
    return windowField
}

f := flow.New("kafkaSource").
          Read(kfkSource).
          Map("decoder", DecoderMapper_ID).
          // after SelectWindowField, the windowField is attached to the Row
          SelectWindowField("Select-Window-Field", selectWindowField_ID).
          // top 2 per window, matching the sample output below
          Top("top2", 2, flow.OrderBy(2, false)).
          // after AppendWindowFieldAt, the windowField is visible at Row.V[2]
          AppendWindowFieldAt(3).
          Printlnf("%s,%d,%d")

/*
When the input is
    a,1,,1516513411
    b,2,,1516513412
    c,3,,1516513413
    d,4,,1516513514
    e,5,,1516513515
    f,6,,1516513515
Then the output would be
    c,3,1516513410
    b,2,1516513410
    f,6,1516513510
    e,5,1516513510

In this application the window field is "timestampField - timestampField % 10", so each 10 seconds is one window
- How rows are split into windows is decided in the selectWindowField function
- If we care about the window field, just call AppendWindowFieldAt to insert/append it into the Row
*/
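
The window ids in the sample output (1516513410, 1516513510) are the timestamps rounded down to a multiple of 10 seconds. A small sketch of that calculation follows; `windowStart` is a name introduced here for illustration, and non-negative Unix timestamps in seconds are assumed.

```go
package main

import "fmt"

// windowStart floors a Unix timestamp (seconds) to the start of its window.
// size is the window length in seconds and must be positive.
// Assumption: ts is non-negative; Go's % keeps the sign of ts.
func windowStart(ts, size int64) int64 {
	return ts - ts%size
}

func main() {
	// timestamps taken from the sample input in this issue
	for _, ts := range []int64{1516513411, 1516513412, 1516513413, 1516513514, 1516513515} {
		fmt.Println(ts, "->", windowStart(ts, 10))
	}
}
```

The first three timestamps fall into window 1516513410 and the last two into 1516513510, which is the grouping the sample output relies on.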

2-Row implementation for windowing

Row struct implementation

Row current in gleam
type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
	T int64         `msg:"T"`
}
Row in windowing calculation
//
// Plan A
//

// one way is to append window-related fields to Row
type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
	W int64         `msg:"W"`  // window id, usually a timestamp
	S int64         `msg:"S"`  // status, used by control Rows
}

// in window mode a Row can carry not only data
// but also a control command,
// such as an EOF status or window batch errors


// ================================================================

//
// Plan B
//

// the other way of thinking Row struct: no other fields append to struct
type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
}

// but we still need to transmit the command and window info. How?
// it could be something like
//     Row.V []interface{} = []interface{S, W, Row.V ... }
//     where the status (used for commands) and the window field are the first two entries of Row.V
//     and the real values follow at the tail of Row.V
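
Plan B can be sketched as a pair of helpers that pack and unpack the two leading entries. `packWindow` and `unpackWindow` are hypothetical names, and the sketch assumes both entries are stored as int64; after a real msgpack round trip the concrete integer type may differ, so the type check would need adjusting.

```go
package main

import (
	"errors"
	"fmt"
)

// packWindow builds a Plan B value slice: status, window id, then the user values.
func packWindow(status, window int64, values []interface{}) []interface{} {
	packed := make([]interface{}, 0, len(values)+2)
	packed = append(packed, status, window)
	return append(packed, values...)
}

// unpackWindow splits a Plan B value slice into status, window id and user values.
func unpackWindow(v []interface{}) (int64, int64, []interface{}, error) {
	if len(v) < 2 {
		return 0, 0, nil, errors.New("row too short to hold status and window fields")
	}
	status, okS := v[0].(int64)
	window, okW := v[1].(int64)
	if !okS || !okW {
		return 0, 0, nil, errors.New("status and window fields must be int64")
	}
	return status, window, v[2:], nil
}

func main() {
	packed := packWindow(0, 1516513410, []interface{}{"c", 3})
	status, window, values, err := unpackWindow(packed)
	fmt.Println(status, window, values, err)
}
```

Compared with Plan A, every reader and writer has to agree on this layout, which is the price for leaving the Row struct unchanged.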

Row Read / Write

Read/Write Row By RowReader/RowWriter

  • RowReader/RowWriter definitions
    • RowReader: interface{ ReadRow() (*Row, error) }
    • RowWriter: interface{ WriteRow(row *Row) error }
  • Change The Instruction:Function interface
    • FROM: func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error
    • TO: func(readers []RowReader, writers []RowWriter, stats *pb.InstructionStat) error

RowReader implementation

//
// RowReader in Batch Mode
//
type RawRowReader struct {
    reader io.Reader
}

func (this *RawRowReader) ReadRow() (*util.Row, error) {
    return util.ReadRow(this.reader)
}


//
// RowReader in Windowing Mode
//
type WinRowReader struct {
    rowChan chan *util.Row
}

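func (this *WinRowReader) AddRow(row *util.Row) error {
    this.rowChan <- row
    return nil
}

func (this *WinRowReader) ReadRow() (*util.Row, error) {
    // block until the Router delivers the next Row for this window
    row := <-this.rowChan
    switch row.S {
    case ErrorOK:
        // if using the Plan B Row struct, strip the leading (S, W) fields from row.V before returning
        return row, nil
    case ErrorEOF:
        // assumption: io.EOF is the error value that tells the Function the window is done
        return nil, io.EOF
    // ...
    }
    // statuses not handled above are reported as errors (needs the fmt and io imports)
    return nil, fmt.Errorf("unexpected row status %d", row.S)
}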
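
The writer side is only described in prose in this issue: the RowWriter appends the WinField bound to its WinTask and writes to the shared Task output. A minimal sketch under the Plan A Row layout could look like this. `WinRowWriter` and `sharedOutput` are hypothetical, and the mutex is an assumption made here because several WinTasks would write to the same output concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// Row follows the Plan A layout from this issue (msgpack tags omitted).
type Row struct {
	K []interface{}
	V []interface{}
	W int64 // window id
	S int64 // status
}

type RowWriter interface{ WriteRow(row *Row) error }

// sharedOutput stands in for the Task output stream shared by all WinTasks.
type sharedOutput struct {
	mu   sync.Mutex
	rows []Row
}

func (o *sharedOutput) WriteRow(row *Row) error {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.rows = append(o.rows, *row)
	return nil
}

// WinRowWriter stamps every Row with the window id bound to its WinTask.
type WinRowWriter struct {
	window int64
	out    RowWriter
}

func (w *WinRowWriter) WriteRow(row *Row) error {
	stamped := *row // shallow copy, so the caller's Row is left untouched
	stamped.W = w.window
	return w.out.WriteRow(&stamped)
}

func main() {
	out := &sharedOutput{}
	first := &WinRowWriter{window: 1516513410, out: out}
	second := &WinRowWriter{window: 1516513510, out: out}
	first.WriteRow(&Row{V: []interface{}{"c", 3}})
	second.WriteRow(&Row{V: []interface{}{"f", 6}})
	for _, r := range out.rows {
		fmt.Println(r.V, r.W)
	}
}
```

The Function itself never sets W; the window id travels with the writer it was handed.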

3-Router implementation

Router description

  • The Router works inside the Task that runs the Function directly

Router implementation

//
// Router in windowing Mode
//
func (r *localDriver) runTask(wg *sync.WaitGroup, task *Task) {
    step := task.Step
    if step.Function != nil {
        if step.winMode {
            // let RowRouter manage the windowed Function calls
            rowRouter := RowRouter{readers: task.InputChans, writers: task.OutputShards, Function: step.Function}
            rowRouter.Run()
            return
        }
        // batch mode: call Function once
        // the slices use the interface types so they match the Function signature
        rawRowReaders := []RowReader{}
        rawRowWriters := []RowWriter{}
        // init rawRowReaders / rawRowWriters
        ...
        step.Function(rawRowReaders, rawRowWriters, stat)
    }
}


// WinTask
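type WinTask struct {
    winRowReaders     []*WinRowReader
    winRowWriters     []*WinRowWriter
    Function          func([]RowReader, []RowWriter, *pb.InstructionStat) error
    timeBegin         time.Time
    timeTimeout       time.Time
    winFieldBind      int64
    instructionStat   *pb.InstructionStat
}

func (this *WinTask) RunFunc(row *Row) error {
    // Go does not convert []*WinRowReader to []RowReader implicitly, so copy element by element
    readers := make([]RowReader, len(this.winRowReaders))
    for i, r := range this.winRowReaders {
        readers[i] = r
    }
    writers := make([]RowWriter, len(this.winRowWriters))
    for i, w := range this.winRowWriters {
        writers[i] = w
    }
    err := this.Function(readers, writers, this.instructionStat)
    // when WinTask.Function is done, send an EOF Row downstream
    eofRow := &Row{S: ErrorEOF}
    for _, w := range this.winRowWriters {
        w.WriteRow(eofRow)
    }
    return err
}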


// RowRouter: only the basic logic; timeout control, error handling and more still need to be added
type RowRouter struct {
    readers     []io.Reader
    writers     []io.Writer
    Function    func([]RowReader, []RowWriter, *pb.InstructionStat) error
    winTaskReg  map[int64]*WinTask
}

func (this *RowRouter) Run() {
    for i, r := range this.readers {
        // pass i and r as arguments so each goroutine works on its own copy
        go func(i int, r io.Reader) {
            rawRowReader := &RawRowReader{reader: r}
            for {
                row, err := rawRowReader.ReadRow()
                if err != nil {
                    return
                }
                winTask, err := this.GetOrGenWinTask(row.W)
                if err != nil {
                    return
                }
                winTask.winRowReaders[i].AddRow(row)
            }
        }(i, r)
    }
}

// creating missing WinTasks and locking winTaskReg are not shown here
func (this *RowRouter) GetOrGenWinTask(winField int64) (*WinTask, error) {
    return this.winTaskReg[winField], nil
}
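
To make the routing idea concrete, here is a small self-contained sketch of a router that creates one goroutine per window on first use and signals EOF by closing the window's channel. It is not the RowRouter above: it takes already decoded rows from a slice, uses a single routing goroutine so the registry needs no lock, and `router`, `chanReader`, `windowFunc` and `countPerWindow` are names invented for this example.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Row is reduced to the fields this sketch needs.
type Row struct {
	V []interface{}
	W int64 // window id
}

// chanReader is the per-window reader; a closed channel is reported as io.EOF.
type chanReader struct{ rows chan *Row }

func (r *chanReader) ReadRow() (*Row, error) {
	row, ok := <-r.rows
	if !ok {
		return nil, io.EOF
	}
	return row, nil
}

// windowFunc is what each virtual task runs (hypothetical simplified signature).
type windowFunc func(window int64, in *chanReader)

type router struct {
	fn      windowFunc
	windows map[int64]*chanReader
	wg      sync.WaitGroup
}

func newRouter(fn windowFunc) *router {
	return &router{fn: fn, windows: make(map[int64]*chanReader)}
}

// getOrCreate returns the reader of a window and starts its goroutine on first use.
func (r *router) getOrCreate(window int64) *chanReader {
	if in, ok := r.windows[window]; ok {
		return in
	}
	in := &chanReader{rows: make(chan *Row, 16)}
	r.windows[window] = in
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		r.fn(window, in)
	}()
	return in
}

// run routes every row, then closes all windows and waits for their goroutines.
func (r *router) run(rows []*Row) {
	for _, row := range rows {
		r.getOrCreate(row.W).rows <- row
	}
	for _, in := range r.windows {
		close(in.rows)
	}
	r.wg.Wait()
}

// countPerWindow routes the rows and returns how many rows each window received.
func countPerWindow(rows []*Row) map[int64]int {
	var mu sync.Mutex
	counts := make(map[int64]int)
	r := newRouter(func(window int64, in *chanReader) {
		n := 0
		for {
			if _, err := in.ReadRow(); err != nil {
				break
			}
			n++
		}
		mu.Lock()
		counts[window] = n
		mu.Unlock()
	})
	r.run(rows)
	return counts
}

func main() {
	rows := []*Row{{W: 1516513410}, {W: 1516513410}, {W: 1516513510}}
	fmt.Println(countPerWindow(rows))
}
```

A real implementation would also need the timeout and error handling the comment on RowRouter mentions; here windows only end when the input is exhausted.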

qazqwe1596357 avatar Jan 21 '18 07:01 qazqwe1596357

The API changes have some good ideas, but need more thoughts on

  • how to handle late events
  • how to run blocking functions, such as sort, on the windowed data

maybe check Apache Beam/Flink/Google Data Flow.

chrislusf avatar Jan 21 '18 19:01 chrislusf

Ideas for the problems mentioned above

  • how to run blocking functions, such as sort, on the windowed data
    • The data flows from WinTask to WinTask, implemented by Task + Router, much like batch mode flows from Task to Task. The WinTasks share the physical Task, and the Router routes each Row to the virtual Task (WinTask) that belongs to its window
    • Blocking functions run in the WinTask context of each window
    • Each WinTask has its own goroutines to run its blocking functions
    • If sort needs to block, it only blocks the WinTask it is running in
    • Reading is done when the RowReader returns an error. The error may come from the upstream WinTask, be generated by the Router, or come from a transform function such as SelectWindowField that wants to exit on its own
    • The Router routes Rows to the blocking functions of the right WinTask through WinRowReaders
    • In fact, in windowing mode multiple windows execute at the same time
    • The extra cost of the Router is about 150ns per Row because of the channel, but there is a lot of room for optimization
  • how to handle late events
    • The Router may be renamed to "WinTaskController", since it is responsible not only for routing but also for WinTask creation, destruction, and timeout-based destruction
    • The Router will have params such as upperLimitToNow / lowerLimitToNow, which affect the lifetime of the WinTasks the Router controls
    • The first SelectWindowField transform function may use a smaller upperLimitToNow / lowerLimitToNow, which keeps the time window close to real time
    • The windowing transform functions behind it may use a much bigger upperLimitToNow / lowerLimitToNow, which sets the timeout of the calculation
  • Meanwhile, I will study the key ideas of Apache Beam / Flink / Google Data Flow in the next few days ~
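
The thread does not define upperLimitToNow / lowerLimitToNow precisely. One possible reading, sketched below purely as an assumption, is that a window is accepted only while its id lies within a band around the current time: a narrow band for the first step, a wide one for later steps. `windowLimits` and `accepts` are hypothetical names, and all values are in seconds.

```go
package main

import "fmt"

// windowLimits is one possible interpretation of the two params, in seconds.
type windowLimits struct {
	lowerLimitToNow int64 // how far behind "now" a window may still be accepted
	upperLimitToNow int64 // how far ahead of "now" a window may be accepted
}

// accepts reports whether a row for the given window may still be routed.
func (l windowLimits) accepts(window, now int64) bool {
	return window >= now-l.lowerLimitToNow && window <= now+l.upperLimitToNow
}

func main() {
	now := int64(1516513520)
	late := int64(1516513410) // a window that closed well before "now"

	realtime := windowLimits{lowerLimitToNow: 10, upperLimitToNow: 10}
	downstream := windowLimits{lowerLimitToNow: 600, upperLimitToNow: 600}

	fmt.Println(realtime.accepts(late, now), downstream.accepts(late, now))
}
```

Under this reading a late row is dropped by the near-real-time first step but would still be accepted by a downstream step with wider limits; what should happen to dropped rows is left open in the thread.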

qazqwe1596357 avatar Jan 22 '18 02:01 qazqwe1596357

666

binyoucai avatar Oct 25 '21 17:10 binyoucai

Are window functions supported now?

feiyangbeyond avatar Dec 01 '23 03:12 feiyangbeyond