brpc icon indicating copy to clipboard operation
brpc copied to clipboard

brpc 访问etcd的restful接口的问题

Open zengkui opened this issue 2 years ago • 13 comments

我想尝试用brpc来访问etcd的kv服务,实现了一个put key value的功能,demo代码如下,功能是正常的,但是我想用brpc实现watch功能,就出现问题了。

void PutKeyValue(brpc::Channel& channel) {
  brpc::Controller cntl;
  brpc::URI& uri = cntl.http_request().uri();
  uri.set_path("/v3/kv/put");
  cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  butil::IOBufBuilder os;
  os << "{\"key\":\"Zm9v\",\"value\":\"YmFy\"}";
  os.move_to(cntl.request_attachment());

  channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  if (cntl.Failed()) {
    std::cerr << cntl.ErrorText() << std::endl;
    return;
  }
  std::cout << cntl.response_attachment() << std::endl;
}

watch的demo代码如下, 调用CallMethod后,就一直阻塞在这个调用这里了。 这个watch机制应该是http的 chunked mode, 不知道brpc client应该如何处理这种chunked mode?

void Watch(brpc::Channel& channel) {
  brpc::Controller cntl;
  cntl.http_request().uri().set_path("/v3/watch");
  cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  butil::IOBufBuilder os;
  os << "{\"create_request\": {\"key\":\"Zm9v\"}}";
  os.move_to(cntl.request_attachment());

  channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
  if (cntl.Failed()) {
    std::cerr << cntl.ErrorText() << std::endl;
    return;
  }
  std::cout << cntl.response_attachment() << std::endl;
  while (1) {sleep(1111111);}
}

zengkui avatar Jun 04 '22 07:06 zengkui

试试这个接口,这是专门用来读chunked response的: https://github.com/apache/incubator-brpc/blob/master/src/brpc/progressive_reader.h

但此接口比较底层,获取events后需要append至一个IOBuf内然后按格式边界自行切割、使用

jamesge avatar Jun 04 '22 09:06 jamesge

试试这个接口,这是专门用来读chunked response的: https://github.com/apache/incubator-brpc/blob/master/src/brpc/progressive_reader.h

但此接口比较底层,获取events后需要append至一个IOBuf内然后按格式边界自行切割、使用

感谢回复。 我尝试用utest里的写了一个demo,可以正常运行了,效果和curl 的一致。 但是这个自行切割边界 确实是一个比较麻烦的事情。 需要切割的原因是event的频率很高的话 会有多个event的response append到一起送给一次OnReadOnePart调用的参数里么?

zengkui avatar Jun 04 '22 12:06 zengkui

能描述一下大致的格式,或是贴一段典型的数据么?IOBuf中可能有相对便利一些的方法。

jamesge avatar Jun 04 '22 12:06 jamesge

能描述一下大致的格式,或是贴一段典型的数据么?IOBuf中可能有相对便利一些的方法。

以下是三个json string是三次事件的response,第一个create watch的response,后面两个是监听到了2次修改。 我现在直接把这个字符串直接做json load是没有边界问题的. 我看了下文档,只有消息体非常大的时候才有边界问题?

{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"40","raft_term":"10"},"created":true}}

{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"41","raft_term":"10"},"events":[{"kv":{"key":"Zm9v","create_revision":"4","mod_revision":"41","version":"38","value":"Mw=="}}]}}

{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"42","raft_term":"10"},"events":[{"kv":{"key":"Zm9v","create_revision":"4","mod_revision":"42","version":"39","value":"NA=="}}]}}

zengkui avatar Jun 04 '22 12:06 zengkui

我现在直接把这个字符串直接做json load是没有边界问题的. 我看了下文档,只有消息体非常大的时候才有边界问题?

这个就属于理论上是错的,但并不是每时每刻都会发生。如果是线上服务,还是建议做正确了比较好,否则追查偶现的bug会耗费更多的时间。

brpc中带一个json2pb,支持用pb message来解析json,之前要求输入只包含一个json,现在我更新了下,可以支持多个json了(需要把Json2PbOptions.allow_remaining_bytes_after_parsing设为true后,默认false),例子可以参考https://github.com/apache/incubator-brpc/blob/master/test/brpc_protobuf_json_unittest.cpp#L1563

jamesge avatar Jun 05 '22 10:06 jamesge

brpc中带一个json2pb,支持用pb message来解析json,之前要求输入只包含一个json,现在我更新了下,可以支持多个json了(需要把Json2PbOptions.allow_remaining_bytes_after_parsing设为true后,默认false),例子可以参考https://github.com/apache/incubator-brpc/blob/master/test/brpc_protobuf_json_unittest.cpp#L1563

这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?

zengkui avatar Jun 05 '22 10:06 zengkui

这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?

不是,这里的数据就读自对应http链接的socket中,任何一种情况都可能出现,这是没有保证的,特别是出现中断(signal)和网络抖动时。只是在很多时候,数据已经在那了,且正好是一份。但如果是一个7*24运行的线上系统,出问题的概率应该就比较高了。

jamesge avatar Jun 05 '22 11:06 jamesge

这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?

不是,这里的数据就读自对应http链接的socket中,任何一种情况都可能出现,这是没有保证的,特别是出现中断(signal)和网络抖动时。只是在很多时候,数据已经在那了,且正好是一份。但如果是一个7*24运行的线上系统,出问题的概率应该就比较高了。

谢谢,etcd的这个监听时间的功能基本没啥问题了。但是实际上watch的服务定义如下,不知道这个cancel 操作怎么才能通过BRPC发送到server端? 我的理解这是一个类似stream rpc的功能,用brpc的http client似乎做不到,只能在需要cancel的时候通过另外一个连接来发送? 类似的keep alive的操作似乎也只能采用一个专用线程来间隔发送心跳信息了。

  // Watch watches for events happening or that have happened. Both input and output
  // are streams; the input stream is for creating and canceling watchers and the output
  // stream sends events. One watch RPC can watch on multiple key ranges, streaming events
  // for several watches at once. The entire event history can be watched starting from the
  // last compaction revision.
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
      option (google.api.http) = {
        post: "/v3/watch"
        body: "*"
    };
  }
}

zengkui avatar Jun 05 '22 14:06 zengkui

Related: #1589

wwbmmm avatar Jun 06 '22 03:06 wwbmmm

@zengkui 这块能封装个形如EtcdUtils的通用类,让其他用户也方便注册到etcd上么(通过restful的方式)

jamesge avatar Jun 14 '22 15:06 jamesge

@zengkui 这块能封装个形如EtcdUtils的通用类,让其他用户也方便注册到etcd上么(通过restful的方式)

因为etcd的接口比较多,另外etcd的版本变化会导致PB文件中接口变化问题,这个在brpc里不是太好维护。 我之前是想写几个example放在brpc里 可以参考着用。 不知道这块你有什么好的建议没?

zengkui avatar Jun 16 '22 11:06 zengkui

@zengkui 字段变化有没有更具体的例子,我不是特别理解,因为etcd修改接口一般会考虑后向兼容吧,而且pb文件加字段也是后向兼容的。utils本身的接口能覆盖主要功能就行了,暂时不用求全。

jamesge avatar Jun 16 '22 12:06 jamesge

@zengkui 字段变化有没有更具体的例子,我不是特别理解,因为etcd修改接口一般会考虑后向兼容吧,而且pb文件加字段也是后向兼容的。utils本身的接口能覆盖主要功能就行了,暂时不用求全。

嗯,etcd升级到V3后就是PB了,后向兼容应该是没有问题。 只是新增的字段的话就涉及到pb文件同步的问题,如果brpc这边觉的没啥问题的话,我可以实现目前常用到的一些接口 先push进去,然后大家再一起更新完善也是ok的。 后续可能还有一些处理细节问题,可以后面候聊下。

zengkui avatar Jun 16 '22 12:06 zengkui