Detaching Socket from HTTPSession
Is there any safe way to detach folly::AsyncTransport from proxygen::HTTPSession without closing the socket?
I don't think there's currently a way to do this
Is there any possibility of implementing this in the future?
What is it you are trying to do? Pass the underlying AsyncSocket off after an HTTP/1.x CONNECT?
Could you instead write an AsyncTransport implementation that uses HTTPTransaction/Handler to send/receive data? This has the benefit of working with HTTP/2 and 3 also.
After the HTTP CONNECT, I am planning to detach the folly::AsyncTransport from the HTTPSession and then simply forward the data between both ends. I have optimized that forwarding code similarly to HTTPSession: writes are sent at the end of each event base loop, and the amount of data that can be read in a single event base loop is bounded. I am stuck at detaching the AsyncTransport from the HTTPSession.
Can you provide any example code for an implementation of AsyncTransport that uses HTTPTransaction/Handler?
Here's something to get started:
// Adapter that exposes one HTTP transaction (for example the tunnel that
// follows a successful CONNECT) through the folly::AsyncTransport interface.
//
// Egress:  write()/writev()/writeChain() forward bytes with
//          HTTPTransaction::sendBody(); shutdownWrite() sends EOM.
// Ingress: onBody()/onEOM()/onError() are forwarded to the installed
//          ReadCallback. While no ReadCallback is installed the transaction's
//          ingress is paused so that body bytes are not dropped.
//
// The transport must be installed as the transaction's handler only after the
// response headers have been processed (see onHeadersComplete()).
//
// This is a starting point, not a complete AsyncTransport: send timeouts are
// stored but not enforced, readable() is a stub, and raw byte counters do not
// include HTTP framing overhead.
//
// Headers this snippet relies on (none are shown here): <cstring>, <memory>,
// <utility>, the folly AsyncTransport / AsyncSocketException / IOBuf /
// logging headers, and the proxygen HTTPTransaction / HTTPException headers.
class HTTPTransactionConnectTransport
    : public folly::AsyncTransport
    , public HTTPTransactionHandler {
 public:
  // eventBase: the event base the owning session runs on. It cannot be
  // changed afterwards (attachEventBase()/detachEventBase() are fatal).
  explicit HTTPTransactionConnectTransport(folly::EventBase* eventBase)
      : eventBase_(eventBase) {
  }

  ~HTTPTransactionConnectTransport() override = default;

  folly::EventBase* getEventBase() const override {
    return eventBase_;
  }

  // Installs (or, with nullptr, removes) the read callback and pauses or
  // resumes the transaction's ingress to match.
  // NOTE(review): resumeIngress() may deliver deferred body synchronously, in
  // which case the callback is invoked from inside this call - confirm
  // against the proxygen version in use.
  void setReadCB(ReadCallback* callback) override {
    readCallback_ = callback;
    updateIngressFlow();
  }

  ReadCallback* getReadCallback() const override {
    return readCallback_;
  }

  // Writes `bytes` bytes starting at `buf`. The data is copied because the
  // callback is completed immediately while the transaction may still hold
  // the bytes in its egress queue; the caller is free to reuse `buf` once
  // writeSuccess() has fired.
  void write(WriteCallback* callback,
             const void* buf,
             size_t bytes,
             folly::WriteFlags flags = folly::WriteFlags::NONE) override {
    if (bytes == 0) {
      writeChain(callback, nullptr, flags);
      return;
    }
    writeChain(callback, folly::IOBuf::copyBuffer(buf, bytes), flags);
  }

  // Gathers all iovec entries into one chain and performs a single
  // writeChain(), so `callback` is notified exactly once for the whole vector.
  void writev(WriteCallback* callback,
              const iovec* vec,
              size_t count,
              folly::WriteFlags flags = folly::WriteFlags::NONE) override {
    std::unique_ptr<folly::IOBuf> chain;
    for (size_t i = 0; i < count; i++) {
      if (vec[i].iov_len == 0) {
        continue;
      }
      auto piece = folly::IOBuf::copyBuffer(vec[i].iov_base, vec[i].iov_len);
      if (chain) {
        // prependChain() on the head appends to the tail of the chain.
        chain->prependChain(std::move(piece));
      } else {
        chain = std::move(piece);
      }
    }
    writeChain(callback, std::move(chain), flags);
  }

  // Hands `buf` to the transaction as body bytes and completes `callback`.
  // Write flags are ignored. The transaction buffers the bytes internally, so
  // success is reported as soon as they have been queued. If the transport is
  // no longer writable, `callback` receives writeErr() with zero bytes written.
  void writeChain(WriteCallback* callback,
                  std::unique_ptr<folly::IOBuf>&& buf,
                  folly::WriteFlags /*flags*/ = folly::WriteFlags::NONE)
      override {
    if (!writable()) {
      if (callback) {
        callback->writeErr(
            0,
            folly::AsyncSocketException(
                folly::AsyncSocketException::NOT_OPEN,
                "write on closed HTTPTransactionConnectTransport"));
      }
      return;
    }
    const size_t length = buf ? buf->computeChainDataLength() : 0;
    if (length > 0) {
      txn_->sendBody(std::move(buf));
      egressOffset_ += length;
    }
    // Do not touch members after this point: the callback may tear us down.
    if (callback) {
      callback->writeSuccess();
    }
  }

  void close() override {
    shutdownWrite();
    shutdownRead();
  }

  void closeNow() override {
    shutdownWriteNow();
    shutdownRead();
  }

  // Finishes the egress direction by sending EOM. Safe to call repeatedly and
  // after the transaction has detached; EOM is sent at most once.
  void shutdownWrite() override {
    if (!txn_ || egressClosed_) {
      return;
    }
    egressClosed_ = true;
    txn_->sendEOM();
  }

  // No data is buffered in this class, so there is nothing to discard: this
  // behaves exactly like shutdownWrite().
  void shutdownWriteNow() override {
    shutdownWrite();
  }

  // Aborts the transaction with NO_ERROR. No-op once the transaction has
  // detached. Further writes are rejected afterwards.
  void shutdownRead() {
    if (!txn_) {
      return;
    }
    egressClosed_ = true;
    txn_->sendAbort(ErrorCode::NO_ERROR);
  }

  bool good() const override {
    return readable() && writable();
  }

  bool readable() const override {
    // TODO: should only return true when data is available (AsyncSocket calls
    // poll)
    return true;
  }

  // True while a transaction is attached and egress has been neither shut
  // down nor failed.
  bool writable() const override {
    return txn_ != nullptr && !egressClosed_ && !egressError_;
  }

  bool connecting() const override {
    // Don't hand off the transport until connected
    return false;
  }

  bool error() const override {
    return ingressError_ || egressError_;
  }

  void attachEventBase(folly::EventBase* /*eventBase*/) override {
    XLOG(FATAL) << "Cannot change eventBase";
  }

  void detachEventBase() override {
    XLOG(FATAL) << "Cannot change eventBase";
  }

  bool isDetachable() const override {
    return false;
  }

  // Records the requested timeout so getSendTimeout() can report it.
  // TODO: the timeout is not enforced on writes yet.
  void setSendTimeout(uint32_t sendTimeoutMs) override {
    sendTimeoutMs_ = sendTimeoutMs;
  }

  uint32_t getSendTimeout() const override {
    return sendTimeoutMs_;
  }

  // Leaves *address untouched when no transaction is attached.
  void getLocalAddress(folly::SocketAddress* address) const override {
    if (txn_) {
      txn_->getLocalAddress(*address);
    }
  }

  // Leaves *address untouched when no transaction is attached.
  void getPeerAddress(folly::SocketAddress* address) const override {
    if (txn_) {
      txn_->getPeerAddress(*address);
    }
  }

  bool isEorTrackingEnabled() const override {
    return false;
  }

  void setEorTracking(bool track) override {
    if (track) {
      XLOG(WARNING)
          << "Cannot enable EOR tracking with HTTPTransactionConnectTransport";
    }
  }

  // Body bytes handed to the transaction so far.
  size_t getAppBytesWritten() const override {
    return egressOffset_;
  }

  size_t getRawBytesWritten() const override {
    return egressOffset_ /* TODO: + HTTP overhead */;
  }

  // Body bytes received from the transaction so far.
  size_t getAppBytesReceived() const override {
    return ingressOffset_;
  }

  size_t getRawBytesReceived() const override {
    return ingressOffset_ /* TODO: + HTTP overhead */;
  }

 private:
  // HTTPTransactionHandler: invoked when this object becomes the handler.
  // Ingress is paused right away if no read callback is installed yet so body
  // bytes arriving before setReadCB() are held by the transaction.
  // NOTE(review): assumes pauseIngress() may be called from within
  // setTransaction() - confirm against the proxygen version in use.
  void setTransaction(HTTPTransaction* txn) noexcept override {
    txn_ = txn;
    updateIngressFlow();
  }

  void detachTransaction() noexcept override {
    txn_ = nullptr;
  }

  void onHeadersComplete(
      std::unique_ptr<HTTPMessage> /*msg*/) noexcept override {
    XLOG(FATAL)
        << "onHeadersComplete should be called before constructing transport";
  }

  // Forwards received body bytes to the read callback. Callbacks that accept
  // movable buffers receive the chain as is; all others are fed through
  // getReadBuffer()/readDataAvailable().
  void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept override {
    if (!chain) {
      return;
    }
    ingressOffset_ += chain->computeChainDataLength();
    if (!readCallback_) {
      // Ingress is paused whenever no callback is installed, so this is not
      // expected to happen; the bytes are dropped if it does.
      return;
    }
    if (readCallback_->isBufferMovable()) {
      readCallback_->readBufferAvailable(std::move(chain));
      return;
    }

    // Keep this object alive while callbacks run; they may call destroy().
    DestructorGuard guard(this);
    const folly::ByteRange data = chain->coalesce();
    size_t offset = 0;
    while (offset < data.size()) {
      if (!readCallback_) {
        // TODO: retain the remainder until a new callback is installed.
        XLOG(WARNING) << "Read callback removed mid-delivery; dropping "
                      << (data.size() - offset) << " bytes";
        return;
      }
      void* dest = nullptr;
      size_t capacity = 0;
      readCallback_->getReadBuffer(&dest, &capacity);
      if (dest == nullptr || capacity == 0) {
        readCallback_->readErr(folly::AsyncSocketException(
            folly::AsyncSocketException::BAD_ARGS,
            "ReadCallback::getReadBuffer() returned empty buffer"));
        return;
      }
      const size_t remaining = data.size() - offset;
      const size_t chunk = remaining < capacity ? remaining : capacity;
      std::memcpy(dest, data.data() + offset, chunk);
      offset += chunk;
      readCallback_->readDataAvailable(chunk);
    }
  }

  void onTrailers(
      std::unique_ptr<HTTPHeaders> /*trailers*/) noexcept override {
    // Trailers are not supported
  }

  void onEOM() noexcept override {
    if (readCallback_) {
      readCallback_->readEOF();
    }
  }

  void onUpgrade(UpgradeProtocol /*protocol*/) noexcept override {
    XLOG(FATAL) << "onUpgrade should not be called";
  }

  // Records which direction(s) failed, so error() and writable() reflect it,
  // and reports the failure to the read callback.
  void onError(const HTTPException& error) noexcept override {
    const auto direction = error.getDirection();
    if (direction != HTTPException::Direction::EGRESS) {
      ingressError_ = true;
    }
    if (direction != HTTPException::Direction::INGRESS) {
      egressError_ = true;
    }
    if (readCallback_) {
      readCallback_->readErr(folly::AsyncSocketException(
          folly::AsyncSocketException::UNKNOWN, error.what()));
    }
  }

  void onEgressPaused() noexcept override {
    // Ignore this signal, writes in writeChain will get buffered in txn
    // might be nice to implement AsyncTransport buffering callbacks
  }

  void onEgressResumed() noexcept override {
    // Ignore this signal, writes in writeChain will get buffered in txn
  }

  // resets readCallback_ and returns the previous value; ingress is paused
  // because nobody is left to consume it.
  ReadCallback* resetReadCb() {
    ReadCallback* previous = std::exchange(readCallback_, nullptr);
    updateIngressFlow();
    return previous;
  }

  // Resumes ingress when a read callback is installed and pauses it
  // otherwise. No-op while no transaction is attached.
  void updateIngressFlow() {
    if (!txn_) {
      return;
    }
    if (readCallback_) {
      txn_->resumeIngress();
    } else {
      txn_->pauseIngress();
    }
  }

  folly::EventBase* eventBase_{nullptr};
  ReadCallback* readCallback_{nullptr};
  HTTPTransaction* txn_{nullptr};
  uint32_t sendTimeoutMs_{0};  // stored only; not enforced
  size_t egressOffset_{0};     // body bytes passed to sendBody()
  size_t ingressOffset_{0};    // body bytes received in onBody()
  bool egressClosed_{false};   // EOM or abort has been sent
  bool ingressError_{false};
  bool egressError_{false};
};