zello-channel-api

Turning the raw data into audio files

Open ghost opened this issue 6 years ago • 6 comments

I have written some Python code that connects to a channel, receives the stream, removes the first 9 bytes from each data packet, concatenates the packets into one text file, and saves it.

From there, I am having trouble turning that raw data into an audio file using Python. I have looked at the Opus encoding material but cannot figure out how to use it correctly. Currently the file just plays a quick burst of static when played with PyAudio, and also when b64decode/encode is applied before playing.

ghost avatar Feb 25 '19 14:02 ghost

@JasonF2 Incoming voice data consists of Opus-encoded packets, each with a 9-byte header. Once you strip off the header (after reading the stream (message) ID and packet ID from it), you can decode the rest of the packet. These Opus packets are not wrapped in an Ogg container and are not base64-encoded, so you don't need to base64-decode them.

megamk avatar Mar 07 '19 16:03 megamk

Hi @JasonF2

I'm struggling with the same task, and I have almost solved it using the code below. I hope it is useful to you.

import asyncio
import base64
import json
import time

import aiohttp
import opuslib

# Byte order for the integer fields packed into outgoing binary frames.
ENDIAN = "big"
# Module-level slot for the refresh token from the logon response; starts empty.
refresh_token = None


async def main():
    """Log on to a Zello channel and stream a local Ogg Opus file into it.

    Steps:
      1. Read and parse ``data/speech_orig.opus`` up front, so a missing or
         malformed file aborts before anything is opened on the channel.
      2. ``logon``, then wait until both a refresh token and an
         ``on_channel_status`` event with status "online" have arrived.
      3. ``start_stream`` with an Opus codec header, then wait for the reply
         that carries ``success`` and the ``stream_id``.
      4. Send every Opus audio packet as a binary frame:
         ``0x01 | stream_id (4 bytes) | packet_id (4 bytes) | opus packet``.
      5. ``stop_stream``, then wait for the next text message.

    The credentials and channel name below are placeholders and must be
    replaced before running.
    """
    # The logon loop records the token in the module-level variable.
    global refresh_token

    # audio source
    # ref: https://github.com/zelloptt/zello-channel-api/issues/34
    # ref: http://opus-codec.org/downloads/
    # Sample audio was downloaded from "https://opus-codec.org/examples/" and
    # converted from wav to opus with opustools.
    with open("data/speech_orig.opus", "rb") as f:
        audio_packets = list(_iter_opus_audio_packets(f.read()))

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://zello.io/ws") as ws:
            # login
            await ws.send_str(json.dumps({
                "command": "logon",
                "seq": 1,
                "auth_token": "developer token or production token. ref: https://github.com/zelloptt/zello-channel-api/blob/master/AUTH.md",
                "username": "sign up in zello app.",
                "password": "sign up in zello app.",
                "channel": "hogehoge"
            }))

            authorized = False
            channel_available = None
            async for msg in ws:
                print(msg)
                if msg.type == aiohttp.WSMsgType.TEXT:
                    res = json.loads(msg.data)
                    if "refresh_token" in res:
                        refresh_token = res["refresh_token"]
                        authorized = True
                    elif res.get("command") == "on_channel_status":
                        channel_available = res["status"] == "online"
                    if authorized and channel_available:
                        print("auth and channel status are OK.")
                        break
                else:
                    print("unexpected message type[{}]".format(msg.type))

            if not authorized or not channel_available:
                print("auth error or channel status error.")
                return

            # start the audio stream
            sample_rate = 48000
            num_frames_per_packet = 1
            frame_size = 20
            codec_header = base64.b64encode(
                # sample_rate is packed little-endian, unlike the other fields.
                # https://github.com/zelloptt/zello-channel-api/blob/409378acd06257bcd07e3f89e4fbc885a0cc6663/sdks/js/src/classes/utils.js#L63
                sample_rate.to_bytes(2, "little")
                + num_frames_per_packet.to_bytes(1, ENDIAN)
                + frame_size.to_bytes(1, ENDIAN)).decode()
            await ws.send_str(json.dumps({
                "command": "start_stream",
                "seq": 2,
                "type": "audio",
                "codec": "opus",
                "codec_header": codec_header,
                "packet_duration": frame_size
            }))

            stream_id = None
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    print("unexpected message type[{}]".format(msg.type))
                    continue
                data = json.loads(msg.data)
                # Events such as on_channel_status carry no "success" key;
                # skip them instead of failing with KeyError.
                if "success" not in data:
                    continue
                if not data["success"]:
                    print("start_stream failed: {}".format(data.get("error")))
                    return
                stream_id = data["stream_id"]
                print("stream_id=", stream_id)
                break

            if stream_id is None:
                print("connection closed before the stream was started.")
                return

            # send
            # https://github.com/zelloptt/zello-channel-api/blob/master/API.md#stream-data
            total = 0
            # The original author observed that part of the audio did not
            # reach the Zello app when packet_id started at 0 (cause unconfirmed).
            packet_id = 1088
            for packet in audio_packets:
                header = (
                    (1).to_bytes(1, ENDIAN)
                    + stream_id.to_bytes(4, ENDIAN)
                    + packet_id.to_bytes(4, ENDIAN)
                )
                await ws.send_bytes(header + packet)
                total += len(packet)
                packet_id += 1
                # If real-time pacing turns out to be necessary, use
                # `await asyncio.sleep(frame_size / 1000)`. time.sleep()
                # would block the event loop.

            print("send size=", total, " packet_id=", packet_id)

            # close
            await ws.send_str(json.dumps({
                "command": "stop_stream",
                "stream_id": stream_id
            }))

            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("maybe stopped")
                    break
                else:
                    print("unexpected message type[{}]".format(msg.type))


def _iter_ogg_packets(data):
    """Yield every complete packet of an Ogg byte stream, in order.

    Walks all pages. A segment with lacing value 255 continues into the next
    segment, possibly on the following page, so such packets are reassembled.
    Assumes a single logical bitstream; pages are not filtered by serial number.

    Args:
        data: the whole Ogg file as bytes.

    Yields:
        Each packet as ``bytes``. An unterminated packet at the very end of
        the data is dropped.

    Raises:
        ValueError: if a page does not start with "OggS" or is truncated.
    """
    header_size = 27  # fixed Ogg page header, up to and including page_segments
    offset = 0
    pending = bytearray()  # bytes of a packet that continues across pages
    while offset < len(data):
        if data[offset:offset + 4] != b"OggS":
            raise ValueError("no Ogg page at byte offset {}".format(offset))
        if offset + header_size > len(data):
            raise ValueError(
                "truncated Ogg page header at byte offset {}".format(offset))
        num_segments = data[offset + header_size - 1]
        table_start = offset + header_size
        body_start = table_start + num_segments
        lacing_values = data[table_start:body_start]
        if body_start + sum(lacing_values) > len(data):
            raise ValueError("truncated Ogg page at byte offset {}".format(offset))
        pos = body_start
        for length in lacing_values:
            pending += data[pos:pos + length]
            pos += length
            if length < 255:  # a lacing value below 255 terminates the packet
                yield bytes(pending)
                pending.clear()
        offset = pos


def _iter_opus_audio_packets(data):
    """Yield only the Opus audio packets of a single-stream Ogg Opus file.

    RFC 7845 puts the "OpusHead" identification header in the first packet
    and the "OpusTags" comment header in the second. Neither is audio, so
    both are skipped rather than streamed to the channel.

    Raises:
        ValueError: if the data is not Ogg, or its first packet is not OpusHead.
    """
    packets = _iter_ogg_packets(data)
    if not next(packets, b"").startswith(b"OpusHead"):
        raise ValueError("not an Ogg Opus stream: first packet is not OpusHead")
    next(packets, None)  # OpusTags comment header
    yield from packets


def iterate_ogg_segment(ogg_page_head):
    """Yield the packets that end on the Ogg page at the start of a buffer.

    Ogg page header layout (RFC 3533; Opus mapping in RFC 7845):
    capture pattern "OggS" (4 bytes), version (1), header type (1),
    granule position (8), bitstream serial number (4), page sequence
    number (4), checksum (4), number of page segments (1). After the header
    come the segment table (one lacing value per segment) and the segment
    data. A packet is a run of segments up to and including the first one
    whose lacing value is below 255.

    Args:
        ogg_page_head: bytes that begin with an Ogg page; further pages may follow.

    Yields:
        ``(packet, rest, size)`` for each packet terminated on this page.
        ``rest`` is everything after that packet's bytes, and ``size`` is
        the packet length declared by the segment table.

    Raises:
        ValueError: if the buffer does not start with a complete page header
            ("OggS" capture pattern plus the full segment table). Junk
            previously parsed as an empty page.

    Limitations:
        A packet continued onto the next page (trailing lacing value 255) is
        not yielded; its bytes stay at the front of the last ``rest``, so
        ``rest`` then does NOT start at the next page. A packet continued
        from the previous page is yielded as if it started here.
    """
    header_size = 27  # fixed header, up to and including page_segments
    if len(ogg_page_head) < header_size or ogg_page_head[:4] != b"OggS":
        raise ValueError("buffer does not start with an Ogg page header")
    num_segments = ogg_page_head[header_size - 1]
    table_end = header_size + num_segments
    if len(ogg_page_head) < table_end:
        raise ValueError("truncated Ogg segment table")

    offset = table_end
    packet_size = 0
    for length in ogg_page_head[header_size:table_end]:
        packet_size += length
        # TODO support segments that continue in the next page: a lacing
        # value of 255 means this packet must be joined with the next page.
        if length < 255:
            end = offset + packet_size
            yield ogg_page_head[offset:end], ogg_page_head[end:], packet_size
            offset = end
            packet_size = 0


if __name__ == "__main__":
    # asyncio.run creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(main())

shtamura avatar Jan 07 '20 08:01 shtamura

Can you add code to save the stream to an Ogg file?

vinnitu avatar Mar 19 '20 09:03 vinnitu

I need the code as well; I'm having trouble understanding how their receiving API library works.

ayush-pradhan-bit avatar Jun 08 '22 06:06 ayush-pradhan-bit