msgpack-java
Supporting event-driven I/O in unpackXXX
This topic is an excerpt of the discussion in https://github.com/msgpack/msgpack-java/pull/82#discussion_r12501596
In event-driven I/O, we call unpackXXX after new data arrives from a network (or another resource). This means unpackXXX might be called even when an insufficient amount of packed data is available.
Because of this, the behavior of unpackXXX (e.g., unpackInt, unpackString, etc.) would be as follows:
- On success, it returns the unpacked value and advances the cursor.
- If it fails due to a lack of incoming data, it throws an InsufficientDataException. The cursor does not move in this case, so the user can call unpackXXX again once a sufficient amount of data becomes available.
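The contract in the two bullets above can be illustrated with a toy unpacker. This is only a minimal sketch under stated assumptions: RetryableUnpackerSketch, feed() and cursor() are hypothetical names, not the msgpack-java API, and the exception is unchecked here only to keep the example short. The only format detail it relies on is that a MessagePack int 32 is the byte 0xd2 followed by a 4-byte big-endian integer.

```java
import java.nio.ByteBuffer;

// Hypothetical toy unpacker illustrating the retry contract; not the msgpack-java API.
class RetryableUnpackerSketch {
    // Stand-in for the InsufficientDataException discussed above (unchecked for brevity).
    static class InsufficientDataException extends RuntimeException {
        InsufficientDataException(String message) { super(message); }
    }

    private static final int INT32 = 0xd2; // MessagePack "int 32" format byte
    private byte[] data = new byte[0];
    private int cursor = 0;

    // Called by the event loop whenever new bytes arrive.
    void feed(byte[] chunk) {
        byte[] merged = new byte[data.length + chunk.length];
        System.arraycopy(data, 0, merged, 0, data.length);
        System.arraycopy(chunk, 0, merged, data.length, chunk.length);
        data = merged;
    }

    int cursor() { return cursor; }

    int unpackInt32() {
        int available = data.length - cursor;
        if (available < 5) {
            // Not enough data yet: leave the cursor untouched so the call can be repeated.
            throw new InsufficientDataException("need 5 bytes, have " + available);
        }
        if ((data[cursor] & 0xff) != INT32) {
            throw new IllegalStateException("not an int32");
        }
        int value = ByteBuffer.wrap(data, cursor + 1, 4).getInt(); // big-endian by default
        cursor += 5; // advance only after the whole value was decoded
        return value;
    }
}
```

A caller would catch the exception, wait for the next read event, call feed() and then call unpackInt32() again.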
Here is a summary of the comment from @frsyuki: https://github.com/msgpack/msgpack-java/pull/82#discussion_r12675357
- The previous version, v06, implements getXXX(), which does not move the cursor, and advance(), so that unpackXXX reads are repeatable as long as we do not call advance().
- But this interface was not good because a buffering mechanism must be implemented in every adapter of the MessageBufferInput interface. Also, only the unpacker knows the size of the incoming data, so it was difficult to implement efficient buffering in MessageBufferInput.
@frsyuki's idea is to cache the header and data length information of the packed message data, so that unpackXXX can be called multiple times without complicating the implementation of MessageBufferInput.
Question:
- The head and data length are at most 5 bytes. Now that we have MessageBuffer for caching the input data, what is the benefit of having variables to store these data?
- Reading the head and data length is a cheap operation.
- If the data is at the boundary of a MessageBuffer, we eventually need to relocate the buffer contents or preserve two or more consecutive buffers.
Here is my idea to support event-driven IO efficiently:
public short unpackShort() throws IOException {
final byte b = getHeadByte();
if (Code.isFixInt(b)) {
this.head = READ_NEXT_HEAD_BYTE;
return (short) b;
}
switch(b) {
case Code.INT32:
int value = readInt(); // may throw InsufficientDataException (retryable)
// readInt succeeded. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
// ...
break;
// more cases ...
}
}
private MessageBuffer extraBuffer = MessageBuffer.wrap(new byte[8]);
private int extraBufferFilledSize = 0;
private int readInt() throws IOException {
if (extraBufferFilledSize > 0) {
if (extraBufferFilledSize == 4) {
int value = extraBuffer.getInt(0); // extraBuffer needs at most 8 bytes for readInt()
extraBufferFilledSize = 0;
return value;
}
} else if (buffer.size() - position >= 4) {
// this is the fast-path which should be inlined
int value = buffer.getInt(position);
position += 4;
return value;
}
// fill extraBuffer from buffer...
if (buffer.size() <= position) {
// buffer is empty
requireNextBuffer(); // may throw InsufficientDataException or IOException
}
// pseudo method MessageBuffer.get(MessageBuffer dst, srcOffset, dstOffset, size)
// copies at most size bytes from the receiver to the given buffer and returns the number of bytes copied.
int n = buffer.get(extraBuffer, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
position += n;
extraBufferFilledSize += n;
return readInt(); // tail call (Java does not eliminate tail calls, so this could also be written as a loop)
}
To make unpackShort repeatable, Unpacker needs a buffer of 1 (head) + 8 (extraBuffer) bytes, as commented in the code. This can be a single 9-byte MessageBuffer, as you suggest (actually 13 bytes, because it later needs to store size as well). That also works, but a byte field plus a MessageBuffer is easier to implement, I guess.
By the way, we can optimize readInt as follows:
// this method should be inlined by JIT
private int readInt() throws IOException {
// requireBuffer() guarantees that buffer.size() == position if extraBufferFilledSize > 0.
// requireBuffer() guarantees that buffer can't be null (use private static final EMPTY_BUFFER = MessageBuffer.wrap(new byte[0])) so that here doesn't have to check buffer != null.
if (buffer.size() - position >= 4) {
int value = buffer.getInt(position);
position += 4;
return value;
} else {
return readIntOverBufferBoundary();
}
}
// this method including tail call doesn't have to be inlined because this is rarely called.
private int readIntOverBufferBoundary() throws IOException {
if (extraBufferFilledSize == 4) {
int value = extraBuffer.getInt(0);
extraBufferFilledSize = 0;
return value;
}
// fill extraBuffer from buffer...
if (buffer.size() <= position) {
// buffer is empty
requireNextBuffer(); // may throw InsufficientDataException or IOException
}
//// optional: possibly improve performance
//if(extraBufferFilledSize == 0 && buffer.size() - position >= 4) {
// int v = buffer.getInt(position);
// position += 4;
// return v;
//}
// pseudo method MessageBuffer.get(MessageBuffer dst, srcOffset, dstOffset, size)
// copies at most size bytes from the receiver to the given buffer and returns the number of bytes copied.
int n = buffer.get(extraBuffer, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
position += n;
extraBufferFilledSize += n;
return readIntOverBufferBoundary(); // tail call
}
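To try out the boundary handling, here is a small self-contained sketch that mirrors readInt() and readIntOverBufferBoundary() with plain byte arrays in place of MessageBuffer. The class name BoundaryIntReader, the feed() method and the pending queue are assumptions made for this example, and the exception is unchecked only for brevity. The tail call is written as a loop, since Java does not eliminate tail calls.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch: a 4-byte int may be split across several input chunks.
class BoundaryIntReader {
    static class InsufficientDataException extends RuntimeException {}

    private final ArrayDeque<byte[]> pending = new ArrayDeque<>(); // received but unread chunks
    private byte[] buffer = new byte[0];
    private int position = 0;
    private final byte[] extraBuffer = new byte[8];
    private int extraBufferFilledSize = 0;

    void feed(byte[] chunk) { pending.add(chunk); }

    int readInt() {
        // Fast path: all 4 bytes are in the current buffer and nothing is half-read.
        if (extraBufferFilledSize == 0 && buffer.length - position >= 4) {
            int value = ByteBuffer.wrap(buffer, position, 4).getInt();
            position += 4;
            return value;
        }
        return readIntOverBufferBoundary();
    }

    private int readIntOverBufferBoundary() {
        while (extraBufferFilledSize < 4) {
            if (position >= buffer.length) {
                // May throw; bytes already copied stay in extraBuffer for the retry.
                requireNextBuffer();
            }
            int n = Math.min(4 - extraBufferFilledSize, buffer.length - position);
            System.arraycopy(buffer, position, extraBuffer, extraBufferFilledSize, n);
            position += n;
            extraBufferFilledSize += n;
        }
        extraBufferFilledSize = 0;
        return ByteBuffer.wrap(extraBuffer, 0, 4).getInt();
    }

    private void requireNextBuffer() {
        byte[] next = pending.poll();
        if (next == null) {
            throw new InsufficientDataException();
        }
        buffer = next;
        position = 0;
    }
}
```

Because the partially copied bytes survive in extraBuffer, a caller that catches the exception can simply call readInt() again after feeding more data.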
Another example is unpackArrayHeader:
public int unpackArrayHeader() throws IOException {
final byte b = getHeadByte();
if (Code.isFixedArray(b)) {
this.head = READ_NEXT_HEAD_BYTE;
return b & 0x0f;
}
switch (b) {
case Code.ARRAY32:
int u32 = readInt(); // may throw InsufficientDataException
// readInt succeeded. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
if (u32 < 0) {
throw overflowU32Size(u32); // throws MessageFormatException (not retryable) but never throws InsufficientDataException (retryable)
}
return u32;
// ...
}
}
Unpacker doesn't need to store size to make unpackArrayHeader, unpackStringHeader or unpackBinaryHeader repeatable. size is necessary to make unpackExtendedTypeHeader, unpackString, and unpackRawString repeatable. Here is an example implementation of unpackExtendedTypeHeader:
public ExtendedTypeHeader unpackExtendedTypeHeader() throws IOException {
final byte b = getHeadByte();
switch (b) {
case Code.EXT32:
int extSize = getNextSize32(); // may throw InsufficientDataException (retryable)
byte extType = readByte(); // may throw InsufficientDataException (retryable)
// readByte succeeded. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
this.size = READ_NEXT_SIZE;
return new ExtendedTypeHeader(extType, extSize);
// ...
}
}
private int getNextSize32() throws IOException {
if(size != READ_NEXT_SIZE) {
return size;
}
int u32 = readInt(); // may throw InsufficientDataException (retryable)
if(u32 < 0) {
throw overflowU32Size(u32);
}
this.size = u32;
return u32;
}
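The effect of caching head and size can be seen in a runnable toy version of the ext 32 header read. This is a sketch under assumptions: ExtHeaderSketch, feed() and the sizeReads counter are hypothetical, the size is kept in a long so the overflow check can be left out, and the 4-byte size read is all-or-nothing rather than resumable. It relies only on the MessagePack layout of ext 32: the byte 0xc9, a 4-byte big-endian size, then a 1-byte type.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: head and size are cached so a failed call can be repeated.
class ExtHeaderSketch {
    static class InsufficientDataException extends RuntimeException {}

    private static final int READ_NEXT = -1; // sentinel: nothing cached
    private static final int EXT32 = 0xc9;   // MessagePack "ext 32" format byte

    private final ArrayDeque<Byte> input = new ArrayDeque<>();
    private int head = READ_NEXT;
    private long size = READ_NEXT;
    int sizeReads = 0; // counts how often the size field was actually decoded

    void feed(byte[] bytes) {
        for (byte b : bytes) {
            input.add(b);
        }
    }

    // Returns {type, size}.
    long[] unpackExt32Header() {
        if (getHeadByte() != EXT32) {
            throw new IllegalStateException("not ext32");
        }
        long extSize = getNextSize32(); // cached after the first successful read
        if (input.isEmpty()) {
            throw new InsufficientDataException(); // retryable: head and size stay cached
        }
        byte extType = input.poll();
        // Everything was read, so the cached state is no longer needed.
        head = READ_NEXT;
        size = READ_NEXT;
        return new long[] {extType, extSize};
    }

    private int getHeadByte() {
        if (head == READ_NEXT) {
            if (input.isEmpty()) {
                throw new InsufficientDataException();
            }
            head = input.poll() & 0xff;
        }
        return head;
    }

    private long getNextSize32() {
        if (size != READ_NEXT) {
            return size;
        }
        if (input.size() < 4) {
            throw new InsufficientDataException();
        }
        long u32 = 0;
        for (int i = 0; i < 4; i++) {
            u32 = (u32 << 8) | (input.poll() & 0xff);
        }
        sizeReads++;
        size = u32;
        return u32;
    }
}
```

If the type byte is missing on the first call, the second call reuses the cached head and size instead of reading them from the input again.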
unpackRawString also needs size, but it is trickier than unpackExtendedTypeHeader because it needs to read variable-length data. There are 2 possible implementations, as I suggested before:
1) Allocate a new buffer to read the variable-length data. Unpacker can pre-allocate the 8-byte (or 13-byte) extraBuffer because unpackRawString doesn't use extraBuffer.
2) Use extraBuffer to read the variable-length data. Allocation, expansion, and ownership management of extraBuffer could be complicated.
Here is an example implementation of idea 1):
private RawString readingRawString = null; // RawString might be MessageBuffer or byte[]
private int readingRawStringSize;
public RawString unpackRawString() throws IOException {
final byte b = getHeadByte();
switch (b) {
case Code.STR32:
int strSize = getNextSize32(); // may throw InsufficientDataException (retryable)
if(readingRawString == null) {
// Perhaps users might use a custom MessageBuffer allocator here.
// Then unpackRawString() will be unpackRawString(BufferAllocator)
readingRawString = MessageBuffer.allocate(strSize);
readingRawStringSize = 0;
}
RawString value = readRawStringCopy(strSize);
// readRawStringCopy succeeded. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
this.size = READ_NEXT_SIZE;
return value;
// ...
}
}
private RawString readRawStringCopy(int size) throws IOException {
int remaining = size - readingRawStringSize;
// pseudo method readPayloadAtMost(size, RawString) reads at most the given size in bytes from
// the underlying buffer or input stream into RawString.
int readBytes = readPayloadAtMost(remaining, readingRawString);
if(readBytes >= remaining) {
// completed
// This is the fast-path. We might be able to create an optimized method like
// readInt() + readIntOverBufferBoundary().
RawString v = readingRawString;
readingRawString = null;
return v;
}
readingRawStringSize += readBytes;
// now buffer is empty
requireNextBuffer(); // may throw InsufficientDataException (retryable)
return readRawStringCopy(size); // tail call
}
Zero-copy (unpackRawStringReference) will be like this:
public RawStringReference unpackRawStringReference() throws IOException {
final byte b = getHeadByte();
switch (b) {
case Code.STR32:
int strSize = getNextSize32(); // may throw InsufficientDataException (retryable)
if(readingRawString == null && buffer.size() - position >= strSize) {
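RawStringReference v = new RawStringReference(buffer.slice(position, strSize));
position += strSize;
// the reference was created. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
this.size = READ_NEXT_SIZE;
return v;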
}
return unpackRawStringCopy(); // this is valid because getHeadByte and getNextSize32 are repeatable
// ...
}
}
unpackString can have a different buffering mechanism:
public String unpackString() throws IOException {
final byte b = getHeadByte();
switch (b) {
case Code.STR32:
int strSize = getNextSize32(); // may throw InsufficientDataException (retryable)
if(decodingStringBuffer == null) {
decodingStringBuffer = new StringBuilder();
decodingStringByteSize = 0;
}
String value = readString(strSize);
// readString succeeded. Don't have to be retryable any more
this.head = READ_NEXT_HEAD_BYTE;
this.size = READ_NEXT_SIZE;
return value;
// ...
}
}
private StringBuilder decodingStringBuffer = null;
private int decodingStringByteSize;
private String readString(int size) throws IOException {
int remaining = size - decodingStringByteSize;
// pseudo method readPayloadAtMostAndDecodeUtf8(MessageBuffer, size, StringBuilder) reads at most given size bytes from
// MessageBuffer and decode it to StringBuilder.
int decodedBytes = readPayloadAtMostAndDecodeUtf8(buffer, remaining, decodingStringBuffer);
if(decodedBytes >= remaining) {
// decoding completed
String v = decodingStringBuffer.toString();
decodingStringBuffer = null;
return v;
}
decodingStringByteSize += decodedBytes;
// now buffer is empty
requireNextBuffer(); // may throw InsufficientDataException (retryable)
return readString(size); // tail call
}
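One detail the pseudo method readPayloadAtMostAndDecodeUtf8 has to handle is a multi-byte UTF-8 character that is split across two buffers. The following is a minimal sketch of one possible approach using the standard java.nio.charset.CharsetDecoder; the class IncrementalUtf8Sketch and its methods are hypothetical and are not part of msgpack-java. Malformed input is not handled here.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: decode UTF-8 chunk by chunk into a StringBuilder.
class IncrementalUtf8Sketch {
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final StringBuilder decoded = new StringBuilder();
    private byte[] leftover = new byte[0]; // bytes of an incomplete multi-byte sequence

    // Decodes one chunk; bytes of a split character are kept until the next chunk.
    void append(byte[] chunk) {
        ByteBuffer in = ByteBuffer.allocate(leftover.length + chunk.length);
        in.put(leftover);
        in.put(chunk);
        in.flip();
        // UTF-8 never produces more chars than input bytes, so this is large enough.
        CharBuffer out = CharBuffer.allocate(in.remaining());
        decoder.decode(in, out, false); // false: more input may follow
        out.flip();
        decoded.append(out);
        // Whatever the decoder did not consume is the start of an incomplete character.
        leftover = new byte[in.remaining()];
        in.get(leftover);
    }

    String result() { return decoded.toString(); }
}
```

In the unpacker, the number of bytes consumed from each buffer would still have to be counted against the remaining string size, as decodingStringByteSize does above.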
If Unpacker supports unpackExtendedType (which reads an ExtendedTypeHeader and its payload), Unpacker needs to store type as well as head and size.
My concept is:
- valid: getHeadByte() -> readInt()
- valid: getHeadByte() -> getNextSize32() -> readLong()
- invalid: readInt() -> getHeadByte() (readInt is not repeatable)
Generally, calling getXxx 0 or more times -> readXxx is valid. In other words, unpackXxx methods call readXxx at the end. So we can create methods like the following readIntAndResetHeadByte():
int readIntAndResetHeadByte() throws IOException {
int v = readInt();
this.head = READ_NEXT_HEAD_BYTE;
return v;
}
Do you have a plan to release this feature?
I'm developing a library that consumes msgpack streams (Fluentd's forward requests), but I have noticed that ChannelBufferInput doesn't work well with non-blocking I/O.
I really want a stream deserializer.