
Collector Subscriber requires data sources tracking

Open pxp928 opened this issue 2 years ago • 5 comments

Currently, the collectors run through all the data sources provided by the collector subscriber. Each collector needs to keep a timestamp for every data source it has checked, so that sources are not rechecked needlessly. After a user-specified interval, the collector can recheck its data sources for updated information, if any exists.

pxp928 avatar Apr 03 '23 14:04 pxp928

This seems like an issue I can help fix. Could you point me to where the checking happens in the code, please?

arorasoham9 avatar Jul 22 '23 06:07 arorasoham9

@lumjjb an update on this: we may want to consider removing a data source from the list once information has been found for it via deps.dev or other collectors. Otherwise we may hit a message-size limit, as shown below. This needs further investigation to find the root cause.

{"level":"info","ts":1712865729.4502122,"caller":"logging/logger.go:70","msg":"Logging at info level"}
{"level":"info","ts":1712865729.4503033,"caller":"cli/init.go:69","msg":"Using config file: /guac/guac.yaml"}
{"level":"error","ts":1712865729.5760174,"caller":"cmd/files.go:165","msg":"collector ended with error: unable to 
populate reporefs: unable to retrieve datasource: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (17081927 vs. 
4194304)","stacktrace":"github.com/guacsec/guac/cmd/guaccollect/cmd.initializeNATsandCollector.func2\n\t/home/runn
er/work/iac/iac/cmd/guaccollect/cmd/files.go:165\ngithub.com/guacsec/guac/pkg/handler/collector.Collect\n\t/home/runne
r/work/iac/iac/pkg/handler/collector/collector.go:101\ngithub.com/guacsec/guac/cmd/guaccollect/cmd.initializeNATsandCo
llector.func3\n\t/home/runner/work/iac/iac/cmd/guaccollect/cmd/files.go:176"}
{"level":"info","ts":1712865729.5761647,"caller":"cmd/files.go:187","msg":"All Collectors completed"}

pxp928 avatar Apr 17 '24 13:04 pxp928

I believe this is still an issue today.

I am running guacgql, guacingest, guaccsub, and guaccollect deps_dev, current commit is 358205b583a21f3ba196073c8b08521c6389ace4. I have applied the following debug diff:

diff --git a/pkg/collectsub/client/client.go b/pkg/collectsub/client/client.go
index eb63ee7d..0513a084 100644
--- a/pkg/collectsub/client/client.go
+++ b/pkg/collectsub/client/client.go
@@ -25,6 +25,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/proto"
 )
 
 type Client interface {
@@ -101,6 +102,11 @@ func (c *client) GetCollectEntries(ctx context.Context, filters []*pb.CollectEnt
 	res, err := c.client.GetCollectEntries(ctx, &pb.GetCollectEntriesRequest{
 		Filters: filters,
 	})
+
+	if res != nil {
+		fmt.Printf("Res size is %d bytes\n", proto.Size(res))
+	}
+
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/collectsub/datasource/csubsource/csubsource.go b/pkg/collectsub/datasource/csubsource/csubsource.go
index 21316af4..872d340c 100644
--- a/pkg/collectsub/datasource/csubsource/csubsource.go
+++ b/pkg/collectsub/datasource/csubsource/csubsource.go
@@ -58,6 +58,11 @@ func (d *csubDataSources) GetDataSources(ctx context.Context) (*datasource.DataS
 	}
 	ds := entriesToSources(ctx, entries)
 
+	fmt.Printf("Num OCI DataSources:            %d\n", len(ds.OciDataSources))
+	fmt.Printf("Num Git DataSources:            %d\n", len(ds.GitDataSources))
+	fmt.Printf("Num GitHub Release DataSources: %d\n", len(ds.GithubReleaseDataSources))
+	fmt.Printf("Num pURL DataSources:           %d\n", len(ds.PurlDataSources))
+
 	return ds, nil
 }

Then I run go run ./cmd/guaccollect files ~/dev/bom-shelter/in-the-lab/spdx-popular-containers/data/ --service-poll=false which initiates the ingestion of a large collection of SBOMs.

While the SBOMs are being ingested, in another terminal tab I run go run ./cmd/guaccollect image "ghcr.io/guacsec/go-multi-test:7ddfb3e035b42cd70649cc33393fe32c" --service-poll=false a few times over.

After a few minutes, it looked like this:

nchelluri@Narsimhams-MBP guac % go run ./cmd/guaccollect image "ghcr.io/guacsec/go-multi-test:7ddfb3e035b42cd70649cc33393fe32c" --service-poll=false
{"level":"info","ts":1713442963.9271512,"caller":"logging/logger.go:75","msg":"Logging at info level"}
{"level":"info","ts":1713442963.927302,"caller":"cli/init.go:69","msg":"Using config file: /Users/nchelluri/dev/guac/guac.yaml"}
Res size is 2362787 bytes
Num OCI DataSources:            0
Num Git DataSources:            0
Num GitHub Release DataSources: 0
Num pURL DataSources:           28036
{"level":"info","ts":1713442963.94002,"caller":"cmd/files.go:164","msg":"collector ended gracefully"}
{"level":"info","ts":1713442963.940036,"caller":"cmd/files.go:189","msg":"All Collectors completed"}
nchelluri@Narsimhams-MBP guac % go run ./cmd/guaccollect image "ghcr.io/guacsec/go-multi-test:7ddfb3e035b42cd70649cc33393fe32c" --service-poll=false
{"level":"info","ts":1713443251.8381839,"caller":"logging/logger.go:75","msg":"Logging at info level"}
{"level":"info","ts":1713443251.838291,"caller":"cli/init.go:69","msg":"Using config file: /Users/nchelluri/dev/guac/guac.yaml"}
{"level":"error","ts":1713443251.9148831,"caller":"cmd/files.go:167","msg":"collector ended with error: unable to populate reporefs: unable to retrieve datasource: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (10671859 vs. 4194304)","stacktrace":"github.com/guacsec/guac/cmd/guaccollect/cmd.initializeNATsandCollector.func2\n\t/Users/nchelluri/dev/guac/cmd/guaccollect/cmd/files.go:167\ngithub.com/guacsec/guac/pkg/handler/collector.Collect\n\t/Users/nchelluri/dev/guac/pkg/handler/collector/collector.go:111\ngithub.com/guacsec/guac/cmd/guaccollect/cmd.initializeNATsandCollector.func3\n\t/Users/nchelluri/dev/guac/cmd/guaccollect/cmd/files.go:178"}
{"level":"info","ts":1713443251.914948,"caller":"cmd/files.go:189","msg":"All Collectors completed"}
nchelluri@Narsimhams-MBP guac % 

Just so you don't have to scroll horizontally, here is the interesting bit of the above:

rpc error: code = ResourceExhausted desc = grpc: received message larger than max (10671859 vs. 4194304)
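For scale: dividing the observed response size by the observed entry count gives a rough per-entry cost, which shows how quickly the 4194304-byte (4 MiB) default gRPC receive limit is reached as pURL data sources accumulate:

```go
package main

import "fmt"

func main() {
	const (
		resBytes   = 2362787 // observed GetCollectEntries response size (from the logs above)
		numEntries = 28036   // observed pURL data source count (from the logs above)
		grpcMax    = 4194304 // gRPC default maximum receive message size (4 MiB)
	)
	perEntry := resBytes / numEntries // rough bytes per entry
	fmt.Println(perEntry)             // 84
	fmt.Println(grpcMax / perEntry)   // 49932: roughly how many entries fit under the cap
}
```

So at this per-entry cost, the default cap is crossed somewhere around 50k accumulated pURL entries, which matches how quickly the error appears during a large ingestion.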

I am attempting to put together a PR to solve the issue now. After discussions with Parth, I am planning to expand the protobuf interface:

service ColectSubscriberService {
  rpc AddCollectEntries(AddCollectEntriesRequest) returns (AddCollectEntriesResponse);
  rpc GetCollectEntries (GetCollectEntriesRequest) returns (GetCollectEntriesResponse);
}

by adding a new RemoveCollectEntry() RPC, implementing it, and making use of it in the collectors: when a collector has consumed a DataSource from the Collector Subscriber Service, it will then delete it. That's my initial idea, anyway.
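As a sketch, the expanded interface might look like the following (the removal RPC and its messages are hypothetical; only the first two RPCs exist today, and message field layouts are illustrative):

```proto
service ColectSubscriberService {
  rpc AddCollectEntries(AddCollectEntriesRequest) returns (AddCollectEntriesResponse);
  rpc GetCollectEntries(GetCollectEntriesRequest) returns (GetCollectEntriesResponse);
  // Hypothetical: delete entries a collector has finished consuming.
  rpc RemoveCollectEntries(RemoveCollectEntriesRequest) returns (RemoveCollectEntriesResponse);
}

message RemoveCollectEntriesRequest {
  repeated CollectEntry entries = 1;
}

message RemoveCollectEntriesResponse {
  bool success = 1;
}
```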

nchelluri avatar Apr 18 '24 12:04 nchelluri

@lumjjb I was trying to implement the solution I described above, and while I had some success, I eventually ran into the same issue. I'd like to give some background and run an alternate approach by you, since Parth told me that you initially created the csub.

  1. I am trying a test where I run go run ./cmd/guaccollect files ~/dev/bom-shelter/in-the-lab/spdx-popular-containers/data/ --service-poll=false
    • that path is the data directory from https://github.com/chainguard-dev/bom-shelter/tree/89ed4943e027c9bf148e6a80d7b2b78ae8d7771a/in-the-lab/spdx-popular-containers
    • the directory contains approx 4.2GB of SPDX JSON SBOMs
      • first off, is this too much? Can we say that we don't need to implement ingesting that many SBOMs at once, and then I can trim the data set and find a solution that works for the smaller data set?
  2. Basically, my approach was to add code like the deps.dev collector diff below, plus all of the backing plumbing to make it work with gRPC (that is to say, I roughly finished the approach I described above: I made a new RPC, RemoveCollectEntries(), and called it with a slice of data sources).
  3. At this point, I found that while the pURL data source list did get trimmed, there still comes a point where any collector that calls ds.GetDataSources() (where ds is an instance of csubDataSources) gets a gRPC response from the csub that is too large, and so the collector errors out.

So my current proposed solution is:

  1. Finish the rough implementation I have started and make it ready for a PR. But before doing that,
  2. Also change the GetCollectEntries() RPC in collectsub.proto to give it a streaming response. I don't have extensive experience with gRPC, but I do know it supports streaming; I would read up on it and make use of that.
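A server-streaming variant would only change the response side of the RPC definition (sketch; the chunking strategy on the server is not shown):

```proto
// Sketch: the `stream` keyword turns the response into a sequence of
// messages, each of which can be kept well under the 4 MiB default cap.
rpc GetCollectEntries (GetCollectEntriesRequest) returns (stream GetCollectEntriesResponse);
```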

Before proceeding, I wanted to get your thoughts on the matter. What do you think? Is this something that can and should be done?

An alternate approach that could be taken as a stop-gap measure - or maybe more permanently - would be to increase the allowed gRPC response size (the default maximum receive size is 4 MiB). I am not sure this is a good idea; I tend to think it is fragile and we would eventually run into this issue again.

Thanks!

deps.dev collector diff I used:

diff --git a/pkg/handler/collector/deps_dev/deps_dev.go b/pkg/handler/collector/deps_dev/deps_dev.go
index b5cf44a9..5029aab9 100644
--- a/pkg/handler/collector/deps_dev/deps_dev.go
+++ b/pkg/handler/collector/deps_dev/deps_dev.go
@@ -168,6 +168,11 @@ func (d *depsCollector) populatePurls(ctx context.Context, docChannel chan<- *pr
                        return fmt.Errorf("failed to fetch dependencies: %w", err)
                }
        }
+
+       if err := d.collectDataSource.RemoveDataSources(ctx, ds); err != nil {
+               return err
+       }
+
        return nil
 }

nchelluri avatar Apr 18 '24 16:04 nchelluri

Just adding an update here... this issue was discussed at the GUAC Meeting yesterday and there are two action items:

  1. Make the GetCollectEntries() RPC response streaming
  2. At a future meeting, discuss how to implement something like a pubsub interface so that clients can connect to the Collector Subscriber and get only new data sources (this is an open-ended discussion and subject to change). If a client joined "late" and missed some data sources, it might have to traverse the GUAC graph to find them. Probably once this is implemented, the streaming/canonical RPC will go away entirely.

There was also a decision made to not implement removal of data sources.

I aim to work on (1) in the next few days. Item (2) may be discussed at next week's GUAC Meeting.

Edit: I implemented (1) in https://github.com/guacsec/guac/pull/1865

nchelluri avatar Apr 23 '24 14:04 nchelluri