sseclient
sseclient copied to clipboard
Idea: Is it possible to wait for a given/ hardcoded custom line ending "end_of_field" vs chunk_size
Hello, a newbie question.
Idea: Is it possible to wait for a given/ hardcoded custom line ending "end_of_field" vs chunk_size
for example this one
end_of_field = re.compile(r'\n\n\r\n\x27|\r\n\x27') # \n\n\r\n' or \r\n'
My Signify Hue Bridge sends event message data of variable length, which ends with \n\n\r\n'
or with \r\n'
In case of long event message data, the Hue Bridge splits it up into chunks of around 4096 bytes, each followed by \r\n',
and the end of any event message data is always indicated by \n\n\r\n'
Maybe the chunk_size behaviour could be set to None to skip it completely, or chunk_size could be set to a maximum safeguard value like 10000;
or, for now, I set chunk_size to 9 \n\n\r\n'
#end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')
#end_of_field = re.compile(str('\x5C''n''\x5C''n''\x5C''r''\x5C''n''\x27')) # \n\n\r\n'
#end_of_field = re.compile(str('\x5C''r''\x5C''n')) # \r\n
#end_of_field = re.compile(str('$''\x5C''r''\x5C''n'), re.MULTILINE) # <MatchesEndOfString_$>\r\n
end_of_field = re.compile(str('$''\x5C''r''\x5C''n''\x27'), re.MULTILINE) # <MatchesEndOfString_$>\r\n'
https://pynative.com/python-regex-flags/
I just edited sseclient.py, which I found in the AppData folder Python\Python38\site-packages, then built and ran my test code, and it seems to be working. I use a chunk_size of 8192; if I hard-code chunk_size to 1 in sseclient.py as a test, that value is used and works, so I know the change I made is being picked up.
https://docs.python.org/3/library/io.html#io.BufferedReader
https://stackoverflow.com/questions/57726771/what-the-difference-between-read-and-read1-in-python
If size is omitted or < 0, then the size of the available buffer is used, so no read() call is performed on the raw stream in this case. Hmm, I think I should set chunk_size to -1; that seems to work.
b'6\r\n: hi\n\n\r\n'
I assume the Hue Bridge adds b'6\r\n
and adds the single quote character
to the event message data '
but # <MatchesEndOfString_$>\r\n or <MatchesEndOfString_$>\r\n'
seems to make no difference; I prefer the last regex for my particular usage.
For test purposes I stripped out the parsing: from my perspective the Signify Hue Bridge already sends valid event message data, so there is no need to parse it. But I am a newbie, so I may be overlooking something; I have almost no knowledge yet, but I intend to learn.
# An SSE event is terminated by a blank line, i.e. two consecutive line
# endings (CRLF CRLF, CR CR or LF LF).
#
# The previous pattern, $\r\n' compiled with re.MULTILINE, could never
# match: "$" only succeeds at end-of-string or immediately before "\n", so
# it cannot be followed by "\r"; and the trailing "'" was the closing quote
# of a printed bytes repr (b'...'), not a character sent on the wire.
end_of_field = re.compile(r'\r\n\r\n|\r\r|\n\n')
class SSEClient(object):
    """Iterate over the events of a Server-Sent Events (SSE) HTTP stream.

    The connection is opened by the constructor.  Each ``next()`` call
    blocks until the buffered text contains an ``end_of_field`` terminator,
    then returns the text that precedes it wrapped in an ``Event``.  When
    the stream ends or a request error occurs, the client sleeps for
    ``retry`` milliseconds and reconnects.
    """

    def __init__(self, url, last_id=None, retry=3000, session=None, chunk_size=1024, **kwargs):
        """Store the settings and open the stream.

        url        -- address of the event stream.
        last_id    -- if truthy, sent as the ``Last-Event-ID`` header.
        retry      -- delay before reconnecting, in milliseconds.
        session    -- optional ``requests.Session``; the ``requests``
                      module itself is used when omitted.
        chunk_size -- size passed to each low-level read.
        kwargs     -- forwarded unchanged to the ``get`` call.
        """
        self.url = url
        self.last_id = last_id
        self.retry = retry
        self.chunk_size = chunk_size

        # Optional support for passing in a requests.Session()
        self.session = session

        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        if 'headers' not in self.requests_kwargs:
            self.requests_kwargs['headers'] = {}

        # The SSE spec requires making requests with Cache-Control: nocache
        self.requests_kwargs['headers']['Cache-Control'] = 'no-cache'

        # The 'Accept' header is not required, but explicit > implicit
        self.requests_kwargs['headers']['Accept'] = 'text/event-stream'

        # Keep data here as it streams in
        self.buf = ''

        self._connect()

    def _connect(self):
        """Open (or re-open) the stream and reset the chunk iterator/decoder.

        Raises whatever ``raise_for_status()`` raises for an HTTP error
        response.
        """
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id

        # Use session if set.  Otherwise fall back to requests module.
        requester = self.session or requests
        self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
        self.resp_iterator = self.iter_content()

        # Fall back to UTF-8 (the encoding the SSE format mandates) instead
        # of resp.apparent_encoding: guessing the charset requires reading
        # the response body, which cannot complete on an open-ended stream,
        # and the guess may be None, which getincrementaldecoder rejects.
        encoding = self.resp.encoding or 'utf-8'
        self.decoder = codecs.getincrementaldecoder(encoding)(errors='replace')

        # TODO: Ensure we're handling redirects.  Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

    def iter_content(self):
        """Return a generator yielding raw byte chunks from the response.

        The generator stops at the first empty read.
        """
        def generate():
            while True:
                if hasattr(self.resp.raw, '_fp') and \
                        hasattr(self.resp.raw._fp, 'fp') and \
                        hasattr(self.resp.raw._fp.fp, 'read1'):
                    # read1() returns as soon as some data is available
                    # instead of waiting for chunk_size bytes.
                    chunk = self.resp.raw._fp.fp.read1(self.chunk_size)
                else:
                    # _fp is not available, this means that we cannot use short
                    # reads and this will block until the full chunk size is
                    # actually read
                    chunk = self.resp.raw.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

        return generate()

    def _event_complete(self):
        """Return True when the buffer holds at least one event terminator."""
        return re.search(end_of_field, self.buf) is not None

    def __iter__(self):
        return self

    def __next__(self):
        """Block until one complete event is buffered and return it.

        Returns an ``Event`` whose ``data`` is the raw text that preceded
        the terminator; ``str(event)`` yields that text.
        """
        while not self._event_complete():
            try:
                next_chunk = next(self.resp_iterator)
                if not next_chunk:
                    raise EOFError()
                self.buf += self.decoder.decode(next_chunk)

            except (StopIteration, requests.RequestException, EOFError, six.moves.http_client.IncompleteRead) as e:
                print(e)
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message, so
                # if we have half a message we should throw it out.
                head, sep, _ = self.buf.rpartition('\n')
                self.buf = head + sep
                continue

        # Split the complete event (up to the end_of_field) into event_string,
        # and retain anything after the current complete event in self.buf
        # for next time.
        (event_string, self.buf) = re.split(end_of_field, self.buf, maxsplit=1)

        # The text is deliberately left unparsed, but it still has to be
        # wrapped in an Event: a bare str has no ``retry``/``id`` attributes,
        # so the checks below used to raise AttributeError on every event.
        msg = Event(data=event_string)

        # If the server requests a specific retry delay, we need to honor it.
        # (An Event built from raw text leaves retry/id as None, so these two
        # checks only take effect for events that carry those fields.)
        if msg.retry:
            self.retry = msg.retry

        # last_id should only be set if included in the message.  It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    if six.PY2:
        next = __next__
class Event(object):
    """A single event: its text payload plus optional event metadata."""

    # Splits one "name: value" line of an event stream into its two parts.
    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event='message', id=None, retry=None):
        """Store the event fields; ``data`` must be a text string."""
        assert isinstance(data, six.string_types), "Data must be text"
        self.data, self.event = data, event
        self.id, self.retry = id, retry

    def __str__(self):
        """Return the event's data payload."""
        payload = self.data
        return payload