cpp2sky icon indicating copy to clipboard operation
cpp2sky copied to clipboard

E1210 05:57:53.019580400 7193 proto_buffer_writer.h:65] assertion failed: !byte_buffer->Valid()

Open xlj opened this issue 2 years ago • 8 comments

I have a new problem. this problem occur in my test demo. demo has only one thread. this is my demo code:

#include
#include

#include "cpp2sky/propagation.h"
#include "cpp2sky/tracer.h"
#include "cpp2sky/tracing_context.h"
#include "cpp2sky/well_known_names.h"
using namespace cpp2sky;
TracerConfig config;

void DoTest(int num){
std::fprintf(stdout, "start\n");
config.set_instance_name("node_0");
config.set_service_name("mesh");
config.set_address("0.0.0.0:11800");
TracerPtr _Kcbp_tracer = createInsecureGrpcTracer(config);
std::fprintf(stdout, "start1\n");
while (num-- > 0) {
std::fprintf(stdout, "num:%d\n", num);
auto tracing_context = _Kcbp_tracer->newContext();
{
std::shared_ptr spExitAf = std::make_shared(tracing_context, spSpan->get(), "KCBP_SKYNAME_BpcallAfter");

        auto span = tracing_context->createEntrySpan();
        span->startSpan("/grpc_client.Greeter/handle");
        if (tracing_context != nullptr) {
            span->endSpan();
        }
    }
    bool bRet = _Kcbp_tracer->report(std::move(tracing_context));
}

std::fprintf(stdout, "end\n");
}

int main() {
std::fprintf(stdout, "main\n");
DoTest(3000);
return 0;
}

the cpp2sky branch is main. gcc 4.9.0

xlj avatar Dec 10 '22 05:12 xlj

@wu-sheng I think I know where is the problem, Incorrect use of the grpc asynclient, modify the “grpc_async_client_impl.cc” like this

// Copyright 2020 SkyAPM

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

//     http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "grpc_async_client_impl.h"

#include <chrono>
#include <thread>

#include "absl/strings/string_view.h"
#include "cpp2sky/exception.h"
#include "spdlog/spdlog.h"

namespace cpp2sky {

namespace {
static constexpr absl::string_view authenticationKey = "authentication";
}

using namespace spdlog;

GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient(
    const std::string& address, grpc::CompletionQueue& cq,
    ClientStreamingStreamBuilderPtr<TracerRequestType, TracerResponseType>
        factory,
    std::shared_ptr<grpc::ChannelCredentials> cred)
    : factory_(std::move(factory)),
      cq_(cq),
      stub_(grpc::CreateChannel(address, cred)) {
  startStream();
}

GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {
  // It will wait until there is no drained messages with 5 second timeout.
  if (stream_) {
    std::unique_lock<std::mutex> lck(mux_);
    while (!pending_messages_.empty()) {
      cv_.wait_for(lck, std::chrono::seconds(5));
      pending_messages_.clear();
    }
  }

  resetStream();
}

void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) {
  pending_messages_.push(message);

  if (!stream_) {
    info(
        "[Reporter] No active stream, inserted message into pending message "
        "queue. "
        "pending message size: {}",
        pending_messages_.size());
    return;
  }

  stream_->sendMessage(message);
}

void GrpcAsyncSegmentReporterClient::startStream() {
  resetStream();

  stream_ = factory_->create(*this, cv_);
  info("[Reporter] Stream {} had created.", fmt::ptr(stream_.get()));
}

void GrpcAsyncSegmentReporterClient::resetStream() {
  if (stream_) {
    info("[Reporter] Stream {} has destroyed.", fmt::ptr(stream_.get()));
    stream_.reset();
  }
}

GrpcAsyncSegmentReporterStream::GrpcAsyncSegmentReporterStream(
    AsyncClient<TracerRequestType, TracerResponseType>& client,
    std::condition_variable& cv, const std::string& token)
    : client_(client), cv_(cv) {
  if (!token.empty()) {
    ctx_.AddMetadata(authenticationKey.data(), token);
  }

  // Ensure pending RPC will complete if connection to the server is not
  // established first because of like server is not ready. This will queue
  // pending RPCs and when connection has established, Connected tag will be
  // sent to CompletionQueue.
  ctx_.set_wait_for_ready(true);

  request_writer_ = client_.stub().PrepareCall(
      &ctx_, "/TraceSegmentReportService/collect", &client_.completionQueue());
  request_writer_->StartCall(reinterpret_cast<void*>(&ready_));
}

void GrpcAsyncSegmentReporterStream::sendMessage(TracerRequestType message) {
//  clearPendingMessage();
}

bool GrpcAsyncSegmentReporterStream::clearPendingMessage() {
  if (state_ != StreamState::Idle || client_.pendingMessages().empty()) {
    return false;
  }
  auto message = client_.pendingMessages().front();
  if (!message.has_value()) {
    return false;
  }
  request_writer_->Write(message.value(),
                         reinterpret_cast<void*>(&write_done_));
  return true;
}

void GrpcAsyncSegmentReporterStream::onReady() {
  info("[Reporter] Stream ready");

  clearPendingMessage();
  state_ = StreamState::Idle;
  onIdle();
}

void GrpcAsyncSegmentReporterStream::onIdle() {
  info("[Reporter] Stream idleing");

  // Release pending messages which are inserted when stream is not ready
  // to write.
  if (!clearPendingMessage()) {
    cv_.notify_all();
     sleep(1);
      onIdle();
  }
}

void GrpcAsyncSegmentReporterStream::onWriteDone() {
  info("[Reporter] Write finished");

  // Dequeue message after sending message finished.
  // With this, messages which failed to sent never lost even if connection
  // was closed. because pending messages with messages which failed to send
  // will drained and resend another stream.
  client_.pendingMessages().pop();
  state_ = StreamState::Idle;

  onIdle();
}

AsyncStreamSharedPtr<TracerRequestType, TracerResponseType>
GrpcAsyncSegmentReporterStreamBuilder::create(
    AsyncClient<TracerRequestType, TracerResponseType>& client,
    std::condition_variable& cv) {
  return std::make_shared<GrpcAsyncSegmentReporterStream>(client, cv, token_);
}

}  // namespace cpp2sky

the problem will solved。 Please help to confirm whether this method is correct。 by the way I think “circular_buffer.h” have repeated submitted question。

xlj avatar Dec 23 '22 03:12 xlj

@wbpcode Could you take a look?

wu-sheng avatar Dec 23 '22 03:12 wu-sheng

cc @xlj Thanks for calling this out. Could you create a PR directly then we can review the code more easily? It's a little hard to check the actually issue by this way. Thanks again.

wbpcode avatar Dec 25 '22 08:12 wbpcode

@wbpcode please help check the PR

xlj avatar Dec 26 '22 06:12 xlj

@wbpcode

calling

it's ok,go ahead please.

xlj avatar Dec 28 '22 01:12 xlj

What's the latest update on @wu-sheng @wbpcode ? I meet the same problem with last commit on main branch; when i pick the last commit of https://github.com/SkyAPM/cpp2sky/pull/118, the problem is resolved...

jackxuqq avatar Jun 03 '24 07:06 jackxuqq

@jackxuqq Sorry. But I have no free bandwidth to create the patch. It's happy to see the #118 (huge thanks to @xlj) could resolve your problem. But we have no plan to merge it because reason that noticed in the #118 and it's closed. (Although I regret, tell the truth).

Hope I can get some free time in this dragon boat day.

wbpcode avatar Jun 03 '24 09:06 wbpcode

@jackxuqq Sorry. But I have no free bandwidth to create the patch. It's happy to see the #118 (huge thanks to @xlj) could resolve your problem. But we have no plan to merge it because reason that noticed in the #118 and it's closed. (Although I regret, tell the truth).

Hope I can get some free time in this dragon boat day.

Can't wait!

jackxuqq avatar Jun 04 '24 12:06 jackxuqq