dfuse-eosio icon indicating copy to clipboard operation
dfuse-eosio copied to clipboard

Design of stacked filtering

Open abourget opened this issue 4 years ago • 1 comments

Meaning: filtering already filtered blocks.

We want to be able to pre-filter some large blocks, reduce them to take out undesired transactions, and then base a next phase of filtering on those ones. This can 20x performances for the second phase, instead of starting with full-full blocks.


Here are some changes to the proto definitions:

message Block {
  ...
  string filtering_include_filter_expr = 41;
  string filtering_exclude_filter_expr = 42;
  string filtering_system_actions_include_filter_expr = 50;

  repeated FilteringFilters filtering_applied_filters = 51;
}
message FilteringFilters {
  bytes hash = 1;  // of hash(include + exclude + sa_include codes)

  // the following would only be set each 100 blocks (on the %100 == 0 boundary)
  string include_filter_expr = 2;
  string exclude_filter_expr = 3;
  string system_actions_include_filter_expr = 4;
}

This would bump +1 the Version field of dfuse.eosio.codec.v1.Block, and we'll add something like:

diff --git a/pb/dfuse/eosio/codec/v1/codec.go b/pb/dfuse/eosio/codec/v1/codec.go
index fc7a3ae..cf3df26 100644
--- a/pb/dfuse/eosio/codec/v1/codec.go
+++ b/pb/dfuse/eosio/codec/v1/codec.go
@@ -151,6 +151,20 @@ func (b *Block) MigrateV0ToV1() {
        }
 }
 
+
+func (b *Block) MigrateV1ToV2() {
+       if b.Version != 1 {
+               return
+       }
+       b.Version = 2
+
+       if b.FilteringApplied {
+               // prendre filtering_include_filter_expr -> mettre dans un pbcodec.FilteringFilters{}
+               // hasher ceux qui sont trop gros? tous? pour tous les blocks où blockNum%100 != 0
+               // on append ce FilteredApplied à
+       }
+}
+

We'll also want to apply a patch similar to:

diff --git a/codec/decoder.go b/codec/decoder.go
index 69fec58..b1a8c89 100644
--- a/codec/decoder.go
+++ b/codec/decoder.go
@@ -30,7 +30,7 @@ func BlockDecoder(blk *bstream.Block) (interface{}, error) {
        }
 
        if blk.Version() != 1 {
-               return nil, fmt.Errorf("this decoder only knows about version 1, got %d", blk.Version())
+               return nil, fmt.Errorf("this decoder only knows about bstream.Block version 1, got %d", blk.Version())
        }
 
        block := new(pbcodec.Block)
@@ -51,7 +51,13 @@ func BlockDecoder(blk *bstream.Block) (interface{}, error) {
        //
        // We reconstruct the transaction & action count values
 
+       const MAX_SUPPORTED_PBCODEC_VERSION = 1
+       if block.Version > MAX_SUPPORTED_PBCODEC_VERSION {
+               return nil, fmt.Errorf("future block formats not supported, this code supports dfuse.eosio.codec.v1.Block version %d, received version %d", MAX_SUPPORTED_PBCODEC_VERSION, block.Version)
+       }
+
        block.MigrateV0ToV1()
+       //block.MigrateV1ToV2()
 
        return block, nil
 }

The filtering/filter.go file will need to be aware of those FilteringFilters bundles (include, exclude, sa_include), .. (WARN: might intertwine with #129 and create a combinatorial of Filters if the different --common-filters=#123123;filter are not aligned on the same block numbers).

Some other help:

diff --git a/filtering/filter.go b/filtering/filter.go
index 1d9cb6e..5803dcd 100644
--- a/filtering/filter.go
+++ b/filtering/filter.go
@@ -9,6 +9,7 @@ import (
 )
 
 type BlockFilter struct {
+// array of BlockFilters, at blockNum
 	IncludeProgram *CELFilter
 	ExcludeProgram *CELFilter
 }
@@ -58,6 +59,17 @@ func (f *BlockFilter) TransformInPlace(blk *bstream.Block) error {
 
 func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 	block.FilteringApplied = true
+
+	// Short-circuit:
+	// IF: FilteringFiltersApplied[0].include_filter_hash == IncludeProgram.codeHash
+	// AND: FilteringFiltersApplied[0].exclude_filter_hash == ExcludeProgram.codeHash
+	// AND: FilteringFiltersApplied[0].system_actions_include_filter_hash == SystemActionsIncludeProgram.codeHash
+
+	filtersApplied := []Applied{fl1, fl2, fl3, fl2}  // filters should be COMMUTATIVE, so we'd actually check if the filterHash was there anywhere in the `FilteringFiltersApplied`, we can skip this filtering.. as we're guaranteed things would all have been purged already.
+
+	// fl2
+
+	// TODO: apply expression and hash it
 	block.FilteringIncludeFilterExpr = f.IncludeProgram.code
 	block.FilteringExcludeFilterExpr = f.ExcludeProgram.code
 
@@ -71,6 +83,11 @@ func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 		trxTraceExcluded := true
 		for _, actTrace := range trxTrace.ActionTraces {
 			if !f.shouldProcess(trxTrace, actTrace) {
+				actTrace.FilteringMatched = false
+				continue
+			}
+
+			if !actTrace.FilteringMatched {
 				continue
 			}
 
@@ -90,9 +107,13 @@ func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 		if trxTrace.FailedDtrxTrace != nil {
 			for _, actTrace := range trxTrace.FailedDtrxTrace.ActionTraces {
 				if !f.shouldProcess(trxTrace.FailedDtrxTrace, actTrace) {
+					actTrace.FilteringMatched = false
 					continue
 				}
 
+				if !actTrace.FilteringMatched {
+					continue
+				}
 				actTrace.FilteringMatched = true
 				if !trxTraceAddedToFiltered {
 					filteredTrxTrace = append(filteredTrxTrace, trxTrace)

abourget avatar Aug 27 '20 15:08 abourget

Proposed simplified implementation (no hash optimization, no proto changes)

  1. when a new filter is applied for any type (include, exclude, ...), if a filter is already present, the new one is added, separated by symbol ;;; (as is done for the per-blocknum filter selector)

  2. a consumer program, supposed to apply a filter XYZ looks at a pre-filtered block, strings.splits() the applied filter, and checks if XYZ is included in the array to know if it can skip (filter already applied)

When later, we go for the optimization based on hashes, we can revisit (since we will have to modify the proto at that point)

sduchesneau avatar Nov 18 '20 19:11 sduchesneau