gleam
Some ideas on window functions
- Base idea: add a Router to each Task and split the Task into virtual WinTasks. After the Task decodes a Row, the Router sends the Row to the right WinTask.
  - The network layer is the same for batch and window computing.
- TransFuncs (Map/Join/Partition...) are mostly defined the same as now, but Step / Row may gain some fields used for windowing.
  - Add a SelectWinFields TransFunc to select the window fields.
    - Before SelectWinFields, only Map-like TransFuncs are allowed; anything else panics at plan time with an error.
    - After SelectWinFields, all TransFuncs are window TransFuncs.
    - Whether a Step runs in batch or window mode is determined at plan time.
  - Add an AppendWinFields TransFunc to copy the window fields into the values.
    - By default, a TransFunc does not need to see the window fields in the values.
    - After SelectWinFields, TransFuncs are written as if they were processing rows without AppendWinFields, i.e. without the window fields.
- Instruction:Function:
  - Change the Instruction:Function interface
    - FROM: func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error
    - TO: func(readers []RowReader, writers []RowWriter, stats *pb.InstructionStat) error
    - As far as I can see, all Instruction:Functions already decode Rows from the stream by themselves, so this will not cost more.
  - Add some concepts
    - RowReader: interface{ ReadRow() (*Row, error) }
    - RowWriter: interface{ WriteRow(row *Row) error }
    - RowReader / RowWriter are wrappers around raw Rows. They may hide the WinField, and use a single channel during window-mode processing.
    - WinTask: the virtual task goroutines that process the Rows of one window.
    - Router: routes each Row by its "WinField" to the right WinTask, and manages the WinTasks.
- Batch mode data stream
  - Data is transferred to the Task channel; a RowReader then reads the channel and decodes it into Rows for the single Instruction:Function instance.
  - RowReader / RowWriter process the stream directly, as gleam does now, but decoding into Rows becomes a shared operation inside RowReader.
- Window mode data stream
  - Data is transferred to the Task channel and decoded into Rows (a shared operation, performed in the Task when reading the instruction channels and before any RowReader).
  - The Router then inspects the WinField, strips it (or keeps the window value in a dedicated Row struct field instead), and sends the Row, with only its valid values, to the channel of the matching WinTask instance, which reads it through its RowReader.
    - A WinTask does not need to see the WinField in the Row values.
    - If some step does need to see it, use a dedicated step to process the raw Rows, or add the field to the Row values.
  - Use RowWriter.WriteRow(row) to encode and write rows. The RowWriter appends the WinField bound to its WinTask and writes to the Task output stream.
  - When an Instruction:Function's RowReader reads an EOF error, or the WinTask times out, the WinTask exits. This is the same as in batch-mode processing.
  - The logic inside an Instruction:Function is exactly the same in batch and window mode.
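The point that a Function's logic does not change between modes can be illustrated with a small, self-contained Go sketch. It does not use gleam: Row, sum, runBatch and runWindowed are hypothetical names, and plain channels stand in for RowReader. runWindowed plays the Router's role, dropping the window field and starting one goroutine (a WinTask) per window, while closing a channel plays the part of EOF.

```go
package main

import (
	"fmt"
	"sync"
)

// Row is a simplified, hypothetical row: W is the window field, V a single value.
type Row struct {
	W int64
	V int64
}

// sum is the per-task logic; it only sees values and cannot tell which mode it runs in.
func sum(in <-chan int64) int64 {
	var total int64
	for v := range in {
		total += v
	}
	return total
}

// runBatch feeds every row to a single instance of fn (batch mode).
func runBatch(rows []Row, fn func(<-chan int64) int64) int64 {
	ch := make(chan int64, len(rows))
	for _, r := range rows {
		ch <- r.V
	}
	close(ch)
	return fn(ch)
}

// runWindowed plays the Router: it strips W and sends each value to the WinTask of its window.
func runWindowed(rows []Row, fn func(<-chan int64) int64) map[int64]int64 {
	var mu sync.Mutex
	var wg sync.WaitGroup
	chans := map[int64]chan int64{}
	results := map[int64]int64{}
	for _, r := range rows {
		ch, ok := chans[r.W]
		if !ok {
			ch = make(chan int64, len(rows))
			chans[r.W] = ch
			wg.Add(1)
			go func(w int64, in <-chan int64) { // one WinTask goroutine per window
				defer wg.Done()
				res := fn(in)
				mu.Lock()
				results[w] = res
				mu.Unlock()
			}(r.W, ch)
		}
		ch <- r.V
	}
	for _, ch := range chans {
		close(ch) // EOF for each WinTask
	}
	wg.Wait()
	return results
}

func main() {
	rows := []Row{{10, 1}, {10, 2}, {20, 5}, {10, 3}, {20, 6}}
	fmt.Println(runBatch(rows, sum)) // 17
	w := runWindowed(rows, sum)
	fmt.Println(w[10], w[20]) // 6 11
}
```

The same sum function produces one total in batch mode and one total per window in window mode; only the routing around it differs.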
I'd like to work on "window functions" as described above. Does this match your ideas? Is there any conflict with your plans, or anything I should pay attention to?
Can I join gleam-dev on Slack?
If you plan to implement window functions, what are your thoughts on the underlying data structures?
I need your email to add you to Slack.
I will post some key data structures later.
Key ideas for windowing functions
1- SelectWindowField implementation and its use in a user application
// selectWindowField is defined like a mapper: it computes the window field of a row
func selectWindowField(row []interface{}) int64 {
	timestampField := row[2].(int64)
	// truncate the timestamp to the start of its 10-second window
	windowField := timestampField - timestampField%10
	return windowField
}
f := flow.New("kafkaSource").
	Read(kfkSource).
	Map("decoder", DecoderMapper_ID).
	// after SelectWindowField, the windowField will be appended to the Row
	SelectWindowField("Select-Window-Field", selectWindowField_ID).
	// keep the top 2 rows of each window, ordered by field 2 descending
	Top("top2", 2, flow.OrderBy(2, false)).
	// after AppendWindowFieldAt(3), the windowField can be seen at Row.V[2]
	AppendWindowFieldAt(3).
	Printlnf("%s,%d,%d")
/*
When the inputs are
a,1,,1516513411
b,2,,1516513412
c,3,,1516513413
d,4,,1516513514
e,5,,1516513515
f,6,,1516513515
Then the output would be
c,3,1516513410
b,2,1516513410
f,6,1516513510
e,5,1516513510
In this application the window field is "windowField := timestampField - timestampField%10", so each 10 seconds is one window
- How the windows are split is decided in the SelectWindowField function
- If we need the window field, just call AppendWindowFieldAt to insert/append it into the Row
*/
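The window assignment and per-window top-k in the example above can be checked in plain Go, outside gleam. This is a sketch that assumes the decoded rows are (name, value, timestamp in seconds); event, windowStart and topKPerWindow are hypothetical helpers, not gleam APIs. Truncating with ts - ts%size, rather than ts % size, is what maps 1516513411 to the window 1516513410 shown in the expected output.

```go
package main

import (
	"fmt"
	"sort"
)

// event mirrors a decoded example row: name, value, timestamp (seconds).
type event struct {
	name string
	val  int64
	ts   int64
}

// windowStart truncates a timestamp to the start of its size-second window.
func windowStart(ts, size int64) int64 {
	return ts - ts%size
}

// topKPerWindow groups events by window start and keeps the k largest values per window,
// returning "name,value,window" lines with windows in ascending order.
func topKPerWindow(events []event, size int64, k int) []string {
	groups := map[int64][]event{}
	for _, e := range events {
		w := windowStart(e.ts, size)
		groups[w] = append(groups[w], e)
	}
	wins := make([]int64, 0, len(groups))
	for w := range groups {
		wins = append(wins, w)
	}
	sort.Slice(wins, func(i, j int) bool { return wins[i] < wins[j] })

	var out []string
	for _, w := range wins {
		g := groups[w]
		sort.Slice(g, func(i, j int) bool { return g[i].val > g[j].val }) // descending by value
		if len(g) > k {
			g = g[:k]
		}
		for _, e := range g {
			out = append(out, fmt.Sprintf("%s,%d,%d", e.name, e.val, w))
		}
	}
	return out
}

func main() {
	events := []event{
		{"a", 1, 1516513411}, {"b", 2, 1516513412}, {"c", 3, 1516513413},
		{"d", 4, 1516513514}, {"e", 5, 1516513515}, {"f", 6, 1516513515},
	}
	for _, line := range topKPerWindow(events, 10, 2) {
		fmt.Println(line)
	}
}
```

Running it prints the four output lines listed in the example comment, in the same order.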
2- Row implementation for windowing
Row struct implementation
Current Row in gleam
type Row struct {
K []interface{} `msg:"K"`
V []interface{} `msg:"V"`
T int64 `msg:"T"`
}
Row for windowing calculation
//
// Plan A
//
// one way is to append window-related fields to Row
type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
	W int64         `msg:"W"` // window id, usually a timestamp
	S int64         `msg:"S"` // status: marks the Row as data or as a control command
}
// in window mode, a Row can carry not only data
// but also control commands,
// such as an EOF status or window batch errors
// ================================================================
//
// Plan B
//
// the other option: add no extra fields to the Row struct
type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
}
// but we still need to transmit commands and window info; how?
// it could look something like
// Row.V = append([]interface{}{S, W}, values...)
// i.e. we put the status (for commands) and the window field in the first two slots of Row.V,
// and append the real values after them
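The Plan B layout can be sketched as a pair of pack/unpack helpers. This is a minimal sketch assuming S and W are both stored as int64 in the first two slots of Row.V; packRow and unpackRow are hypothetical names, and the Row type here drops gleam's msgpack tags to stay self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Row follows Plan B: only K and V, no extra struct fields.
type Row struct {
	K []interface{}
	V []interface{}
}

// packRow prepends the status S and the window field W to the real values.
func packRow(s, w int64, values []interface{}) *Row {
	v := make([]interface{}, 0, len(values)+2)
	v = append(v, s, w)
	v = append(v, values...)
	return &Row{V: v}
}

// unpackRow strips the (S, W) head and returns it together with the real values.
func unpackRow(row *Row) (int64, int64, []interface{}, error) {
	if len(row.V) < 2 {
		return 0, 0, nil, errors.New("row too short for the Plan B header")
	}
	s, ok1 := row.V[0].(int64)
	w, ok2 := row.V[1].(int64)
	if !ok1 || !ok2 {
		return 0, 0, nil, errors.New("plan B header must be two int64 values")
	}
	return s, w, row.V[2:], nil
}

func main() {
	row := packRow(0, 1516513410, []interface{}{"c", int64(3)})
	fmt.Println(row.V) // [0 1516513410 c 3]
	s, w, vals, err := unpackRow(row)
	fmt.Println(s, w, vals, err) // 0 1516513410 [c 3] <nil>
}
```

The cost of Plan B is that every reader must agree on this header convention, whereas Plan A makes it explicit in the struct.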
Row Read / Write
Read/write Rows through RowReader/RowWriter
- RowReader/RowWriter definitions
  - RowReader: interface{ ReadRow() (*Row, error) }
  - RowWriter: interface{ WriteRow(row *Row) error }
- Change the Instruction:Function interface
  - FROM: func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error
  - TO: func(readers []RowReader, writers []RowWriter, stats *pb.InstructionStat) error
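A Function written against the proposed interfaces only loops over ReadRow until io.EOF and calls WriteRow. The sketch below assumes in-memory sliceReader/sliceWriter implementations and a hypothetical doubleValues Function; the stats parameter is omitted so the example does not depend on gleam's pb package.

```go
package main

import (
	"fmt"
	"io"
)

type Row struct {
	K []interface{}
	V []interface{}
}

// RowReader / RowWriter as proposed above.
type RowReader interface{ ReadRow() (*Row, error) }
type RowWriter interface{ WriteRow(row *Row) error }

// sliceReader is a hypothetical in-memory RowReader used only for this demo.
type sliceReader struct {
	rows []*Row
	pos  int
}

func (r *sliceReader) ReadRow() (*Row, error) {
	if r.pos >= len(r.rows) {
		return nil, io.EOF
	}
	row := r.rows[r.pos]
	r.pos++
	return row, nil
}

// sliceWriter collects written rows in memory.
type sliceWriter struct{ rows []*Row }

func (w *sliceWriter) WriteRow(row *Row) error {
	w.rows = append(w.rows, row)
	return nil
}

// doubleValues is an example Function with the new signature (minus stats):
// it doubles the first value of every row and writes it to all writers.
func doubleValues(readers []RowReader, writers []RowWriter) error {
	for _, r := range readers {
		for {
			row, err := r.ReadRow()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			out := &Row{K: row.K, V: []interface{}{row.V[0].(int64) * 2}}
			for _, w := range writers {
				if err := w.WriteRow(out); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

func main() {
	in := &sliceReader{rows: []*Row{{V: []interface{}{int64(1)}}, {V: []interface{}{int64(5)}}}}
	out := &sliceWriter{}
	if err := doubleValues([]RowReader{in}, []RowWriter{out}); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, row := range out.rows {
		fmt.Println(row.V[0])
	}
}
```

Because the Function never touches io.Reader directly, the same code could be handed a batch-mode reader or a per-window one.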
RowReader implementation
//
// RowReader in Batch Mode
//
type RawRowReader struct {
	reader io.Reader
}

func (r *RawRowReader) ReadRow() (*util.Row, error) {
	// decode the next Row directly from the underlying stream
	return util.ReadRow(r.reader)
}

//
// RowReader in Windowing Mode
//
type WinRowReader struct {
	rowChan chan *util.Row // filled by the Router, drained by the WinTask
}

// AddRow is called by the Router to hand a Row to this WinTask
func (r *WinRowReader) AddRow(row *util.Row) error {
	r.rowChan <- row
	return nil
}

func (r *WinRowReader) ReadRow() (*util.Row, error) {
	row, ok := <-r.rowChan
	if !ok {
		// channel closed by the Router: no more input for this window
		return nil, io.EOF
	}
	switch row.S {
	case ErrorOK:
		// if using the Plan B Row struct, strip the head (S, W) fields from row.V before returning
		return row, nil
	case ErrorEOF:
		return nil, io.EOF
	// ... other status values
	}
	return nil, fmt.Errorf("unknown row status %d", row.S)
}
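How a WinRowReader separates data Rows from control Rows (Plan A) can be exercised in isolation. This sketch assumes status values StatusOK = 0 and StatusEOF = 1 (hypothetical names standing in for ErrorOK/ErrorEOF) and a simplified Row; drain is a hypothetical consumer that counts data rows until the EOF control row arrives.

```go
package main

import (
	"fmt"
	"io"
)

// Hypothetical status values for Plan A's Row.S field.
const (
	StatusOK  int64 = 0
	StatusEOF int64 = 1
)

type Row struct {
	V []interface{}
	W int64
	S int64
}

type WinRowReader struct{ rowChan chan *Row }

// AddRow is what the Router would call for this window.
func (r *WinRowReader) AddRow(row *Row) { r.rowChan <- row }

// ReadRow returns data rows and turns an EOF control row (or a closed channel) into io.EOF.
func (r *WinRowReader) ReadRow() (*Row, error) {
	row, ok := <-r.rowChan
	if !ok {
		return nil, io.EOF
	}
	switch row.S {
	case StatusOK:
		return row, nil
	case StatusEOF:
		return nil, io.EOF
	default:
		return nil, fmt.Errorf("unknown row status %d", row.S)
	}
}

// drain reads until EOF and returns the number of data rows seen.
func drain(r *WinRowReader) (int, error) {
	n := 0
	for {
		_, err := r.ReadRow()
		if err == io.EOF {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	r := &WinRowReader{rowChan: make(chan *Row, 4)}
	r.AddRow(&Row{V: []interface{}{"a"}, W: 10})
	r.AddRow(&Row{V: []interface{}{"b"}, W: 10})
	r.AddRow(&Row{S: StatusEOF, W: 10}) // control row, not data
	n, err := drain(r)
	fmt.Println(n, err) // 2 <nil>
}
```

The Function above the reader never sees the control row; it just observes io.EOF, exactly as in batch mode.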
3- Router implementation
Router description
- The Router runs directly inside the Task that executes the Function
Router implementation
//
// Router in windowing Mode
//
func (r *localDriver) runTask(wg *sync.WaitGroup, task *Task) {
	step := task.Step
	if step.Function == nil {
		return
	}
	if step.winMode {
		// window mode: the RowRouter manages the windowing Function calls
		rowRouter := RowRouter{readers: task.InputChans, writers: task.OutputShards, Function: step.Function}
		rowRouter.Run()
		return
	}
	// batch mode: call the Function once over all inputs
	rawRowReaders := []RowReader{}
	rawRowWriters := []RowWriter{}
	// ... init rawRowReaders / rawRowWriters
	// stat: the *pb.InstructionStat of this task
	step.Function(rawRowReaders, rawRowWriters, stat)
}
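// WinTask: a virtual task that runs the Function for one window
type WinTask struct {
	winRowReaders   []*WinRowReader
	winRowWriters   []*WinRowWriter
	Function        func([]RowReader, []RowWriter, *pb.InstructionStat) error
	timeBegin       time.Time
	timeTimeout     time.Time
	winFieldBind    int64 // the window this WinTask is bound to
	instructionStat *pb.InstructionStat
}

func (t *WinTask) RunFunc() error {
	// the Function takes interface slices, so convert the concrete readers/writers
	readers := make([]RowReader, len(t.winRowReaders))
	for i, r := range t.winRowReaders {
		readers[i] = r
	}
	writers := make([]RowWriter, len(t.winRowWriters))
	for i, w := range t.winRowWriters {
		writers[i] = w
	}
	err := t.Function(readers, writers, t.instructionStat)
	// when WinTask.Function is done, send an EOF Row downstream
	eof := &util.Row{S: ErrorEOF}
	for _, w := range writers {
		w.WriteRow(eof)
	}
	return err
}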
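// RowRouter: only base logic; timeout control, error handling and more still need to be added
type RowRouter struct {
	readers    []io.Reader
	writers    []io.Writer
	Function   func([]RowReader, []RowWriter, *pb.InstructionStat) error
	mu         sync.Mutex // guards winTaskReg, which all reader goroutines share
	winTaskReg map[int64]*WinTask
}

func (rr *RowRouter) Run() {
	if rr.winTaskReg == nil {
		rr.winTaskReg = make(map[int64]*WinTask)
	}
	var wg sync.WaitGroup
	for i, r := range rr.readers {
		wg.Add(1)
		// pass i and r as arguments so each goroutine gets its own copy
		go func(i int, r io.Reader) {
			defer wg.Done()
			rawRowReader := &RawRowReader{reader: r}
			for {
				row, err := rawRowReader.ReadRow()
				if err != nil {
					return // io.EOF or a read error ends this input
				}
				winTask, err := rr.GetOrGenWinTask(row.W)
				if err != nil {
					return
				}
				winTask.winRowReaders[i].AddRow(row)
			}
		}(i, r)
	}
	wg.Wait()
}

func (rr *RowRouter) GetOrGenWinTask(winField int64) (*WinTask, error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if winTask, ok := rr.winTaskReg[winField]; ok {
		return winTask, nil
	}
	// first Row of this window: create, register and start a new WinTask
	winTask := &WinTask{
		Function:        rr.Function,
		winFieldBind:    winField,
		timeBegin:       time.Now(),
		instructionStat: &pb.InstructionStat{},
	}
	for range rr.readers {
		winTask.winRowReaders = append(winTask.winRowReaders, &WinRowReader{rowChan: make(chan *util.Row)})
	}
	// winRowWriters (wrapping rr.writers and appending winField) are omitted here
	rr.winTaskReg[winField] = winTask
	go winTask.RunFunc()
	return winTask, nil
}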
The API changes have some good ideas, but they need more thought on:
- how to handle late events
- how to run blocking functions, such as sort, on the windowed data
Maybe check Apache Beam / Flink / Google Cloud Dataflow.
Ideas for the problems mentioned above:
- How to run blocking functions, such as sort, on the windowed data
  - The data flows "from WinTask to WinTask", implemented by Task + Router, just as batch mode flows "from task to task". The difference is that these WinTasks share one physical Task, and the Router routes each Row to the virtual task that belongs to its window (its WinTask).
  - Blocking functions run in the WinTask context of each window.
    - Each WinTask has its own goroutines running its blocking functions.
    - If sort needs to block, it only blocks the WinTask it is running in.
    - Whether reading is done is decided by the RowReader error, which may come from the upstream WinTask, be generated by the Router, or come from a transform function such as SelectWindowField that wants to exit by itself.
  - The Router routes Rows to the right WinTask's blocking functions through WinRowReaders.
  - In fact, in windowing mode, multiple windows execute at the same time.
  - The extra cost of the Router is about 150ns per Row because of the channel, but there is a lot of room for optimization.
- How to handle late events
  - The Router may be renamed "WinTaskController": it is responsible not only for routing, but also for WinTask creation, destruction, and timeout-based destruction.
  - The Router will have parameters such as upperLimitToNow/lowerLimitToNow, which control the lifetime of the WinTasks it manages.
    - The first transform function, SelectWindowField, may use a smaller upperLimitToNow/lowerLimitToNow, which just keeps the time window close to real time.
    - The windowing transform functions after it may use a much larger upperLimitToNow/lowerLimitToNow, which sets the timeout of the calculation.
- Meanwhile, I will study the key ideas of Apache Beam / Flink / Google Cloud Dataflow over the next few days.
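The upperLimitToNow/lowerLimitToNow idea for late events can be sketched as an admission check plus expiry in a hypothetical WinTaskController. The exact semantics are not specified above, so this sketch assumes a window stays open while now-lowerLimitToNow <= windowStart <= now+upperLimitToNow, and that rows for windows outside that range are treated as late (or too early) and rejected. winController, route and expire are hypothetical names.

```go
package main

import "fmt"

// winController is a hypothetical WinTaskController. Assumed semantics:
// a window is open while now-lowerLimitToNow <= windowStart <= now+upperLimitToNow.
type winController struct {
	lowerLimitToNow int64         // seconds into the past a window stays open
	upperLimitToNow int64         // seconds into the future a window may start
	open            map[int64]int // window start -> rows routed so far
}

// route reports whether a row for window w is accepted at time now;
// rows for windows outside the limits are late (or too early) and rejected.
func (c *winController) route(w, now int64) bool {
	if w < now-c.lowerLimitToNow || w > now+c.upperLimitToNow {
		return false
	}
	c.open[w]++
	return true
}

// expire destroys WinTasks whose windows have fallen behind now-lowerLimitToNow.
func (c *winController) expire(now int64) []int64 {
	var closed []int64
	for w := range c.open {
		if w < now-c.lowerLimitToNow {
			closed = append(closed, w)
			delete(c.open, w) // deleting during range is allowed in Go
		}
	}
	return closed
}

func main() {
	now := int64(1516513520)
	// SelectWindowField stage: small limits, close to real time
	first := &winController{lowerLimitToNow: 10, open: map[int64]int{}}
	// downstream windowing stage: a much longer calculation timeout
	later := &winController{lowerLimitToNow: 300, open: map[int64]int{}}
	fmt.Println(first.route(1516513410, now), later.route(1516513410, now), first.route(1516513510, now))
	fmt.Println(first.expire(now + 10))
}
```

With these assumed limits, a row from a window 110 seconds old is late for the first stage but still accepted by the later stage, which matches the intent of giving downstream stages larger limits.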
Awesome!
Are window functions supported now?