dilation: add half-close support

Open warner opened this issue 5 years ago • 2 comments

The upcoming "dilation" feature offers multiple "subchannels" all multiplexed over a single durable+abstract connection. Each subchannel behaves like a regular Twisted TCP endpoint.

As I'm rewriting the file-transfer protocol to use these subchannels, I'm discovering that it would be really handy if they were half-closeable. The original FTP protocol (RFC 959, written 34 years ago) uses one TCP connection per file and just reads until the socket is closed, instead of sending a length ahead of time and then fetching exactly that number of bytes. This is also how HTTP/1.0 responses work when there is no Content-Length: header. It has the advantage of not needing to calculate the transfer size ahead of time (which is extra work when compression or aggregation is involved, and which is why HTTP/1.1 needed the new Transfer-Encoding: chunked).

For MW+Dilation transfers, I'm thinking we use one subchannel per file transfer. The first thing across the subchannel is a length-prefix and header, which tells the remote side:

  • the filename to store it under
  • the encoding to be used: single file or tarball of a directory, maybe some kind of compression
  • an estimated size, maybe pre-compression
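A minimal sketch of what such a length-prefixed header could look like. The wire format here (a 4-byte big-endian length followed by UTF-8 JSON) and the field names are assumptions for illustration, not a format this issue has settled on:

```python
import json
import struct

# Hypothetical framing: 4-byte big-endian length, then a UTF-8 JSON header.
_LENGTH = struct.Struct(">L")

def encode_header(filename, encoding, estimated_size):
    """Return the length-prefixed header bytes for one transfer."""
    body = json.dumps({
        "filename": filename,
        "encoding": encoding,              # e.g. "file" or "tarball" (made-up values)
        "estimated_size": estimated_size,  # only a hint, allowed to be wrong
    }).encode("utf-8")
    return _LENGTH.pack(len(body)) + body

def decode_header(buf):
    """Return (header_dict, remaining_bytes), or (None, buf) if incomplete."""
    if len(buf) < _LENGTH.size:
        return None, buf
    (length,) = _LENGTH.unpack_from(buf)
    end = _LENGTH.size + length
    if len(buf) < end:
        return None, buf
    header = json.loads(buf[_LENGTH.size:end].decode("utf-8"))
    return header, buf[end:]
```

Everything after the header is file data, so the receiver would feed `remaining_bytes` straight into whatever decoder the header names.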

Then the sender just streams the (maybe encoded) data, perhaps pausing when the receiver uses the producer/consumer API, and when they've sent the last byte, the sender half-closes the subchannel. The receiver observes the half-close and wraps things up, then sends back an ACK message with a hash of the received data. When the sender gets the ACK, they can check the hash to tell the user that the transfer succeeded.
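The receiver-side bookkeeping for that flow might look roughly like this. It is a sketch only: the class and function names are hypothetical, SHA-256 is an assumed hash choice, and the ACK is shown as a bare hex digest rather than any real message format:

```python
import hashlib

class TransferReceiver(object):
    """Hypothetical per-subchannel state for the receiving side."""

    def __init__(self):
        self._hasher = hashlib.sha256()  # hash algorithm is an assumption
        self.bytes_received = 0

    def data_received(self, data):
        # Would be driven by the Protocol's dataReceived().
        self._hasher.update(data)
        self.bytes_received += len(data)

    def read_connection_lost(self):
        # Would be driven by readConnectionLost(): the sender half-closed,
        # so the data is complete. Return the ACK payload to write back.
        return self._hasher.hexdigest().encode("ascii")

def sender_check_ack(sent_chunks, ack):
    """Return True if the receiver's ACK matches what the sender sent."""
    expected = hashlib.sha256(b"".join(sent_chunks)).hexdigest()
    return ack == expected.encode("ascii")
```

A real sender would hash incrementally while streaming instead of keeping the chunks around; the list is only there to keep the sketch short.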

When sending a directory, the estimated size could be a du -s of the disk. The receiver can count bytes written to disk (rather than bytes read from the subchannel, which might be compressed), and the progress bar will be mostly accurate. But the transfer will still work even if the estimated size is wrong; the only consequence will be that the progress meter will end too early or too late.
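Since the estimate is only a hint, the progress calculation has to tolerate it being wrong in either direction. One possible way to do that (the function name and the clamping behaviour are my assumptions) is:

```python
def progress_fraction(bytes_written, estimated_size):
    """Fraction for the progress bar, clamped to the range [0.0, 1.0].

    bytes_written counts bytes written to disk (post-decompression), and
    estimated_size is the sender's hint, which may be too low or too high.
    """
    if estimated_size <= 0:
        return 0.0  # no usable estimate
    return min(1.0, float(bytes_written) / estimated_size)
```

With a low estimate the bar simply sits at 100% until the half-close arrives; with a high estimate it finishes before reaching 100%.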

If we didn't have half-close, we'd need to be very precise about when the transfer is complete. For single files and no compression, we know ahead of time exactly how many bytes will be sent over the wire, so the header can specify that, and the receiver can hang up after receiving that many bytes. With compression, we'd either need to compress the file ahead of time to get the wire-size up front (expensive), or we'd use the uncompressed size from disk and have the recipient count the bytes coming out of the decompressor (and we'd have to flush the decompressor after every chunk, also expensive). With directories, we'd have the same dilemma.

The API for this should match the standard Twisted interfaces. The ability to half-close is activated by attaching a Protocol which implements twisted.internet.interfaces.IHalfCloseableProtocol to the subchannel. If you do that, then you are allowed to call self.transport.loseWriteConnection() instead of (or in addition to) the usual loseConnection(). And the protocol will be notified via p.readConnectionLost() and p.writeConnectionLost() instead of (or in addition to??) the usual p.connectionLost().

I'm still trying to figure out the instead-of vs in-addition-to questions. I'm looking at the original Twisted ticket for half-close (from 2003!) for hints, since the feature is somewhat underdocumented.

warner avatar Aug 12 '19 00:08 warner

Testing with Twisted-19.2.1 and a localhost TCP socket tells me:

  • a half-closeable protocol which doesn't write anything but does loseWriteConnection will get an immediate writeConnectionLost. Later, when the other side closes too, it gets a readConnectionLost. At no point does it get a plain connectionLost.
  • the same thing happens when it gets readConnectionLost first, and then later does loseWriteConnection
  • doing loseWriteConnection and then doing more writes doesn't seem to provoke an error; as far as I can tell, the writes are just ignored
  • if a half-closeable protocol does a plain loseConnection, they get a connectionLost, but neither writeConnectionLost nor readConnectionLost
  • if the other side does a full loseConnection, we first see readConnectionLost, then if we keep writing, we'll get connectionLost after a few packets get back RST. At no point do we get writeConnectionLost
  • if you get readConnectionLost and then do a normal loseConnection, you get back connectionLost instead of writeConnectionLost

So it seems that writeConnectionLost only happens in response to you doing loseWriteConnection. And loseConnection will get you a connectionLost in both half-closeable and regular Protocols.

I'm not sure how much of this we should emulate. It feels like protocols should either be half-closeable (and only use loseWriteConnection and only expect writeConnectionLost/readConnectionLost, and never expect connectionLost), or non-half-closeable (and only use loseConnection, and only expect connectionLost). But at least two cases appear to violate this rule.

For now, I'm going to proceed as if that ideal is maintained, except I'm also going to call connectionLost once both readConnectionLost and writeConnectionLost have been called. I'll put some asserts in place to catch cases where one type of protocol calls the wrong methods. Hopefully we can find some more docs on IHalfCloseableProtocol in the future.
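Roughly, the bookkeeping I have in mind looks like the sketch below. The class and method names are hypothetical (this is not the Dilation code), and it only covers the half-closeable case: each direction closes once, and connectionLost fires after the second of the two.

```python
class HalfCloseTracker(object):
    """Hypothetical helper: tracks both directions of one subchannel."""

    def __init__(self, protocol):
        self._protocol = protocol
        self._read_closed = False
        self._write_closed = False

    def local_write_closed(self):
        # Our side called loseWriteConnection().
        assert not self._write_closed, "write side closed twice"
        self._write_closed = True
        self._protocol.writeConnectionLost()
        self._maybe_finish()

    def remote_write_closed(self):
        # The peer half-closed, so we will receive no more data.
        assert not self._read_closed, "read side closed twice"
        self._read_closed = True
        self._protocol.readConnectionLost()
        self._maybe_finish()

    def _maybe_finish(self):
        # connectionLost fires only after both halves are closed.
        if self._read_closed and self._write_closed:
            self._protocol.connectionLost(None)


class RecordingProtocol(object):
    """Stand-in protocol that records which callbacks it received."""

    def __init__(self):
        self.events = []

    def readConnectionLost(self):
        self.events.append("readConnectionLost")

    def writeConnectionLost(self):
        self.events.append("writeConnectionLost")

    def connectionLost(self, reason=None):
        self.events.append("connectionLost")
```

The order of the two half-closes doesn't matter here: whichever comes second triggers the final connectionLost.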

warner avatar Aug 12 '19 01:08 warner

For reference, here's the testing tool:

from __future__ import print_function, unicode_literals
import sys
from twisted.python import usage
from twisted.internet.endpoints import clientFromString, serverFromString
from twisted.internet.defer import Deferred
from twisted.internet.task import react, LoopingCall
from twisted.internet.protocol import Protocol, Factory
from twisted.internet.interfaces import IHalfCloseableProtocol
from zope.interface import implementer

from twisted.python import log
log.startLogging(sys.stderr)

class Options(usage.Options):
    def parseArgs(self, mode, *args):
        if mode not in ["tx", "rx"]:
            raise usage.UsageError("mode must be 'tx' or 'rx', not '%s'" % mode)
        self.mode = mode
        self.args = args

@implementer(IHalfCloseableProtocol)
class ReceiveProtocol(Protocol):
    def __init__(self):
        super(ReceiveProtocol, self).__init__()
        self._rx = 0

    def connectionMade(self):
        print("RP.connectionMade")

    def dataReceived(self, data):
        old = self._rx
        self._rx += len(data)
        print("RP.dataReceived %d+%d=%d" %(old, len(data), self._rx))
        if self._rx == 2:
            print("RP.loseWriteConnection")
            self.transport.loseWriteConnection()
            #print("RP.loseConnection")
            #self.transport.loseConnection()

    def readConnectionLost(self):
        print("RP.readConnectionLost")

    def writeConnectionLost(self):
        print("RP.writeConnectionLost")

    def connectionLost(self, why=None):
        print("RP.connectionLost")

@implementer(IHalfCloseableProtocol)
class SendProtocol(Protocol):
    def __init__(self):
        super(SendProtocol, self).__init__()
        self._rx = 0
        self._tx = 0

    def connectionMade(self):
        print("SP.connectionMade")
        self.lc = LoopingCall(self.do_write)
        self.lc.start(1.0, False)

    def do_write(self):
        print("SP.write %s+1=%s" % (self._tx, self._tx+1))
        self._tx += 1
        self.transport.write(b"a")
        if self._tx == 6:
            print("SP.loseWriteConnection")
            self.transport.loseWriteConnection()
            #print("SP.loseConnection")
            #self.transport.loseConnection()

    def dataReceived(self, data):
        old = self._rx
        self._rx += len(data)
        print("SP.dataReceived %d+%d=%d" %(old, len(data), self._rx))

    def readConnectionLost(self):
        print("SP.readConnectionLost")

    def writeConnectionLost(self):
        print("SP.writeConnectionLost")

    def connectionLost(self, why=None):
        print("SP.connectionLost")

class InstanceFactory(Factory):
    def __init__(self, instance):
        super(InstanceFactory, self).__init__()
        self.instance = instance
    def buildProtocol(self, addr):
        return self.instance

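def open_connection(reactor, options):
    # Named open_connection to avoid shadowing the builtin open().
    if options.mode == "tx":
        ep = clientFromString(reactor, "tcp:localhost:6777")
        d = ep.connect(InstanceFactory(SendProtocol()))
    else:
        ep = serverFromString(reactor, "tcp:6777")
        d = ep.listen(InstanceFactory(ReceiveProtocol()))
    d.addErrback(log.err)  # log connect/listen failures instead of dropping them
    # This Deferred never fires, so react() keeps running until interrupted.
    return Deferred()

def run():
    options = Options()
    options.parseOptions()
    return react(open_connection, (options,))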

run()

warner avatar Aug 12 '19 01:08 warner