cpp2sky
cpp2sky copied to clipboard
E1210 05:57:53.019580400 7193 proto_buffer_writer.h:65] assertion failed: !byte_buffer->Valid()
I have a new problem. It occurs in my test demo, which uses only one thread. This is my demo code:
#include
#include
#include "cpp2sky/propagation.h"
#include "cpp2sky/tracer.h"
#include "cpp2sky/tracing_context.h"
#include "cpp2sky/well_known_names.h"
using namespace cpp2sky;
TracerConfig config;
// Reproduction driver for the reported assertion failure.
// Fills in the global TracerConfig, creates one insecure gRPC tracer, and
// then, `num` times, creates a tracing context, starts and ends a single
// entry span on it, and hands the context to the tracer via report().
// Progress markers are printed to stdout along the way.
void DoTest(int num){
std::fprintf(stdout, "start\n");
// Instance name, service name and collector address used by the tracer.
config.set_instance_name("node_0");
config.set_service_name("mesh");
config.set_address("0.0.0.0:11800");
TracerPtr _Kcbp_tracer = createInsecureGrpcTracer(config);
std::fprintf(stdout, "start1\n");
// One context + one entry span + one report() per iteration.
while (num-- > 0) {
std::fprintf(stdout, "num:%d\n", num);
auto tracing_context = _Kcbp_tracer->newContext();
{
// NOTE(review): this line looks garbled by the paste -- std::shared_ptr and
// std::make_shared have no template arguments and `spSpan` is not declared
// anywhere in the visible code. Confirm against the original demo.
std::shared_ptr spExitAf = std::make_shared(tracing_context, spSpan->get(), "KCBP_SKYNAME_BpcallAfter");
auto span = tracing_context->createEntrySpan();
span->startSpan("/grpc_client.Greeter/handle");
// NOTE(review): tracing_context was already dereferenced above, so this null
// check comes too late to protect anything.
if (tracing_context != nullptr) {
span->endSpan();
}
}
// The context is moved into report(); bRet is not read afterwards.
bool bRet = _Kcbp_tracer->report(std::move(tracing_context));
}
std::fprintf(stdout, "end\n");
}
// Entry point of the demo: prints a marker and runs the reproduction loop
// for 3000 iterations.
int main() {
  const int kIterations = 3000;

  std::fputs("main\n", stdout);
  DoTest(kIterations);
  return 0;
}
the cpp2sky branch is main. gcc 4.9.0
@wu-sheng I think I know where the problem is: the gRPC async client is used incorrectly. Modifying “grpc_async_client_impl.cc” like this:
// Copyright 2020 SkyAPM
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "grpc_async_client_impl.h"
#include <chrono>
#include <thread>
#include "absl/strings/string_view.h"
#include "cpp2sky/exception.h"
#include "spdlog/spdlog.h"
namespace cpp2sky {
namespace {
// Metadata key under which the stream constructor attaches the token to its
// client context. The unnamed namespace already limits this to the current
// translation unit.
constexpr absl::string_view authenticationKey = "authentication";
}  // namespace
using namespace spdlog;
// Constructs the reporter client.
//   address - target passed to grpc::CreateChannel.
//   cq      - completion queue kept in cq_.
//   factory - stream builder; ownership is moved into factory_.
//   cred    - channel credentials passed to grpc::CreateChannel.
// The first stream is opened immediately at the end of construction.
GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient(
const std::string& address, grpc::CompletionQueue& cq,
ClientStreamingStreamBuilderPtr<TracerRequestType, TracerResponseType>
factory,
std::shared_ptr<grpc::ChannelCredentials> cred)
: factory_(std::move(factory)),
cq_(cq),
// stub_ is built from a channel created for `address` with `cred`.
stub_(grpc::CreateChannel(address, cred)) {
// startStream() uses factory_ and cv_, so it runs only after the
// initializer list above has completed.
startStream();
}
// Gives queued messages a bounded chance to drain, then destroys the stream.
//
// When a stream exists, this blocks until the pending queue is empty or
// 5 seconds have elapsed in total, whichever comes first. Anything still
// queued after that is discarded so destruction cannot hang.
GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {
  if (stream_) {
    std::unique_lock<std::mutex> lck(mux_);
    // Predicate form of wait_for: a spurious wakeup, or a notification that
    // arrives while messages are still queued, re-enters the wait instead of
    // falling through to clear() early. The predicate is checked before
    // blocking, so an already-empty queue does not wait at all.
    cv_.wait_for(lck, std::chrono::seconds(5),
                 [this] { return pending_messages_.empty(); });
    // Drop whatever could not be delivered within the deadline.
    pending_messages_.clear();
  }
  resetStream();
}
// Records `message` in the pending queue and, if a stream exists, passes it
// on to that stream. Without a stream the message only stays queued and the
// current queue size is logged.
void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) {
  // The message is always queued first, whether or not a stream exists.
  pending_messages_.push(message);

  if (stream_) {
    stream_->sendMessage(message);
    return;
  }

  info(
      "[Reporter] No active stream, inserted message into pending message "
      "queue. "
      "pending message size: {}",
      pending_messages_.size());
}
// Replaces the current stream (if any) with a fresh one from the factory and
// logs the new stream's address.
void GrpcAsyncSegmentReporterClient::startStream() {
  // Tear down the previous stream before the factory builds the next one.
  resetStream();
  stream_ = factory_->create(*this, cv_);

  auto* created = stream_.get();
  info("[Reporter] Stream {} had created.", fmt::ptr(created));
}
// Releases the current stream, logging its address first. Does nothing when
// no stream is held.
void GrpcAsyncSegmentReporterClient::resetStream() {
  if (!stream_) {
    return;
  }

  auto* doomed = stream_.get();
  info("[Reporter] Stream {} has destroyed.", fmt::ptr(doomed));
  stream_.reset();
}
// Opens a client-streaming call to "/TraceSegmentReportService/collect".
//   client - owner of the stub, completion queue and pending-message queue.
//   cv     - condition variable that onIdle() notifies.
//   token  - optional authentication token; ignored when empty.
GrpcAsyncSegmentReporterStream::GrpcAsyncSegmentReporterStream(
AsyncClient<TracerRequestType, TracerResponseType>& client,
std::condition_variable& cv, const std::string& token)
: client_(client), cv_(cv) {
// Attach the token as call metadata only when one was supplied.
if (!token.empty()) {
ctx_.AddMetadata(authenticationKey.data(), token);
}
// Ensure pending RPC will complete if connection to the server is not
// established first because of like server is not ready. This will queue
// pending RPCs and when connection has established, Connected tag will be
// sent to CompletionQueue.
ctx_.set_wait_for_ready(true);
// Prepare the call on the client's stub and completion queue.
request_writer_ = client_.stub().PrepareCall(
&ctx_, "/TraceSegmentReportService/collect", &client_.completionQueue());
// The address of ready_ is passed as the tag for the start operation.
request_writer_->StartCall(reinterpret_cast<void*>(&ready_));
}
// Deliberately a no-op: nothing is written to the stream from here. Queued
// messages are written by clearPendingMessage(), which onIdle() calls.
// NOTE(review): the direct flush below was presumably disabled to avoid
// writing from the caller's thread -- confirm with the patch author.
void GrpcAsyncSegmentReporterStream::sendMessage(TracerRequestType message) {
  // `message` is intentionally unused while the direct flush stays disabled.
  static_cast<void>(message);
  // clearPendingMessage();
}
// Tries to start a write of the message at the front of the client's pending
// queue.
// Returns true when a Write was issued. Returns false when the stream is not
// Idle, the queue is empty, or the front entry holds no value.
// The message is not removed here; onWriteDone() pops it.
bool GrpcAsyncSegmentReporterStream::clearPendingMessage() {
  // Only an idle stream may start a write.
  if (state_ != StreamState::Idle) {
    return false;
  }
  if (client_.pendingMessages().empty()) {
    return false;
  }

  auto head = client_.pendingMessages().front();
  if (!head.has_value()) {
    return false;
  }

  // The address of write_done_ is passed as the tag for this write.
  void* const tag = reinterpret_cast<void*>(&write_done_);
  request_writer_->Write(head.value(), tag);
  return true;
}
// Called once the stream is ready: marks the stream Idle and lets onIdle()
// start writing any messages that were queued in the meantime.
void GrpcAsyncSegmentReporterStream::onReady() {
  info("[Reporter] Stream ready");
  // Do not call clearPendingMessage() before the state change. With a
  // non-Idle state that call is a no-op. With an Idle state it would start a
  // Write, and onIdle() below would then start a second Write for the same
  // front message while the first is still outstanding. gRPC's async writer
  // allows only one outstanding write at a time.
  state_ = StreamState::Idle;
  onIdle();
}
// Polls the pending queue until a write can be started.
//
// Each pass logs, then tries clearPendingMessage(). If a Write was issued
// the function returns. Otherwise it notifies waiters on cv_ and retries
// after one second.
//
// This is a loop rather than self-recursion so that a long idle period does
// not add one stack frame per second. The sleep uses the standard library,
// whose headers this file already includes, instead of POSIX sleep().
void GrpcAsyncSegmentReporterStream::onIdle() {
  for (;;) {
    info("[Reporter] Stream idleing");
    // Release pending messages which are inserted when stream is not ready
    // to write.
    if (clearPendingMessage()) {
      return;
    }
    // Nothing could be written: wake anyone waiting on cv_.
    cv_.notify_all();
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}
// Called after a write has completed: removes the written message from the
// client's pending queue, marks the stream Idle, and goes back to onIdle().
void GrpcAsyncSegmentReporterStream::onWriteDone() {
  info("[Reporter] Write finished");

  // The message is popped only now, after the write finished. A message
  // whose write never completes therefore stays queued and can be sent
  // again on another stream.
  auto&& backlog = client_.pendingMessages();
  backlog.pop();

  state_ = StreamState::Idle;
  onIdle();
}
// Builds a new GrpcAsyncSegmentReporterStream bound to `client` and `cv`,
// passing along the builder's stored token_.
AsyncStreamSharedPtr<TracerRequestType, TracerResponseType>
GrpcAsyncSegmentReporterStreamBuilder::create(
    AsyncClient<TracerRequestType, TracerResponseType>& client,
    std::condition_variable& cv) {
  AsyncStreamSharedPtr<TracerRequestType, TracerResponseType> stream =
      std::make_shared<GrpcAsyncSegmentReporterStream>(client, cv, token_);
  return stream;
}
} // namespace cpp2sky
With this change the problem is solved. Please help confirm whether this approach is correct. By the way, I think “circular_buffer.h” has a problem with messages being submitted repeatedly.
@wbpcode Could you take a look?
cc @xlj Thanks for calling this out. Could you create a PR directly so that we can review the code more easily? It's a little hard to check the actual issue this way. Thanks again.
@wbpcode please help check the PR
@wbpcode
calling
it's ok,go ahead please.
What's the latest update on this, @wu-sheng @wbpcode? I hit the same problem with the latest commit on the main branch; when I pick the last commit of https://github.com/SkyAPM/cpp2sky/pull/118, the problem is resolved...
@jackxuqq Sorry, but I have no free bandwidth to create the patch. I'm happy to see that #118 (huge thanks to @xlj) resolves your problem. However, we have no plan to merge it, for the reasons noted in #118, and it has been closed. (Although, to tell the truth, I regret that.)
Hope I can get some free time in this dragon boat day.
@jackxuqq Sorry, but I have no free bandwidth to create the patch. I'm happy to see that #118 (huge thanks to @xlj) resolves your problem. However, we have no plan to merge it, for the reasons noted in #118, and it has been closed. (Although, to tell the truth, I regret that.)
Hope I can get some free time in this dragon boat day.
Can't wait!