stream: Add processor for concurrent processing of websocket data

Open shazbert opened this issue 1 year ago • 1 comments

PR Description

  • Adds a proof-of-concept (POC) processor type for streaming readers so that each read can be handled quickly and concurrently.
  • Adds a Process method that takes a stream.Key struct and uses it to match the update to a spawned worker routine, which directly handles updates for that key from then on. Each worker has a buffer in front of it, so a slowdown in a worker does not impede the websocket handler's direct reads.
  • Changes the main mutex on the orderbook holder's map in buffer.go to an RW mutex. This allows quick, direct access to individual entries without the overhead of a standard mutex, and lets the processor access the underlying data concurrently.
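
The routing described in the list above can be sketched roughly as follows. This is an illustration only, not the code in this PR: the Key fields, the Processor layout, and the job signature are all assumptions made for the example. It shows one buffered channel and one worker goroutine per key, so updates for a key stay in order while different keys proceed concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// Key identifies the stream an update belongs to. These fields are
// hypothetical; the PR's stream.Key may look different.
type Key struct {
	Exchange string
	Pair     string
}

// Processor routes jobs to one buffered worker goroutine per key.
type Processor struct {
	mu      sync.Mutex
	workers map[Key]chan func()
	wg      sync.WaitGroup
	bufSize int
}

// NewProcessor returns a Processor whose workers each buffer bufSize jobs.
func NewProcessor(bufSize int) *Processor {
	return &Processor{workers: make(map[Key]chan func()), bufSize: bufSize}
}

// Process queues job on the worker for key, spawning the worker on first
// use. It blocks only when that worker's buffer is full. It must not be
// called concurrently with, or after, Close.
func (p *Processor) Process(key Key, job func()) {
	p.mu.Lock()
	ch, ok := p.workers[key]
	if !ok {
		ch = make(chan func(), p.bufSize)
		p.workers[key] = ch
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for j := range ch {
				j() // jobs for one key run sequentially, in arrival order
			}
		}()
	}
	p.mu.Unlock()
	ch <- job
}

// Close stops all workers and waits for queued jobs to finish.
func (p *Processor) Close() {
	p.mu.Lock()
	for _, ch := range p.workers {
		close(ch)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

func main() {
	p := NewProcessor(10)
	btc := Key{Exchange: "gateio", Pair: "BTC_USDT"}
	eth := Key{Exchange: "gateio", Pair: "ETH_USDT"}
	var btcSeen, ethSeen []int // each slice is touched by one worker only
	for i := 0; i < 5; i++ {
		i := i
		p.Process(btc, func() { btcSeen = append(btcSeen, i) })
		p.Process(eth, func() { ethSeen = append(ethSeen, i) })
	}
	p.Close()
	fmt.Println(btcSeen, ethSeen)
}
```

The trade-off in this sketch is that a full buffer makes Process block the reader, which is the back-pressure point the buffer is meant to push further away.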

NOTE: This is currently implemented only for GATEIO, chosen for its large pair count, before support is undertaken across all exchanges.

To test, apply the following diff to gateio_websocket.go:

diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go
index 64e7c1139..ba1b25a4d 100644
--- a/exchanges/gateio/gateio_websocket.go
+++ b/exchanges/gateio/gateio_websocket.go
@@ -49,9 +49,9 @@ const (
 )

 var defaultSubscriptions = []string{
-       spotTickerChannel,
-       spotCandlesticksChannel,
-       spotTradesChannel,
+       // spotTickerChannel,
+       // spotCandlesticksChannel,
+       // spotTradesChannel,
        spotOrderbookTickerChannel,
 }

@@ -94,15 +94,35 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, dtime time.T
        return hex.EncodeToString(mac.Sum(nil)), nil
 }

+var startTime = time.Now()
+var wow time.Duration
+var count int
+var ops int
+
 // wsReadConnData receives and passes on websocket messages for processing
 func (g *Gateio) wsReadConnData() {
+       go func() {
+               for {
+                       time.Sleep(time.Second)
+                       fmt.Printf("Current operating throughput: %v operations per second.\n", ops)
+                       ops = 0
+               }
+       }()
        defer g.Websocket.Wg.Done()
        for {
                resp := g.Websocket.Conn.ReadMessage()
                if resp.Raw == nil {
                        return
                }
+               count++
+               ops++
+               start := time.Now()
                err := g.wsHandleData(resp.Raw)
+               since := time.Since(start)
+               if since > wow {
+                       wow = since
+                       fmt.Printf("WOW: %v ms COUNT: %d SINCE: %s\n", since.Milliseconds(), count, time.Since(startTime).String())
+               }
                if err != nil {
                        g.Websocket.DataHandler <- err
                }

Then run:

.\gocryptotrader --tickersync=false --enableallpairs

My testing:

  • Master branch: Operations per second were comparable to this branch, but handling delays built up over time and exceeded 100ms in some cases. Everything in the websocket buffer was then out by 100ms (which is not ideal) until the websocket data handler could catch up, and this occurred quite consistently.
  • This branch: Latency was better, about 5ms at worst over 5 minutes of operation. There are caveats around initial start-up with goroutine generation and around the handover from the reader routine to the processor, both of which will need to be benchmarked and ironed out, but so far I am seeing robust improvements over the previous behaviour. Also of note: I have only seen the buffer (size 10) maxed out at start-up; it has not filled up and complained since.

Dependency required: #1309

Type of change

Please delete options that are not relevant and add an x in [] as item is complete.

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [x] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] This change requires a documentation update

How has this been tested

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration and also consider improving test coverage whilst working on a certain feature or package.

  • [ ] go test ./... -race
  • [ ] golangci-lint run
  • [ ] Test X

Checklist

  • [ ] My code follows the style guidelines of this project
  • [ ] I have performed a self-review of my own code
  • [ ] I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation and regenerated documentation via the documentation tool
  • [ ] My changes generate no new warnings
  • [ ] I have added tests that prove my fix is effective or that my feature works
  • [ ] New and existing unit tests pass locally and on Github Actions/AppVeyor with my changes
  • [ ] Any dependent changes have been merged and published in downstream modules

shazbert, Aug 10 '23 06:08

Codecov Report

Attention: Patch coverage is 80.71429%, with 27 lines in your changes missing coverage. Please review.

Project coverage is 35.89%. Comparing base (d518993) to head (56e764b). Report is 14 commits behind head on master.

Additional details and impacted files

@@            Coverage Diff             @@
##           master    #1316      +/-   ##
==========================================
- Coverage   37.75%   35.89%   -1.86%     
==========================================
  Files         411      412       +1     
  Lines      147817   177701   +29884     
==========================================
+ Hits        55815    63794    +7979     
- Misses      84218   106109   +21891     
- Partials     7784     7798      +14     
Files                                   Coverage Δ
exchanges/gateio/gateio_websocket.go    48.71% <100.00%> (-4.05%) :arrow_down:
exchanges/kucoin/kucoin_types.go        60.86% <ø> (-3.95%) :arrow_down:
exchanges/kucoin/kucoin_websocket.go    55.53% <100.00%> (-3.06%) :arrow_down:
exchanges/stream/websocket.go           85.02% <75.00%> (-1.03%) :arrow_down:
exchanges/stream/processor.go           91.80% <91.80%> (ø)
exchanges/stream/buffer/buffer.go       63.78% <66.12%> (-5.49%) :arrow_down:

... and 382 files with indirect coverage changes

codecov[bot], Mar 11 '24 00:03