zello-channel-api
zello-channel-api copied to clipboard
Turning the raw data into audio files
I have written some Python code that connects to a channel, receives the stream, strips the first 9 bytes from each data packet, concatenates the payloads into one file, and saves it.
From there I am having trouble turning that raw data into an audio file using Python. I have seen the Opus encoding material but cannot figure out how to use it correctly. Currently the file just plays a quick burst of static when played with pyaudio, and the same happens when b64decode/b64encode is applied before playing.
@JasonF2 Incoming voice data consists of Opus-encoded packets, each with a 9-byte header. Once you have read the message ID and packet ID from the header and stripped it off, you can decode the rest of the packet. These Opus packets are not wrapped in an Ogg container and are not base64-encoded, so you don't need to base64-decode them.
Hi @JasonF2
I'm struggling with the same task, and I have almost solved it using the following code. I hope it is useful for you.
import asyncio
import base64
import json
import time
import aiohttp
import opuslib
# Module-level refresh token; starts as None (no token obtained yet).
refresh_token = None
# Byte order passed to int.to_bytes() when main() builds binary fields
# (codec header fields and the per-packet stream header).
ENDIAN = "big"
async def main():
    """Log in to a Zello channel and stream an Ogg Opus file to it.

    The coroutine performs four steps over one websocket connection:

    1. Send ``logon`` and wait until a refresh token has been received and
       an ``on_channel_status`` message reports the channel as online.
    2. Send ``start_stream`` with a base64 codec header and wait for the
       response carrying the ``stream_id``.
    3. Read ``data/speech_orig.opus``, split it into Opus packets with
       ``iterate_ogg_segment`` and send every audio packet as a binary
       message prefixed with a 9-byte header (packet type 1, stream id,
       packet id).
    4. Send ``stop_stream`` and wait for the next text message.

    Progress and failures are reported with ``print``; nothing is returned.
    The received refresh token is stored in the module-level
    ``refresh_token`` variable.
    """
    # Without this declaration the assignment below would only create an
    # unused local and the module-level variable would stay None.
    global refresh_token

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://zello.io/ws") as ws:
            # login
            await ws.send_str(json.dumps({
                "command": "logon",
                "seq": 1,
                "auth_token": "developer token or production token. ref: https://github.com/zelloptt/zello-channel-api/blob/master/AUTH.md",
                "username": "sign up in zello app.",
                "password": "sign up in zello app.",
                "channel": "hogehoge"
            }))
            authorized = False
            channel_available = None
            async for msg in ws:
                print(msg)
                if msg.type == aiohttp.WSMsgType.TEXT:
                    res = json.loads(msg.data)
                    if "refresh_token" in res:
                        refresh_token = res["refresh_token"]
                        authorized = True
                    elif res.get("command") == "on_channel_status":
                        channel_available = res.get("status") == "online"
                    elif "error" in res:
                        # Stop waiting: after an error reply the success
                        # messages this loop is looking for may never come.
                        print("server reported error[{}]".format(res["error"]))
                        break
                    if authorized and channel_available:
                        print("auth and channel status are OK.")
                        break
                else:
                    print("unexpected message type[{}]".format(msg.type))
            if not authorized or not channel_available:
                print("auth error or channel status error.")
                return

            # audio streaming
            # ref: https://github.com/zelloptt/zello-channel-api/issues/34
            # ref: http://opus-codec.org/downloads/
            # The sample audio was downloaded from
            # "https://opus-codec.org/examples/" and converted from wav to
            # opus with opustools.
            sample_rate = 48000
            num_frames_per_packet = 1
            frame_size = 20
            codec_header = base64.b64encode(
                # The sample rate is encoded little-endian, unlike the
                # other fields.
                # https://github.com/zelloptt/zello-channel-api/blob/409378acd06257bcd07e3f89e4fbc885a0cc6663/sdks/js/src/classes/utils.js#L63
                sample_rate.to_bytes(2, "little")
                + num_frames_per_packet.to_bytes(1, ENDIAN)
                + frame_size.to_bytes(1, ENDIAN)).decode()
            await ws.send_str(json.dumps({
                "command": "start_stream",
                "seq": 2,
                "type": "audio",
                "codec": "opus",
                "codec_header": codec_header,
                "packet_duration": frame_size
            }))
            stream_id = None
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    # Other text messages may arrive first, so do not assume
                    # every message carries "success" / "stream_id".
                    if data.get("success") and "stream_id" in data:
                        stream_id = data["stream_id"]
                        print("stream_id=", stream_id)
                        break
                    if "error" in data:
                        print("server reported error[{}]".format(data["error"]))
                        break
                else:
                    print("unexpected message type[{}]".format(msg.type))
            if stream_id is None:
                # The socket closed or the server refused the stream.
                print("failed to start stream.")
                return

            # send
            # https://github.com/zelloptt/zello-channel-api/blob/master/API.md#stream-data
            with open("data/speech_orig.opus", "rb") as f:
                remaining = f.read()
            total = 0
            # NOTE: the original author observed that parts of the audio do
            # not arrive in the Zello app when packet_id starts at 0, so a
            # non-zero starting value is used.
            packet_id = 1088
            while remaining:
                progressed = False
                for packet, remaining, size in iterate_ogg_segment(remaining):
                    progressed = True
                    # The first two packets of an Ogg Opus stream are the
                    # identification and comment headers (RFC 7845); they
                    # are container metadata, not audio.
                    if packet.startswith((b"OpusHead", b"OpusTags")):
                        continue
                    header = (
                        (1).to_bytes(1, ENDIAN)
                        + stream_id.to_bytes(4, ENDIAN)
                        + packet_id.to_bytes(4, ENDIAN)
                    )
                    await ws.send_bytes(header + packet)
                    total += size
                    packet_id += 1
                    # NOTE: packets are sent back-to-back. To pace them in
                    # real time use "await asyncio.sleep(frame_size / 1000)"
                    # here; time.sleep() would block the event loop.
                if not progressed:
                    # Nothing could be parsed from the remaining bytes;
                    # without this guard the while loop would never end.
                    print("unparsable data left in file, stop sending.")
                    break
            print("send size=", total, " packet_id=", packet_id)

            # close
            await ws.send_str(json.dumps({
                "command": "stop_stream",
                "stream_id": stream_id
            }))
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("maybe stopped")
                    break
                else:
                    print("unexpected message type[{}]".format(msg.type))
def iterate_ogg_segment(ogg_page_head):
    """Yield the packets stored in the Ogg page at the start of a buffer.

    Page layout (RFC 3533): a 27-byte header -- capture pattern "OggS",
    version (1), header type (1), granule position (8), bitstream serial
    number (4), page sequence number (4), checksum (4), segment count (1)
    -- followed by the segment table (one lacing value per segment) and the
    segment data.  A lacing value below 255 ends a packet; a value of 255
    means the packet continues in the next segment.

    Normally only the first page is consumed.  Following pages are read as
    well while a packet is still unfinished at the end of a page (it
    continues on the next page) or while no packet has been produced yet,
    so that the last yielded remainder always starts on a page boundary.

    Args:
        ogg_page_head: bytes-like buffer positioned at the start of an Ogg
            page; it may contain any number of further pages.

    Yields:
        ``(packet, rest, size)`` tuples: the packet bytes, the bytes of the
        buffer that follow the packet, and the packet length.

    Raises:
        ValueError: if the buffer does not start with a complete Ogg page,
            or the data ends in the middle of a continued packet.
    """
    header_size = 27
    data = ogg_page_head
    parts = []        # pieces of the packet currently being assembled
    produced = False  # whether at least one packet has been yielded

    while data and (parts or not produced):
        if data[:4] != b"OggS":
            raise ValueError("Ogg page must start with the 'OggS' capture pattern")
        if len(data) < header_size:
            raise ValueError("truncated Ogg page header")
        segment_count = data[header_size - 1]
        body_start = header_size + segment_count
        lacing_values = data[header_size:body_start]
        if len(lacing_values) < segment_count:
            raise ValueError("truncated Ogg page: incomplete segment table")
        if len(data) - body_start < sum(lacing_values):
            raise ValueError("truncated Ogg page: incomplete segment data")

        offset = body_start
        for length in lacing_values:
            parts.append(data[offset:offset + length])
            offset += length
            if length < 255:
                packet = b"".join(parts)
                parts = []
                produced = True
                yield packet, data[offset:], len(packet)
        # Step to the next page; the loop condition decides whether it is
        # needed to finish a continued packet.
        data = data[offset:]

    if parts:
        raise ValueError("Ogg data ends inside a continued packet")
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop; asyncio.get_event_loop() without a running loop
    # is deprecated.
    asyncio.run(main())
can you add code to save stream into ogg?
I need the code as well, I have some trouble understanding how their receiving API Library works