
Read multiple JSON objects in a stream

Open ygoe opened this issue 2 years ago • 9 comments

It's not mentioned in the documentation, so I'm wondering if this is already possible. I need a streaming JSON parser that can give me one JSON object after the other. There's no need for streaming within each object, but I'm going to receive multiple JSON objects over a byte stream (TCP or similar). The problem is that these are byte streams, not message streams. I can parse a JSON object by whatever means, but I first need to separate the objects from the stream. And one object (or more) might have been read completely while the next has only been read partially. Can this library already do this?

The intended purpose is JSON-RPC over TCP. I couldn't find any Python library (with a free license) for that. Actually, JSON-RPC is very simple; it's the message splitting that's hard. It can be done with separate protocol overhead (like WebSockets does) or by reading complete JSON objects. (I could do that myself, too, but wanted to see if there's a ready-made solution.)

ygoe avatar Dec 05 '22 14:12 ygoe
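For context, the "separate protocol overhead" approach mentioned above usually means length-prefix framing: every message is preceded by its byte length, so the receiver never has to parse JSON to find message boundaries. A minimal sketch of that alternative (plain stdlib, nothing json-stream specific; the function names are illustrative):

import json
import socket
import struct


def send_message(sock, obj):
    # encode the object, then prefix it with a 4-byte big-endian length
    payload = json.dumps(obj).encode()
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_exactly(sock, n):
    # keep reading until exactly n bytes have arrived
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("stream closed mid-message")
        buf += chunk
    return buf


def recv_message(sock):
    # read the length prefix, then exactly that many payload bytes
    (length,) = struct.unpack(">I", recv_exactly(sock, 4))
    return json.loads(recv_exactly(sock, length))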

I think this is possible:

from time import sleep

from json_stream.tokenizer import tokenize

import json_stream


def input_stream():
    # simulate JSON-RPC messages
    yield '{"bob": 1, "bobby": 4}'.encode()
    sleep(5)
    yield '{"bobo": 3}'.encode()


def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass


f = input_stream()
for document in json_documents(f):
    # once for each new JSON-RPC message
    print("got new message")
    for k, v in document.items():
        # process message
        print(f"{k} = {v}")
    print("end of message")

Note: due to an issue with the rust tokenizer, this code uses the non-default pure-python tokenizer implementation.

daggaz avatar Dec 05 '22 16:12 daggaz
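For the original JSON-RPC-over-TCP use case, the same pattern should carry over by wrapping the socket in a binary file object, which json_stream.load can read from incrementally. A sketch under that assumption (untested; the host and port are placeholders, and the EOF handling mirrors the snippet above):

import socket

import json_stream
from json_stream.tokenizer import tokenize


def json_documents_from_socket(sock):
    # wrap the socket as a binary file-like object so json_stream.load
    # can pull bytes from it on demand
    f = sock.makefile("rb")
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        # the peer closed the connection
        pass


sock = socket.create_connection(("localhost", 4000))
for document in json_documents_from_socket(sock):
    for k, v in document.items():
        print(f"{k} = {v}")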

There is one tricky moment: the JSON documents may be fragmented in arbitrary ways across chunks.

Example:

"{'alpha': 'bra"
"vo'}{'chadlie': 'delta'}"

Does the suggested method work in this case?

jorektheglitch avatar Mar 20 '23 17:03 jorektheglitch

@jorektheglitch I'm not 100% clear on what you're saying; that just looks like malformed JSON?

daggaz avatar Mar 23 '23 18:03 daggaz

@daggaz, I meant a situation in which the JSON is read chunk by chunk. There is absolutely no guarantee that each JSON document will be read from start to end and nothing else. In the example I just show two possible chunks read from the stream.

jorektheglitch avatar Mar 28 '23 18:03 jorektheglitch
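For what it's worth, the generator approach above should be insensitive to where the chunk boundaries fall, because the tokenizer pulls bytes on demand and carries its state across chunks. A quick way to check, reusing json_documents from the earlier snippet (the same fragmentation scenario, but with valid double-quoted JSON):

def fragmented_stream():
    # one document split mid-string, the next packed into the same chunk
    yield b'{"alpha": "bra'
    yield b'vo"}{"charlie": "delta"}'


for document in json_documents(fragmented_stream()):
    for k, v in document.items():
        print(f"{k} = {v}")

# expected output:
# alpha = bravo
# charlie = delta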

I've since lost interest in this, but I did the same thing in C# as the application moved to that language.

  • What happens if you receive an incomplete chunk? Nothing, keep it for later.
  • What happens if you receive a continuation of that? Consider the previous chunks and this one.
  • What happens if a single chunk contains multiple objects? Return them all. And keep the leftovers for when more arrives.

I implemented this with a preparser. It cannot deserialise JSON objects into anything; it just tracks the syntax and tells me if and where an object is complete, so I can extract that part of the data and pass it to a real deserialiser. It then continues with whatever comes afterwards.

ygoe avatar Apr 02 '23 20:04 ygoe
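A preparser like the one described above can be sketched in a few lines of Python: scan the bytes, track string/escape state and brace/bracket depth, and report where each top-level value ends, without ever materialising the values. This is a hypothetical illustration (not the C# code from the comment), and it assumes well-formed input whose top-level values are objects or arrays, which holds for JSON-RPC:

class JsonSplitter:
    # minimal preparser: finds where each top-level JSON object or
    # array ends so the complete bytes can be handed to a real
    # deserialiser; any partial tail is kept for the next feed()

    def __init__(self):
        self.buf = bytearray()
        self.pos = 0            # next byte to examine
        self.depth = 0          # {} / [] nesting depth
        self.in_string = False  # inside a string literal?
        self.escaped = False    # previous byte was a backslash?

    def feed(self, chunk):
        self.buf += chunk
        out = []
        while self.pos < len(self.buf):
            b = chr(self.buf[self.pos])
            if self.in_string:
                if self.escaped:
                    self.escaped = False
                elif b == "\\":
                    self.escaped = True
                elif b == '"':
                    self.in_string = False
            elif b == '"':
                self.in_string = True
            elif b in "{[":
                self.depth += 1
            elif b in "}]":
                self.depth -= 1
                if self.depth == 0:
                    # complete message: emit it, keep the leftovers
                    out.append(bytes(self.buf[:self.pos + 1]).strip())
                    del self.buf[:self.pos + 1]
                    self.pos = -1
            self.pos += 1
        return out


splitter = JsonSplitter()
for chunk in (b'{"alpha": "bra', b'vo"}{"charlie": "delta"}'):
    for message in splitter.feed(chunk):
        print(message)  # b'{"alpha": "bravo"}' then b'{"charlie": "delta"}'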

I think this is possible:

This specific example is working for me, but I'm having a difficult time making it work with an httpx stream. I feel like I'm missing something obvious, but any pointers are appreciated. I swapped to the json_stream.httpx.load function.

import json_stream.httpx
from json_stream.tokenizer import tokenize


def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.httpx.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

This gets me the first object returned, but the next one throws httpx.StreamConsumed.

chrishas35 avatar Sep 28 '23 18:09 chrishas35

Classic situation of needing to ask the question to be able to figure out the answer (I think?)

import json_stream
from json_stream.tokenizer import tokenize


def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp.iter_bytes()):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

chrishas35 avatar Sep 28 '23 18:09 chrishas35
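For anyone hitting the same error: httpx only lets a streamed response body be consumed once, so calling json_stream.httpx.load on the same response for every document presumably tries to start iterating the body again, which httpx forbids with StreamConsumed. Calling resp.iter_bytes() a single time and handing that one byte iterator to json_stream.load lets each successive document pick up exactly where the previous one stopped.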

@chrishas35 great that it works for you, but I think it's only by accident!

We will be merging big changes to the backend tokenizer API to allow this to work all the time.

daggaz avatar Sep 28 '23 18:09 daggaz

I may not be the best programmer, but I get lucky some times! 🤣

chrishas35 avatar Sep 28 '23 18:09 chrishas35