Design of stacked filtering
Meaning: filtering already filtered blocks.
We want to be able to pre-filter some large blocks, reducing them by stripping out undesired transactions, and then base a next phase of filtering on those reduced blocks. This can yield a 20x performance improvement for the second phase, compared to starting from full, unfiltered blocks.
Here are some changes to the proto definitions:
```proto
message Block {
  ...
  string filtering_include_filter_expr = 41;
  string filtering_exclude_filter_expr = 42;
  string filtering_system_actions_include_filter_expr = 50;
  repeated FilteringFilters filtering_applied_filters = 51;
}

message FilteringFilters {
  bytes hash = 1; // hash(include + exclude + sa_include codes)
  // the following would only be set every 100 blocks (on the %100 == 0 boundary)
  string include_filter_expr = 2;
  string exclude_filter_expr = 3;
  string system_actions_include_filter_expr = 4;
}
```
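The `hash` field is described as hash(include + exclude + sa_include codes); the issue does not pin down a hash function. A minimal sketch, assuming SHA-256 over the concatenated expressions (the `filterHash` name is hypothetical):

```go
package pbcodec

import "crypto/sha256"

// filterHash is a hypothetical helper computing FilteringFilters.hash as
// "hash(include + exclude + sa_include codes)". SHA-256 is an assumption;
// any stable hash would do, and a separator between expressions could be
// added to avoid ambiguity between different concatenations.
func filterHash(includeExpr, excludeExpr, saIncludeExpr string) []byte {
	sum := sha256.Sum256([]byte(includeExpr + excludeExpr + saIncludeExpr))
	return sum[:]
}
```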
This would bump the `Version` field of `dfuse.eosio.codec.v1.Block` by 1, and we'd add something like:
```diff
diff --git a/pb/dfuse/eosio/codec/v1/codec.go b/pb/dfuse/eosio/codec/v1/codec.go
index fc7a3ae..cf3df26 100644
--- a/pb/dfuse/eosio/codec/v1/codec.go
+++ b/pb/dfuse/eosio/codec/v1/codec.go
@@ -151,6 +151,20 @@ func (b *Block) MigrateV0ToV1() {
 	}
 }
+
+func (b *Block) MigrateV1ToV2() {
+	if b.Version != 1 {
+		return
+	}
+	b.Version = 2
+
+	if b.FilteringApplied {
+		// take filtering_include_filter_expr -> put it into a pbcodec.FilteringFilters{}
+		// hash the ones that are too big? all of them? for all blocks where blockNum%100 != 0
+		// append this FilteredApplied to
+	}
+}
+
```
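Fleshed out, and assuming the generated Go struct gains a `FilteringAppliedFilters` field (plus the standard protoc-gen-go names for the `FilteringFilters` fields) from the proto changes above, the migration could look roughly like the sketch below, reusing the hypothetical `filterHash` helper. It would live next to `MigrateV0ToV1` in `pb/dfuse/eosio/codec/v1/codec.go`:

```go
func (b *Block) MigrateV1ToV2() {
	if b.Version != 1 {
		return
	}
	b.Version = 2

	if b.FilteringApplied {
		applied := &FilteringFilters{
			Hash: filterHash(
				b.FilteringIncludeFilterExpr,
				b.FilteringExcludeFilterExpr,
				b.FilteringSystemActionsIncludeFilterExpr,
			),
		}
		// Keep the full expressions only on the %100 == 0 boundary; all other
		// blocks carry just the hash to stay small.
		if b.Number%100 == 0 {
			applied.IncludeFilterExpr = b.FilteringIncludeFilterExpr
			applied.ExcludeFilterExpr = b.FilteringExcludeFilterExpr
			applied.SystemActionsIncludeFilterExpr = b.FilteringSystemActionsIncludeFilterExpr
		}
		b.FilteringAppliedFilters = append(b.FilteringAppliedFilters, applied)
	}
}
```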
We'll also want to apply a patch similar to:
```diff
diff --git a/codec/decoder.go b/codec/decoder.go
index 69fec58..b1a8c89 100644
--- a/codec/decoder.go
+++ b/codec/decoder.go
@@ -30,7 +30,7 @@ func BlockDecoder(blk *bstream.Block) (interface{}, error) {
 	}
 
 	if blk.Version() != 1 {
-		return nil, fmt.Errorf("this decoder only knows about version 1, got %d", blk.Version())
+		return nil, fmt.Errorf("this decoder only knows about bstream.Block version 1, got %d", blk.Version())
 	}
 
 	block := new(pbcodec.Block)
@@ -51,7 +51,13 @@ func BlockDecoder(blk *bstream.Block) (interface{}, error) {
 	//
 	// We reconstruct the transaction & action count values
+	const MAX_SUPPORTED_PBCODEC_VERSION = 1
+	if block.Version > MAX_SUPPORTED_PBCODEC_VERSION {
+		return nil, fmt.Errorf("future block formats not supported, this code supports dfuse.eosio.codec.v1.Block version %d, received version %d", MAX_SUPPORTED_PBCODEC_VERSION, block.Version)
+	}
+
 	block.MigrateV0ToV1()
+	//block.MigrateV1ToV2()
 
 	return block, nil
 }
```
The `filtering/filter.go` file will need to be aware of those `FilteringFilters` bundles (include, exclude, sa_include)… (WARN: this might intertwine with #129 and create a combinatorial explosion of `Filters` if the different `--common-filters=#123123;filter` flags are not aligned on the same block numbers.)
Some other help:
```diff
diff --git a/filtering/filter.go b/filtering/filter.go
index 1d9cb6e..5803dcd 100644
--- a/filtering/filter.go
+++ b/filtering/filter.go
@@ -9,6 +9,7 @@ import (
 )
 
 type BlockFilter struct {
+	// array of BlockFilters, at blockNum
 	IncludeProgram *CELFilter
 	ExcludeProgram *CELFilter
 }
@@ -58,6 +59,17 @@ func (f *BlockFilter) TransformInPlace(blk *bstream.Block) error {
 func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 	block.FilteringApplied = true
+
+	// Short-circuit:
+	// IF:  FilteringFiltersApplied[0].include_filter_hash == IncludeProgram.codeHash
+	// AND: FilteringFiltersApplied[0].exclude_filter_hash == ExcludeProgram.codeHash
+	// AND: FilteringFiltersApplied[0].system_actions_include_filter_hash == SystemActionsIncludeProgram.codeHash
+
+	filtersApplied := []Applied{fl1, fl2, fl3, fl2} // filters should be COMMUTATIVE, so we'd actually check whether the filter hash appears anywhere in `FilteringFiltersApplied`; if it does, we can skip this filtering, as we're guaranteed everything would already have been purged.
+
+	// fl2
+
+	// TODO: apply expression and hash it
 	block.FilteringIncludeFilterExpr = f.IncludeProgram.code
 	block.FilteringExcludeFilterExpr = f.ExcludeProgram.code
@@ -71,6 +83,11 @@ func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 		trxTraceExcluded := true
 		for _, actTrace := range trxTrace.ActionTraces {
 			if !f.shouldProcess(trxTrace, actTrace) {
+				actTrace.FilteringMatched = false
 				continue
 			}
+
+			if !actTrace.FilteringMatched {
+				continue
+			}
@@ -90,9 +107,13 @@ func (f *BlockFilter) transfromInPlace(block *pbcodec.Block) {
 		if trxTrace.FailedDtrxTrace != nil {
 			for _, actTrace := range trxTrace.FailedDtrxTrace.ActionTraces {
 				if !f.shouldProcess(trxTrace.FailedDtrxTrace, actTrace) {
+					actTrace.FilteringMatched = false
 					continue
 				}
+				if !actTrace.FilteringMatched {
+					continue
+				}
 				actTrace.FilteringMatched = true
 				if !trxTraceAddedToFiltered {
 					filteredTrxTrace = append(filteredTrxTrace, trxTrace)
```
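A hedged sketch of that short-circuit, written against the proto changes above: it assumes the block exposes `FilteringAppliedFilters` (with a `Hash` field), that `BlockFilter` carries a `SystemActionsIncludeProgram` as hinted in the pseudocode, and it recomputes the hash inline rather than relying on a precomputed `codeHash`. Since filters are commutative, a match anywhere in the list means this filter set has already purged the block:

```go
package filtering

import (
	"bytes"
	"crypto/sha256"

	pbcodec "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/codec/v1"
)

// alreadyApplied is a hypothetical helper for transfromInPlace: it reports
// whether this exact filter set (include + exclude + sa_include) was already
// applied to the block, in which case the whole pass can be skipped.
func (f *BlockFilter) alreadyApplied(block *pbcodec.Block) bool {
	sum := sha256.Sum256([]byte(f.IncludeProgram.code + f.ExcludeProgram.code + f.SystemActionsIncludeProgram.code))
	for _, applied := range block.FilteringAppliedFilters {
		// Filters are commutative, so the hash may appear at any position.
		if bytes.Equal(applied.Hash, sum[:]) {
			return true
		}
	}
	return false
}
```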
Proposed simplified implementation (no hash optimization, no proto changes)

- when a new filter is applied for any type (include, exclude, ...), and a filter is already present, the new one is appended to it, separated by the symbol `;;;` (as is done for the per-blocknum filter selector)
- a consumer program that is supposed to apply a filter XYZ looks at a pre-filtered block, `strings.Split()`s the applied filter expression, and checks whether XYZ is in the resulting array to know whether it can skip that pass (filter already applied); see the sketch below

Later, when we go for the hash-based optimization, we can revisit this (since we will have to modify the proto at that point anyway).
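A minimal sketch of that scheme, assuming the existing `FilteringIncludeFilterExpr` field is used as the accumulator (the helper names are hypothetical; the exclude and system-action expressions would be handled the same way):

```go
package filtering

import (
	"strings"

	pbcodec "github.com/dfuse-io/dfuse-eosio/pb/dfuse/eosio/codec/v1"
)

const filterSeparator = ";;;"

// recordAppliedFilter appends the newly applied include expression to whatever
// is already recorded on the block, separated by ";;;".
func recordAppliedFilter(block *pbcodec.Block, includeExpr string) {
	if block.FilteringIncludeFilterExpr == "" {
		block.FilteringIncludeFilterExpr = includeExpr
		return
	}
	block.FilteringIncludeFilterExpr += filterSeparator + includeExpr
}

// filterAlreadyApplied is what a consumer about to apply includeExpr would use:
// split the recorded expressions on ";;;" and check membership to decide
// whether this filtering pass can be skipped.
func filterAlreadyApplied(block *pbcodec.Block, includeExpr string) bool {
	for _, applied := range strings.Split(block.FilteringIncludeFilterExpr, filterSeparator) {
		if applied == includeExpr {
			return true
		}
	}
	return false
}
```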