Expose PubSub over gRPC

Open Pandapip1 opened this issue 3 years ago • 8 comments

Checklist

  • [X] My issue is specific & actionable.
  • [X] I am not suggesting a protocol enhancement.
  • [X] I have searched on the issue tracker for my issue.

Description

Currently, the HTTP API for PubSub is less than ideal. The fact that you have to send an HTTP GET request to publish something and listen to a stream to subscribe to something is clunky (and slow!), and is exactly what gRPC is meant to solve! I suggest that a new endpoint be added that exposes PubSub using gRPC.

Pandapip1 avatar Dec 09 '21 17:12 Pandapip1

Thank you for submitting your first issue to this repository! A maintainer will be here shortly to triage and review. In the meantime, please double-check that you have provided all the necessary information to make this process easy! Any information that can help save additional round trips is useful! We currently aim to give initial feedback within two business days. If this does not happen, feel free to leave a comment. Please keep an eye on how this issue will be labeled, as labels give an overview of priorities, assignments and additional actions requested by the maintainers:

  • "Priority" labels will show how urgent this is for the team.
  • "Status" labels will show if this is ready to be worked on, blocked, or in progress.
  • "Need" labels will indicate if additional input or analysis is required.

Finally, remember to use https://discuss.ipfs.io if you just need general support.

welcome[bot] avatar Dec 09 '21 17:12 welcome[bot]

Since gRPC has more browser support than it did in 2021, I am changing this to request gRPC instead for parity with js-ipfs.

Pandapip1 avatar Sep 29 '22 12:09 Pandapip1

@Pandapip1 gRPC is more complicated than websocket, I don't know if it's a thing we want to take on.

Jorropo avatar Sep 29 '22 14:09 Jorropo

Rationale:

IPFS is really useful for websites. Particularly, online games are often hard to run because they are fairly easily DDoSed, and it's hard to make a well-designed game server. IPFS PubSub just straight up solves this problem, and if you design it well, reduces latency manyfold as well.

IPFS PubSub has many other uses for websites too. But IPFS for online games would be practically the holy grail for amateur game devs (particularly those that hypothetically didn't yet have access to online payments). No need to use an expensive VPS. Just run a static site.

Pandapip1 avatar Sep 30 '22 11:09 Pandapip1

Should have a PR for this within a week.

GregoryVPerry avatar Jan 13 '23 11:01 GregoryVPerry

Here is the proposed architecture. @Jorropo, please let me know if there are any issues with this approach for the PR.

Currently IPFS supports a pubsub implementation via:

ipfs daemon --enable-pubsub-experiment  # https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#ipfs-pubsub

This implementation, however, is REST-based, so there is currently no push option in which a pubsub subscription can be both initiated and monitored over a streaming method. For this PR, that streaming method will be gRPC streaming.

The goal of this task is simply to provide an alternative way to subscribe to pubsub topics and receive notifications, using gRPC streaming alongside the current method exposed within https://github.com/ipfs/kubo/blob/master/core/commands/pubsub.go.

First steps would be to fork and then:

make test  # from installed IPFS kubo directory

...to ensure that your installation is correct, after pulling the IPFS kubo repo from https://github.com/ipfs/kubo

From the repo (and per https://github.com/ipfs/kubo#development), the main file is:

./cmd/ipfs/main.go

Sample code to add a gRPC server and listener in the main function:

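grpcServer := grpc.NewServer()
// Named pubsubServer so the variable does not shadow the pubsubGRPCServer type.
// coreAPI is a hypothetical variable holding the node's CoreAPI; PubSub() returns its PubSubAPI value.
pubsubServer := &pubsubGRPCServer{coreAPI.PubSub()}
RegisterPubSubServiceServer(grpcServer, pubsubServer)
listener, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatalf("failed to listen: %v", err)
}
// Serve blocks until the server stops, so report why it stopped.
if err := grpcServer.Serve(listener); err != nil {
    log.Fatalf("failed to serve: %v", err)
}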

Imports to https://github.com/ipfs/kubo/blob/master/core/commands/pubsub.go:

import (
    "google.golang.org/grpc"
)

A struct for the gRPC service interface:

type pubsubGRPCServer struct {
    api.PubSubAPI
}

Define the gRPC service interface, including the new SubscribeGRPC method:

type PubSubServiceServer interface {
    Subscribe(context.Context, *Topic) (*Subscription, error)
    SubscribeGRPC(stream PubSubService_SubscribeGRPCServer) error
}

Implement the new SubscribeGRPC method in the pubsubGRPCServer struct:

func (s *pubsubGRPCServer) SubscribeGRPC(stream PubSubService_SubscribeGRPCServer) error {
    // The first client message names the topic to subscribe to.
    in, err := stream.Recv()
    if err == io.EOF {
        return nil
    }
    if err != nil {
        return err
    }
    topic := in.Topic

    // Subscribe is promoted from the embedded api.PubSubAPI.
    sub, err := s.Subscribe(stream.Context(), topic)
    if err != nil {
        return err
    }
    defer sub.Close()

    // Forward messages until the client disconnects or the subscription ends.
    for {
        msg, err := sub.Next(stream.Context())
        if err == io.EOF || err == context.Canceled {
            return nil
        } else if err != nil {
            return err
        }

        // send the message to the client (Data is a method on the message interface)
        if err := stream.Send(&Message{Data: msg.Data()}); err != nil {
            return err
        }
    }
}

Here is sample Go code that extends the current IPFS pubsub implementation with a gRPC server and listener, e.g.:

package main

import (
	"context"
	"fmt"
	"net"
	"net/http"

	"google.golang.org/grpc"

	pb "github.com/your/package/path/proto"
)

func main() {
	// ... existing code ...

	// Create a gRPC server
	grpcServer := grpc.NewServer()
	pb.RegisterYourServiceServer(grpcServer, &yourServer{})

	// Create a listener
	listener, err := net.Listen("tcp", ":50051")
	if err != nil {
		fmt.Printf("Failed to listen: %v\n", err)
		return // nothing can be served without a listener
	}

	// Register the gRPC server with the http.Server.
	// *grpc.Server implements http.Handler via ServeHTTP; it has no ServeMux field.
	httpServer := &http.Server{
		Handler: grpcServer,
	}

	// Start the gRPC server and the http.Server
	go func() {
		if err := grpcServer.Serve(listener); err != nil {
			fmt.Printf("Failed to serve gRPC server: %v", err)
		}
	}()
	// httpListener is assumed to be a net.Listener created in the existing code above.
	if err := httpServer.Serve(httpListener); err != nil {
		fmt.Printf("Failed to serve http server: %v\n", err)
	}
}

An additional library such as grpc_server.go would look like:

package main

import (
    "context"
    "fmt"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"

    pb "path/to/your/protobuf/files"
)

const (
    port = ":50051"
)

type server struct{}

func (s *server) YourRPCMethod(ctx context.Context, in *pb.YourRequestType) (*pb.YourResponseType, error) {
    // your implementation here; an empty response is returned so the stub compiles
    return &pb.YourResponseType{}, nil
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        fmt.Println("Failed to listen:", err)
        return // do not try to serve on a nil listener
    }
    s := grpc.NewServer()
    pb.RegisterYourServer(s, &server{})
    // Register reflection service on gRPC server.
    reflection.Register(s)
    if err := s.Serve(lis); err != nil {
        fmt.Println("Failed to serve:", err)
    }
}

The proto files should simply live in a protos subdirectory of the current working directory.

In the main() function the gRPC server is started, so requests are served via gRPC as well as via the native pubsub implementation enabled by --enable-pubsub-experiment.

https://blog.ipfs.tech/25-pubsub/
https://github.com/ipfs/js-ipfs/blob/master/docs/core-api/PUBSUB.md
https://github.com/ipfs/kubo/issues/8602

The PR will be submitted under 8602.

GregoryVPerry avatar Jan 15 '23 02:01 GregoryVPerry

LGTM!

Pandapip1 avatar Jan 15 '23 13:01 Pandapip1

It's been a year. Any updates?

Pandapip1 avatar Mar 11 '24 17:03 Pandapip1