proxy-wasm-cpp-sdk icon indicating copy to clipboard operation
proxy-wasm-cpp-sdk copied to clipboard

[Question] Using gRPC bidi streams

Open NomadXD opened this issue 4 years ago • 7 comments

Any example for how to use gRPC bidi streams from a wasm filter ? As I understand we can make gRPC calls from either Context or RootContext.

NomadXD avatar Mar 20 '21 16:03 NomadXD

what's so confusing is auto res = root()->grpcStreamHandler(grpc_service_string, EchoServerServiceName, PublishMetadataMethodName, initial_metadata,std::unique_ptr<GrpcStreamHandlerBase>(new MyGrpcCallStreamHandler(this)));
returns a WasmResult. So if we are using a grpc streaming from a filter we have to ,


MyGrpcCallStreamHandler streamHandler_ = new MyGrpcCallStreamHandler(this);
auto res = root()->grpcStreamHandler(grpc_service_string, EchoServerServiceName,EchoMethodName, initial_metadata,std::unique_ptr<GrpcStreamHandlerBase>(streamHandler_));

from onCreate in context.

and then implement some methods in MyGrpcCallStreamHandler to make it possible for the ExampleFilter to call the send method in MyGrpcCallStreamHandler. Is that the correct way to do it ?

NomadXD avatar Mar 22 '21 07:03 NomadXD

Really appreciate if someone can provide a sample code snippet of the way to use grpcStreamHandler since there's no any resource available to refer. Thanks

NomadXD avatar Mar 22 '21 08:03 NomadXD

@NomadXD this is the only example of grpcStreamHandler that I could find: https://github.com/envoyproxy/envoy/blob/main/test/extensions/filters/http/wasm/test_data/test_grpc_stream_cpp.cc

In our production code, we actually use root()->grpcSimpleCall(...).

PiotrSikora avatar Mar 23 '21 06:03 PiotrSikora

@PiotrSikora I'm doing the same thing like in that example, but I want to send a message through a opened stream when onRequestBody() is called. So in my filter I open a stream in onRequestHeaders() and keep a reference to MyGrpcStreamHandler in my class. When onRequestBody() is called, using the reference to MyGrpcStreamHandler, I send the message through the open grpc stream. You can have a look at the following code snippets to get an understanding.

This is my filter class. I will only include what's necessary for you to understand. Header file


class ExampleContext : public Context {
public:
  explicit ExampleContext(uint32_t id, RootContext* root) : Context(id, root) {}

  void onCreate() override;
  FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream) override;
  FilterDataStatus onRequestBody(size_t body_buffer_length, bool end_of_stream) override;
  FilterHeadersStatus onResponseHeaders(uint32_t headers, bool end_of_stream) override;
  FilterDataStatus onResponseBody(size_t body_buffer_length, bool end_of_stream) override;
  void onDone() override;
  void onLog() override;
  void onDelete() override;

private:
  std::unique_ptr<GrpcStreamHandler<echo::EchoRequest, echo::EchoReply>> stream_handler_{};
  bool is_stream_ = false;

};

onRequestHeader() and onRequestBody() methods

FilterHeadersStatus ExampleContext::onRequestHeaders(uint32_t, bool) {
  LOG_INFO(std::string("onRequestHeaders called ") + std::to_string(id()));
// This is where I instantiate the handler. 
  this->stream_handler_ = std::unique_ptr<GrpcStreamHandler<EchoRequest, EchoReply>>(new MyGrpcCallStreamHandler(this));
  GrpcService grpc_service;
  ExampleRootContext *a = dynamic_cast<ExampleRootContext*>(root());
  grpc_service.mutable_envoy_grpc()->set_cluster_name(a->config_.clustername());  
  std::string grpc_service_string;
  grpc_service.SerializeToString(&grpc_service_string);
  HeaderStringPairs initial_metadata;
  initial_metadata.push_back(std::pair("parent", "bar"));
// And then I instantiate the grpc stream here
  auto res1 = root()->grpcStreamHandler(grpc_service_string, EchoServerServiceName, SayHelloBidiStream, initial_metadata,std::move(this->stream_handler_ ) );
  if (res1 != WasmResult::Ok) {
    LOG_ERROR("Calling gRPC server failed: " + toString(res1));
  }else{
    this->is_stream_ = true;
    LOG_INFO(std::string("gRPC stream initiated"));     
  }                 
  return FilterHeadersStatus::Continue;
}


FilterDataStatus ExampleContext::onRequestBody(size_t body_buffer_length,
                                               bool /* end_of_stream */) {
  auto body = getBufferBytes(WasmBufferType::HttpRequestBody, 0, body_buffer_length);
  LOG_INFO(std::string("onRequestBody ") + std::string(body->view()));
  std::string sample_string = "Hello !";
  EchoRequest request;
  request.set_name(sample_string);
  if(this->is_stream_ == true){
    LOG_INFO(std::string("stream available sending message"));
// using the stream created, i try to send a message. 
    auto res = this->stream_handler_->send(request, false);
    if (res != WasmResult::Ok) {
      LOG_INFO(std::string("error sending gRPC >>>>>>>")+ toString(res));
    }
    LOG_INFO(std::string("grpc sent:"+ toString(res)));
  }
  return FilterDataStatus::Continue;
}

This is the handler class.

class MyGrpcCallStreamHandler : public GrpcStreamHandler<EchoRequest, EchoReply> {
  public:
    MyGrpcCallStreamHandler(ExampleContext *context) {context_ = context;}

    void onReceive(size_t body_size) override {
      LOG_INFO("gRPC streaming onReceive");
    }

    void onRemoteClose(GrpcStatus status) override {
      LOG_INFO("gRPC streaming onRemoteClose");
    }

  private:
    ExampleContext *context_;

};

I tried it as above. The bidi stream gets created but messages don't get sent. But the stream gets created. Any ideas or tips ? Thanks in advance

NomadXD avatar Mar 23 '21 09:03 NomadXD

@PiotrSikora I think there's an issue with the way gRPC bidi streams are implemented in C++ SDK. Since the method which creates the gRPC stream ,

WasmResult grpcStreamHandler(std::string_view service, std::string_view service_name,
                               std::string_view method_name,
                               const HeaderStringPairs &initial_metadata,
                               std::unique_ptr<GrpcStreamHandlerBase> handler);

takes std::unique_ptr<GrpcStreamHandlerBase> handler as a parameter, if we are calling this method from a filter, we can't keep a pointer to the handler. Which means we don't have access to the handler , which means we can't call the send() method in the StreamHandler. So that means we can't send a messages through a open grpc streams from a filter. Any ideas ?

NomadXD avatar Mar 24 '21 19:03 NomadXD

The API is indeed quite unfortunate, but I think you could use it like this:

  this->stream_handler_ = new MyGrpcCallStreamHandler(this);
  ...
  root()->grpcStreamHandler(..., std::unique_ptr<GrpcCallHandlerBase>(this->stream_handler_));
  ...
  this->stream_handler_->send(request, false);

PiotrSikora avatar Mar 31 '21 11:03 PiotrSikora

@PiotrSikora yeah that's how i use it now. My worry is that whether it's safe to keep a raw pointer in the filter class. When we pass it as std::unique_ptr , we don't have ownership to the stream_handler_.

Now when onRemoteClose gets triggered , I do a callback to the filter and let it know that handler is not there so it won't call stream_handler->send(). And then I create another handler. (my filter is kind of a persistent one that stays there for the duration of a websocket connection)

Also when onDelete should I do this->stream_handler_->close() or reset() ??

NomadXD avatar Mar 31 '21 11:03 NomadXD