Detaching Socket from HTTPSession

Open SteveSelva opened this issue 1 year ago • 5 comments

Is there any safe way to detach folly::AsyncTransport from proxygen::HTTPSession without closing the socket?

SteveSelva avatar Nov 06 '24 05:11 SteveSelva

I don't think there's currently a way to do this.

hanidamlaj avatar Nov 06 '24 17:11 hanidamlaj

Is there any possibility of implementing this in the future?

SteveSelva avatar Nov 07 '24 04:11 SteveSelva

What is it you are trying to do? Pass the underlying AsyncSocket off after an HTTP/1.x CONNECT?

Could you instead write an AsyncTransport implementation that uses HTTPTransaction/Handler to send/receive data? This has the benefit of working with HTTP/2 and 3 also.

afrind avatar Nov 22 '24 22:11 afrind

After the HTTP CONNECT, I plan to detach the folly::AsyncTransport from the HTTPSession and then simply forward data between the two ends. I have optimized the forwarding code along the lines of HTTPSession: writes are sent at the end of each event base loop, and the amount of data read in a single event base loop is capped. I am stuck at detaching the AsyncTransport from the HTTPSession.
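
For illustration, the forwarding scheme described above (queue writes and flush them once per loop iteration, and cap how much is read per iteration) could be sketched roughly as follows. This is a minimal stand-alone sketch, not the code from this issue: `LoopBatcher`, `takeReadBudget`, `queueWrite` and `endOfLoop` are hypothetical names, and no folly or proxygen types are used. In a real proxy the end-of-loop hook would presumably be driven by the event base rather than called by hand.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical helper; no folly or proxygen types are used here.
class LoopBatcher {
 public:
  explicit LoopBatcher(std::size_t readBudgetPerLoop)
      : readBudgetPerLoop_(readBudgetPerLoop), budgetLeft_(readBudgetPerLoop) {
    assert(readBudgetPerLoop_ > 0);
  }

  // Returns how many of `available` bytes may be read in this iteration.
  std::size_t takeReadBudget(std::size_t available) {
    const std::size_t granted = std::min(available, budgetLeft_);
    budgetLeft_ -= granted;
    return granted;
  }

  // Queues data instead of writing it immediately.
  void queueWrite(const std::string& data) { pending_ += data; }

  // Call once at the end of each loop iteration: returns everything queued
  // as one coalesced write (empty if nothing was queued) and resets the
  // read budget for the next iteration.
  std::string endOfLoop() {
    budgetLeft_ = readBudgetPerLoop_;
    return std::exchange(pending_, std::string{});
  }

 private:
  std::size_t readBudgetPerLoop_;
  std::size_t budgetLeft_;
  std::string pending_;
};
```

The caller reads only as many bytes as `takeReadBudget` grants, passes them to `queueWrite`, and performs a single write with whatever `endOfLoop` returns.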

Can you provide example code for an AsyncTransport implementation that uses HTTPTransaction/Handler?

SteveSelva avatar Nov 27 '24 05:11 SteveSelva

Here's something to get started:

class HTTPTransactionConnectTransport
    : public folly::AsyncTransport
    , public HTTPTransactionHandler {

 public:
  explicit HTTPTransactionConnectTransport(folly::EventBase* eventBase)
      : eventBase_(eventBase) {
  }

  ~HTTPTransactionConnectTransport() override = default;

  folly::EventBase* getEventBase() const override {
    return eventBase_;
  }

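  void setReadCB(ReadCallback* callback) override {
    readCallback_ = callback;
    // Apply backpressure: only let the transaction deliver body while a
    // callback is installed to receive it.
    if (txn_) {
      if (readCallback_) {
        txn_->resumeIngress();
      } else {
        txn_->pauseIngress();
      }
    }
  }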

  ReadCallback* getReadCallback() const override {
    return readCallback_;
  }
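  void write(WriteCallback* callback,
             const void* buf,
             size_t bytes,
             folly::WriteFlags flags = folly::WriteFlags::NONE) override {
    // Copy the bytes: the transaction may buffer them after write() returns,
    // so wrapping the caller's buffer would risk a dangling reference.
    writeChain(callback, folly::IOBuf::copyBuffer(buf, bytes), flags);
  }
  void writev(WriteCallback* callback,
              const iovec* vec,
              size_t count,
              folly::WriteFlags flags = folly::WriteFlags::NONE) override {
    // Gather all iovecs into one chain so the callback fires exactly once.
    // Requires <folly/io/IOBufQueue.h>.
    folly::IOBufQueue queue;
    for (size_t i = 0; i < count; i++) {
      queue.append(vec[i].iov_base, vec[i].iov_len);
    }
    writeChain(callback, queue.move(), flags);
  }
  void writeChain(WriteCallback* callback,
                  std::unique_ptr<folly::IOBuf>&& buf,
                  folly::WriteFlags /*flags*/ = folly::WriteFlags::NONE) override {
    if (buf) {
      txn_->sendBody(std::move(buf));
    }
    // The transaction buffers egress, so this sketch reports success as soon
    // as the data has been handed over.
    if (callback) {
      callback->writeSuccess();
    }
  }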

  void close() override {
    shutdownWrite();
    shutdownRead();
  }

  void closeNow() override {
    shutdownWriteNow();
    shutdownRead();
  }
  void shutdownWrite() override {
    txn_->sendEOM();
  }

  void shutdownWriteNow() override {
    txn_->sendEOM();
  }

  void shutdownRead() {
    txn_->sendAbort(ErrorCode::NO_ERROR);
  }

  bool good() const override {
    return readable() && writable();
  }
  bool readable() const override {
    // TODO: should only return true when data is available (AsyncSocket calls
    // poll)
    return true;
  }
  bool writable() const override {
    // TODO: false after shutdownWrite
    return true;
  }
  bool connecting() const override {
    // Don't hand off the transport until connected
    return false;
  }
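  bool error() const override {
    // NOTE: ingressError_ and connectStream_ are not declared in this sketch;
    // track error state in members of your own (e.g. set a flag in onError).
    return ingressError_ || connectStream_->egressError_;
  }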
  void attachEventBase(folly::EventBase* /*eventBase*/) override {
    XLOG(FATAL) << "Cannot change eventBase";
  }
  void detachEventBase() override {
    XLOG(FATAL) << "Cannot change eventBase";
  }
  bool isDetachable() const override {
    return false;
  }
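  void setSendTimeout(uint32_t /*sendTimeoutMs*/) override {
    // TODO
  }
  uint32_t getSendTimeout() const override {
    // TODO: return the configured timeout; 0 is a placeholder so that the
    // function does not fall off the end without returning a value.
    return 0;
  }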
  void getLocalAddress(folly::SocketAddress* address) const override {
    // Assumes HTTPTransaction's address getters return a const reference.
    *address = txn_->getLocalAddress();
  }
  void getPeerAddress(folly::SocketAddress* address) const override {
    *address = txn_->getPeerAddress();
  }

  bool isEorTrackingEnabled() const override {
    return false;
  }

  void setEorTracking(bool track) override {
    if (track) {
      XLOG(WARNING)
          << "Cannot enable EOR tracking with HTTPTransactionConnectTransport";
    }
  }

  size_t getAppBytesWritten() const override {
    return egressOffset_;
  }
  size_t getRawBytesWritten() const override {
    return egressOffset_ /* TODO: + HTTP overhead */;
  }
  size_t getAppBytesReceived() const override {
    return ingressOffset_;
  }
  size_t getRawBytesReceived() const override {
    return ingressOffset_ /* TODO: + HTTP overhead */;
  }

 private:
  void setTransaction(HTTPTransaction* txn) noexcept override {
    txn_ = txn;
  }

  void detachTransaction() noexcept override {
    txn_ = nullptr;
  }

  void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept override {
    LOG(FATAL)
        << "onHeadersComplete should be called before constructing transport";
  }

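  void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept override {
    // NOTE: assumes the callback accepts whole buffers (isBufferMovable()
    // returns true). Data arriving while no callback is set is dropped.
    if (readCallback_) {
      readCallback_->readBufferAvailable(std::move(chain));
    }
  }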

  void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept override {
    // Trailers are not supported
  }

  void onEOM() noexcept override {
    if (readCallback_) {
      readCallback_->readEOF();
    }
  }

  void onUpgrade(UpgradeProtocol protocol) noexcept override {
    LOG(FATAL) << "onUpgrade should not be called";
  }

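  void onError(const HTTPException& error) noexcept override {
    if (readCallback_) {
      // AsyncSocketException requires an exception type as its first argument.
      readCallback_->readErr(folly::AsyncSocketException(
          folly::AsyncSocketException::UNKNOWN, error.what()));
    }
  }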

  void onEgressPaused() noexcept override {
    // Ignore this signal, writes in writeChain will get buffered in txn
    // might be nice to implement AsyncTransport buffering callbacks
  }

  void onEgressResumed() noexcept override {
    // Ignore this signal, writes in writeChain will get buffered in txn
  }

  // Resets readCallback_ and returns the previous value. With no callback
  // left to receive data, ingress is paused.
  ReadCallback* resetReadCb() {
    auto* previous = std::exchange(readCallback_, nullptr);
    if (txn_) {
      txn_->pauseIngress();
    }
    return previous;
  }

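  folly::EventBase* eventBase_{nullptr};
  ReadCallback* readCallback_{nullptr};
  HTTPTransaction* txn_{nullptr};
  // Byte counters returned by the get*Bytes* accessors above. This sketch
  // never updates them; increment them in writeChain and onBody as needed.
  size_t egressOffset_{0};
  size_t ingressOffset_{0};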
};
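
The data flow of the class above can be exercised without proxygen by swapping in stand-ins. The following is a minimal sketch under that assumption: `FakeTransaction`, `FakeReadCallback` and `TxnBackedTransport` are hypothetical names that only mimic the shape of `HTTPTransaction`, `ReadCallback` and the adapter. It shows egress going to the transaction, ingress going to the installed read callback, and ingress being paused while nobody is reading.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Stand-in for proxygen::HTTPTransaction: records what the adapter asks of it.
struct FakeTransaction {
  std::vector<std::string> bodies;
  bool eomSent = false;
  bool ingressPaused = false;
  void sendBody(std::string data) { bodies.push_back(std::move(data)); }
  void sendEOM() { eomSent = true; }
  void pauseIngress() { ingressPaused = true; }
  void resumeIngress() { ingressPaused = false; }
};

// Stand-in for folly::AsyncTransport::ReadCallback.
struct FakeReadCallback {
  std::string received;
  bool eof = false;
  void onData(const std::string& data) { received += data; }
  void onEOF() { eof = true; }
};

// Transport-shaped adapter: writes go to the transaction, body received from
// the transaction goes to whichever read callback is installed.
class TxnBackedTransport {
 public:
  explicit TxnBackedTransport(FakeTransaction* txn) : txn_(txn) {
    assert(txn_ != nullptr);
    txn_->pauseIngress();  // nobody is reading yet
  }

  void setReadCB(FakeReadCallback* cb) {
    readCallback_ = cb;
    if (readCallback_) {
      txn_->resumeIngress();
    } else {
      txn_->pauseIngress();
    }
  }

  void write(std::string data) { txn_->sendBody(std::move(data)); }
  void shutdownWrite() { txn_->sendEOM(); }

  // Called from the transaction side when tunnel bytes arrive.
  void onBody(const std::string& data) {
    if (readCallback_) {
      readCallback_->onData(data);
    }
  }
  void onEOM() {
    if (readCallback_) {
      readCallback_->onEOF();
    }
  }

 private:
  FakeTransaction* txn_;
  FakeReadCallback* readCallback_{nullptr};
};
```

Code that forwards bytes over a plain socket could then be pointed at an adapter like this instead, which is why the approach is not tied to HTTP/1.x.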

afrind avatar Apr 14 '25 18:04 afrind