PyAV icon indicating copy to clipboard operation
PyAV copied to clipboard

How do I use Output Options for av containers and save chunks of stream

Open JDRanpariya opened this issue 1 year ago • 0 comments

Overview

I'm getting two RemoteStreamTracks of type audio and video from aiortc, I want to save chunks of 10 seconds from the stream without breaking the stream.

Expected behavior

I would like to pass output options where I have input as audio and video stream, I guess donig .mux of both types of frames will do it. I pass option like -g 150 in ffmpeg and it saves chunks by itself.

Actual behavior

I am saving the container with .close() as I have to start a new container and have difficult time implementing timestamps and audio/video chunk to save properly.

I would love any help with the scenario and on how can I code it.

import asyncio
import av
import time
from aiortc.mediastreams import MediaStreamError, MediaStreamTrack


class MediaRecorderContext:
    def __init__(self):
        self.started = False
        self.audio_stream = None
        self.video_stream = None
        self.task = None
        self.audio_task = None
        self.video_task = None
        self._last_chunk_saved = time.time()

    def set_audio_stream(self, audio_stream):
        self.audio_stream = audio_stream

    def set_video_stream(self, video_stream):
        self.video_stream = video_stream


class MediaRecorder:
    def __init__(self, prefix, format=None, options=None):
        self.prefix = prefix
        self.__container = av.open(file=f'{prefix}.mp4', format=format, mode="w", options=options)
        self.__tracks = {}
        self.__context = MediaRecorderContext()
        self.__duration = 5
        self.__is_saving = False

    def addTrack(self, track):
        if track.kind == "audio":
            codec_name = "aac"
            stream = self.__container.add_stream(codec_name)
            self.__context.set_audio_stream(stream)
            self.__tracks[track] = self.__context
        else:
            stream = self.__container.add_stream("libx264", rate=300)
            stream.pix_fmt = "yuv420p"
            self.__context.set_video_stream(stream)
            self.__tracks[track] = self.__context

    async def start(self):
        self.__context.task = asyncio.ensure_future(self.__run_track())
        self.__context._last_chunk_saved = time.time()

    async def __add_audio(self, track: MediaStreamTrack):
        while True:
            print('time inside audio is ', time.time() - self.__context._last_chunk_saved)
            while time.time() - self.__context._last_chunk_saved < self.__duration:
                try:
                    frame = await track.recv()
                    print(frame)
                except MediaStreamError:
                    return

                for packet in self.__context.audio_stream.encode(frame):
                    self.__container.mux(packet)

            # Save and reset at the end of the duration
            if self.__is_saving:
                print('hell yeah')
                return
            else:
                print('encoding None audio')
                for packet in self.__context.audio_stream.encode(None):
                    self.__container.mux(packet)
                print('waiting to save chunk, inside audio')
                await self.__save_chunk(caller='audio')

    async def __add_video(self, track: MediaStreamTrack):
        while True:
            print('time inside video is ', time.time() - self.__context._last_chunk_saved)
            while time.time() - self.__context._last_chunk_saved < self.__duration:
                try:
                    frame = await track.recv()
                    frame.pts = int(time.time() * 90000)
                    print(frame)
                except MediaStreamError:
                    return

                if not self.__context.started:
                    self.__context.video_stream.width = frame.width
                    self.__context.video_stream.height = frame.height
                    self.__context.started = True

                for packet in self.__context.video_stream.encode(frame):
                    self.__container.mux(packet)

            # Save and reset at the end of the duration
            if self.__is_saving:
                return
            else:
                self.__is_saving = True
                print('encoding None video')
                for packet in self.__context.video_stream.encode(None):
                    self.__container.mux(packet)
                print('waiting to save chunk, inside video')
                await self.__save_chunk(caller='video')

    async def __save_chunk(self, caller):

        print(f'called by {caller}')
        print('saving the chunk')
        self.__container.close()
        self.__container = av.open(file=f'{self.prefix}_{time.time()}.mp4', format='mp4', mode='w')

        # Reset streams for all tracks
        for track, context in self.__tracks.items():
            if track.kind == "audio":
                codec_name = "aac"
                stream = self.__container.add_stream(codec_name)
                self.__context.set_audio_stream(stream)
                self.__tracks[track] = self.__context
            else:
                stream = self.__container.add_stream("libx264", rate=300)
                stream.pix_fmt = "yuv420p"
                self.__context.set_video_stream(stream)
                self.__tracks[track] = self.__context
        print(f'new streams have been created for container as {self.__tracks.items()}')
        self.__is_saving = False
        self.__context._last_chunk_saved = time.time()

    async def __run_track(self):
        for track, _ in self.__tracks.items():
            if track.kind == 'audio':
                self.__context.audio_task = asyncio.ensure_future(self.__add_audio(track))
            else:
                self.__context.video_task = asyncio.ensure_future(self.__add_video(track))

    async def stop(self):
        if self.__container:
            self.__context.audio_task.cancel()
            self.__context.video_task.cancel()
            self.__context.task.cancel()
            self.__context.task = None
            self.__context.audio_task = None
            self.__context.video_task = None
            self.__context.audio_stream = None
            self.__context.video_stream = None
            self.__container.close()
            self.__container = None

I know I'm awaiting save chunks function twice but container needs to be closed only once. It's not working otherwise and saves only video after first chunk. So I'm awaiting in audio as well.

I have done the following:

JDRanpariya avatar Feb 08 '24 06:02 JDRanpariya