brpc
brpc copied to clipboard
brpc 访问etcd的restful接口的问题
我想尝试用brpc来访问etcd的kv服务,实现了一个put key value的功能,demo代码如下,功能是正常的,但是我想用brpc实现watch功能,就出现问题了。
void PutKeyValue(brpc::Channel& channel) {
brpc::Controller cntl;
brpc::URI& uri = cntl.http_request().uri();
uri.set_path("/v3/kv/put");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
butil::IOBufBuilder os;
os << "{\"key\":\"Zm9v\",\"value\":\"YmFy\"}";
os.move_to(cntl.request_attachment());
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return;
}
std::cout << cntl.response_attachment() << std::endl;
}
watch的demo代码如下, 调用CallMethod后,就一直阻塞在这个调用这里了。 这个watch机制应该是http的 chunked mode, 不知道brpc client应该如何处理这种chunked mode?
void Watch(brpc::Channel& channel) {
brpc::Controller cntl;
cntl.http_request().uri().set_path("/v3/watch");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
butil::IOBufBuilder os;
os << "{\"create_request\": {\"key\":\"Zm9v\"}}";
os.move_to(cntl.request_attachment());
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return;
}
std::cout << cntl.response_attachment() << std::endl;
while (1) {sleep(1111111);}
}
试试这个接口,这是专门用来读chunked response的: https://github.com/apache/incubator-brpc/blob/master/src/brpc/progressive_reader.h
但此接口比较底层,获取events后需要append至一个IOBuf内然后按格式边界自行切割、使用
试试这个接口,这是专门用来读chunked response的: https://github.com/apache/incubator-brpc/blob/master/src/brpc/progressive_reader.h
但此接口比较底层,获取events后需要append至一个IOBuf内然后按格式边界自行切割、使用
感谢回复。 我尝试用utest里的写了一个demo,可以正常运行了,效果和curl 的一致。 但是这个自行切割边界 确实是一个比较麻烦的事情。 需要切割的原因是event的频率很高的话 会有多个event的response append到一起送给一次OnReadOnePart调用的参数里么?
能描述一下大致的格式,或是贴一段典型的数据么?IOBuf中可能有相对便利一些的方法。
能描述一下大致的格式,或是贴一段典型的数据么?IOBuf中可能有相对便利一些的方法。
以下是三个json string是三次事件的response,第一个create watch的response,后面两个是监听到了2次修改。 我现在直接把这个字符串直接做json load是没有边界问题的. 我看了下文档,只有消息体非常大的时候才有边界问题?
{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"40","raft_term":"10"},"created":true}}
{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"41","raft_term":"10"},"events":[{"kv":{"key":"Zm9v","create_revision":"4","mod_revision":"41","version":"38","value":"Mw=="}}]}}
{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"42","raft_term":"10"},"events":[{"kv":{"key":"Zm9v","create_revision":"4","mod_revision":"42","version":"39","value":"NA=="}}]}}
我现在直接把这个字符串直接做json load是没有边界问题的. 我看了下文档,只有消息体非常大的时候才有边界问题?
这个就属于理论上是错的,但并不是每时每刻都会发生。如果是线上服务,还是建议做正确了比较好,否则追查偶现的bug会耗费更多的时间。
brpc中带一个json2pb,支持用pb message来解析json,之前要求输入只包含一个json,现在我更新了下,可以支持多个json了(需要把Json2PbOptions.allow_remaining_bytes_after_parsing设为true后,默认false),例子可以参考https://github.com/apache/incubator-brpc/blob/master/test/brpc_protobuf_json_unittest.cpp#L1563
brpc中带一个json2pb,支持用pb message来解析json,之前要求输入只包含一个json,现在我更新了下,可以支持多个json了(需要把Json2PbOptions.allow_remaining_bytes_after_parsing设为true后,默认false),例子可以参考https://github.com/apache/incubator-brpc/blob/master/test/brpc_protobuf_json_unittest.cpp#L1563
这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?
这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?
不是,这里的数据就读自对应http链接的socket中,任何一种情况都可能出现,这是没有保证的,特别是出现中断(signal)和网络抖动时。只是在很多时候,数据已经在那了,且正好是一份。但如果是一个7*24运行的线上系统,出问题的概率应该就比较高了。
这个理论错误只会出现多个完整的json string在一个response里,不会出现残缺部分数据的情况吧?
不是,这里的数据就读自对应http链接的socket中,任何一种情况都可能出现,这是没有保证的,特别是出现中断(signal)和网络抖动时。只是在很多时候,数据已经在那了,且正好是一份。但如果是一个7*24运行的线上系统,出问题的概率应该就比较高了。
谢谢,etcd的这个监听时间的功能基本没啥问题了。但是实际上watch的服务定义如下,不知道这个cancel 操作怎么才能通过BRPC发送到server端? 我的理解这是一个类似stream rpc的功能,用brpc的http client似乎做不到,只能在需要cancel的时候通过另外一个连接来发送? 类似的keep alive的操作似乎也只能采用一个专用线程来间隔发送心跳信息了。
// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
option (google.api.http) = {
post: "/v3/watch"
body: "*"
};
}
}
Related: #1589
@zengkui 这块能封装个形如EtcdUtils的通用类,让其他用户也方便注册到etcd上么(通过restful的方式)
@zengkui 这块能封装个形如EtcdUtils的通用类,让其他用户也方便注册到etcd上么(通过restful的方式)
因为etcd的接口比较多,另外etcd的版本变化会导致PB文件中接口变化问题,这个在brpc里不是太好维护。 我之前是想写几个example放在brpc里 可以参考着用。 不知道这块你有什么好的建议没?
@zengkui 字段变化有没有更具体的例子,我不是特别理解,因为etcd修改接口一般会考虑后向兼容吧,而且pb文件加字段也是后向兼容的。utils本身的接口能覆盖主要功能就行了,暂时不用求全。
@zengkui 字段变化有没有更具体的例子,我不是特别理解,因为etcd修改接口一般会考虑后向兼容吧,而且pb文件加字段也是后向兼容的。utils本身的接口能覆盖主要功能就行了,暂时不用求全。
嗯,etcd升级到V3后就是PB了,后向兼容应该是没有问题。 只是新增的字段的话就涉及到pb文件同步的问题,如果brpc这边觉的没啥问题的话,我可以实现目前常用到的一些接口 先push进去,然后大家再一起更新完善也是ok的。 后续可能还有一些处理细节问题,可以后面候聊下。