msgpack-java icon indicating copy to clipboard operation
msgpack-java copied to clipboard

Supporting event-driven I/O in unpackXXX

Open xerial opened this issue 11 years ago • 5 comments

This topic is an excerpt of the discussion in https://github.com/msgpack/msgpack-java/pull/82#discussion_r12501596

In event-driven I/O, we call unpackXXX after we receive new data from a network (or another resource). That indicates unpackXXX might be called even if insufficient amount of packed data is available.

Because of this, behavior of unpackXXX (e.g., unpackInt, unpackString, etc.) would be as follows:

  1. It returns the unpacked value and proceeds the cursor on success.
  2. If it fails due to lack of the incoming data, it throws an InsufficientDataException. The cursor does not move in this case, so that the user can call unpackXXX after sufficient amount of data becomes available.

xerial avatar May 15 '14 04:05 xerial

Here is a summary of comment from @frsyuki: https://github.com/msgpack/msgpack-java/pull/82#discussion_r12675357

  • The previous version v06 implements getXXX(), which does not move the cursor, and advance() to support repeatable read of unpackXXX if we do not call advance().
  • But this interface was not good because buffering mechanism must be implemented in every adapter of MessageBufferInput interface. And also, only the unpacker knows the size of incoming data, so it was difficult to implement efficient buffering in the MessageBufferInput.

xerial avatar May 15 '14 07:05 xerial

@frsyuki 's idea is to cache the header and data length information of message packed data to allow calling unpackXXX multiple times without making complex the implementation of MessageBufferInput.

Question:

  • The head and data length are at most 5 bytes. Now that we have MessageBuffer for caching the input data, what is the benefit of having variables to store these data?
    • Reading head and data length are cheap operations.
    • If the data is at the boundary of MessageBuffer, we eventually need to relocate the buffer contents or preserve two or more consecutive buffers.

xerial avatar May 15 '14 07:05 xerial

Here is my idea to support event-driven IO efficiently:

public short unpackShort() throws IOException {
    final byte b = getHeadByte();
    if (Code.isFixInt(b)) {
        this.head = READ_NEXT_HEAD_BYTE;
        return (short) b;
    }
    switch(b) {
    case Code.INT32:
        int value = readInt();  // may throw InsufficientDataException (retryable)
        // readInt succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        // ...
        break;
    // more cases ...
    }
}

private MessageBuffer extraBuffer = MessageBuffer.wrap(new byte[8]);
private int extraBufferFilledSize = 0;

private int readInt() throws IOException {
    if (extraBufferFilledSize > 0) {
        if (extraBufferFilledSize == 4) {
            int value = extraBuffer.getInt(0);  // extraBuffer needs at most 8 bytes for readInt()
            extraBufferFilledSize = 0;
            return value;
        }
    } else if (buffer.size() - position >= 4) {
        // this is the fast-path which should be inlined
        int value = buffer.getInt(position);
        position += 4;
        return value;
    }

    // fill extraBuffer from buffer...
    if (buffer.size() <= position) {
        // buffer is empty
        requireNextBuffer();  // may throw InsufficientDataException or IOException
    }

    // pseudo method MessageBuffer.get(MessageBuffer, srcOffset, dstOffset, size)
    // copies data from the receiver to the given buffer at most size bytes.
    int n = buffer.get(extraBufferFilledSize, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
    extraBufferFilledSize += n;

    return readInt();  // tail call will be optimized by compiler
}

To make unpackShort repeatable, Unpacker needs 1 (head) + 8 (extraBuffer) bytes buffer (as commented in the code). This buffer can be one MessageBuffer as you suggest (9-byte MessageBuffer). 9-byte buffer (actually 13-byte because it needs to store size as well later) also works but byte + MessageBuffer is easier to implement, I guess.

We can optimize readInt as following by the way:

// this method should be inlined by JIT
private int readInt() throws IOException {
    // requireBuffer() guarantees that buffer.size() == position if extraBufferFilledSize > 0.
    // requireBuffer() guarantees that buffer can't be null (use private static final EMPTY_BUFFER = MessageBuffer.wrap(new byte[0])) so that here doesn't have to check buffer != null.
    if (buffer.size() - position >= 4) {
        int value = buffer.getInt(position);
        position += 4;
        return value;
    } else {
        return readIntOverBufferBoundary();
    }
}

// this method including tail call doesn't have to be inlined because this is rarely called.
private int readIntOverBufferBoundary() throws IOException {
    if (extraBufferFilledSize == 4) {
        int value = extraBuffer.getInt(0);
        extraBufferFilledSize = 0;
        return value;
    }

    // fill extraBuffer from buffer...
    if (buffer.size() <= position) {
        // buffer is empty
        requireNextBuffer();  // may throw InsufficientDataException or IOException
    }

    //// optional: possibly improve performance
    //if(extraBufferFilledSize == 0 && buffer.size() >= 4) {
    //    int v = buffer.getInt(position);
    //    position += 4;
    //    return v;
    //}

    // pseudo method MessageBuffer.get(MessageBuffer, srcOffset, dstOffset, size)
    // copies data to the given buffer at most given size bytes.
    int n = buffer.get(extraBufferFilledSize, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
    position += n;
    extraBufferFilledSize += n;

    return readIntOverBufferBoundary();  // tail call
}

Another example is unpackArrayHeader:

public int unpackArrayHeader() throws IOException {
    final byte b = getHeadByte();
    if (Code.isFixedArray(b)) {
        this.head = READ_NEXT_HEAD_BYTE;
        return b & 0x0f;
    }
    switch (b) {
    case Code.ARRAY32:
        int u32 = readInt();  // may throw InsufficientDataException
        // readInt succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        if (u32 < 0) {
            throw overflowU32Size(u32);  // throws MessageFormatException (not retryable) but never throws InsufficientDataException (retryable)
        }
        return u32;
    // ...
    }
}

Unpacker doesn't need to store size to make unpackArrayHeader, unapckStringHeader or unpackBinaryHeader repeateble. size is necessary to make unpackExtendedTypeHeader, unpackString, and unapckRawString repeatable. Here is an example implementation of unpackExtendedTypeHeader:

public ExtendedTypeHeader unpackExtendedTypeHeader() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.EXT32:
        int extSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        byte extType = readByte();  // may throw InsufficientDataException (retryable)
        // readByte succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return new ExtendedTypeHeader(extType, extSize);
    // ...
    }
}

private int getNextSize32() throws IOException {
    if(size != READ_NEXT_SIZE) {
        return size;
    }

    int u32 = readInt();  // may throw InsufficientDataException (retryable)
    if(u32 < 0) {
        throw overflowU32Size(u32);
    }

    this.size = u32;
}

unpackRawString also needs size but more tricky than unpackExtendedTypeHeader because it needs to read variable-length data. There're 2 possible implementations as I suggested before:

    1. allocate a new buffer to read the variable-length data. Unpacker can pre-allocate 8-byte (or 13-byte) extraBuffer because unpackRawString doesn't use extraBuffer.
    1. use extraBuffer to read the variable-length data. Allocation, expansion, ownership management of extraBuffer could be complicated.

Here is an example implementation of idea 1):

private RawString readingRawString = null;  // RawString might be MessageBuffer or byte[]
private int readingRawStringSize;

public RawString unpackRawString() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(readingRawString == null) {
            // Perhaps users might use a custom MessageBuffer allocator here.
            // Then unpackRawString() will be unpackRawString(BufferAllocator)
            readingRawString = MessageBuffer.allocate(size);
            readingRawStringSize = 0;
        }
        String value = readRawStringCopy(strSize);
        // readRawStringCopy succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return value;
    // ...
    }
}

private RawString readRawStringCopy(int size) throws IOException {
    int remaining = size - readingRawStringSize;

    // pseudo method readPayloadAtMost(size, RawString) reads at most given size bytes from
    // underlaying buffer or input stream to RawString.
    int readBytes = readPayloadAtMost(remaining, readingRawString);
    if(readBytes >= remaining) {
        // completed
        // This is the fast-path. We might be able to create a optimized method like
        // readInt() + readIntOverBufferBoundary().
        RawString v = readingRawString;
        readingRawString = null;
        return v;
    }
    readingRawStringSize += readBytes;

    // now buffer is empty

    requireNextBuffer();  // may throw InsufficientDataException (retryable)

    return readRawStringCopy(size);  // tail call
}

Zero-copy (unpackRawStringReference) will be like this:

public RawStringReference unpackRawStringReference() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(readingRawString == null && buffer.size() - position >= strSize) {
            RawStringReference v = new RawStringReference(buffer.slice(position, strSize));
            position += strSize;
            return v;
        }
        return unpackRawStringCopy();  // this is valid because getHeadByte and getNextSize32 are repeatable
    // ...
    }
}

unpackString can have different buffering mechanism:

public String unpackString() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(decodingStringBuffer == null) {
            decodingStringBuffer = new StringBuilder();
            decodingStringByteSize = 0;
        }
        String value = readString(strSize);
        // readString succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return value;
    // ...
    }
}

private StringBuilder decodingStringBuffer = null;
private int decodingStringByteSize;

private String readString(int size) throws IOException {
    int remaining = size - decodingStringByteSize;

    // pseudo method readPayloadAtMostAndDecodeUtf8(MessageBuffer, size, StringBuilder) reads at most given size bytes from
    // MessageBuffer and decode it to StringBuilder.
    int decodedBytes = readPayloadAtMostAndDecodeUtf8(buffer, remaining, decodingStringBuffer);
    if(decodedBytes >= remaining) {
        // decoding completed
        String v = decodingStringBuffer.toString();
        decodingStringBuffer = null;
        return v;
    }
    decodingStringByteSize += decodedBytes;

    // now buffer is empty

    requireNextBuffer();  // may throw InsufficientDataException (retryable)

    return readString(size);  // tail call
}

If Unpacker supports unpackExtendedType (reads ExtendedTypeHeader and payload), Unpacker needs to store type as well as head and size.

frsyuki avatar May 15 '14 20:05 frsyuki

My concept is:

  • valid: getHeadByte() -> readInt()
  • valid: getHeadByte() -> getNextSize32() -> readLong
  • invalid: readInt() -> getHeadByte() (readInt is not repeatable)

Generally, getXxx 0 or more times -> readXxx is valid. In other words, unpackXxx methods call readXxx at the end. So we can create following methods (like: readIntAndResetHeadByte():

int readIntAndResetHeadByte() {
    int v = readInt();
    headByte = READ_NEXT_HEAD_BYTE;
    return v;
}

frsyuki avatar May 15 '14 20:05 frsyuki

Do you have a plan to release this feature?

I'm developing a library that consumes msgpack streams(Fluentd's forward requests) but I have noticed that ChannelBufferInput doesn't work well on non-blocking IO. I want stream deserializer too much……

okumin avatar Jul 30 '16 20:07 okumin