CommunityScripts icon indicating copy to clipboard operation
CommunityScripts copied to clipboard

Convert videos to x265 to reduce video size and save space

Open ghost opened this issue 1 year ago • 3 comments

This is a script that re-encodes all videos to x265 (HEVC) in an MKV container to reduce their size. It currently walks a directory to find the files; it could be modified to use the Stash database instead.

import enlighten
import ffmpeg
import logging
import os
import pprint
import re
import shelve
import subprocess
import sys
import traceback
import typer
if sys.platform == 'win32':
    import wexpect as expect
    # patch the Windows console (via ansicon) so characters are rendered correctly
    import ansicon
    ansicon.load()
else:
    import pexpect as expect

from pathlib import Path
from typing import Dict, List, Union

# --- Target encoding settings ---------------------------------------------
VIDEO_CODEC = "hevc"   # compared with the ffprobe 'codec_name' of the video stream
AUDIO_CODEC = "aac"    # compared with the audio 'codec_name' and passed to "-c:a"
CRF = 26
PRESET = "veryfast"

# --- Upper limits; a file exceeding any of them is re-encoded ---------------
MAX_HEIGHT = 720
MAX_WIDTH = 1280
MAX_VIDEO_BITRATE = 2_000_000
MAX_AUDIO_BITRATE = 64_000
MAX_FRAME_RATE = 30

# --- Patterns for ffmpeg console output --------------------------------------
# NOTE(review): neither pattern is referenced in the visible part of this file.
pattern_duration = re.compile('duration[ \t\r]?:[ \t\r]?(.+?),[ \t\r]?start',
                              flags=re.IGNORECASE)
pattern_progress = re.compile('time=(.+?)[ \t\r]?bitrate', flags=re.IGNORECASE)

# --- enlighten format strings --------------------------------------------------
# NOTE(review): not referenced in the visible part of this file either.
BAR_FMT = (
    '{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}.1f}/{total:.1f} '
    '[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
)
COUNTER_FMT = (
    '{desc}{desc_pad}{count:.1f} {unit}{unit_pad}'
    '[{elapsed}, {rate:.2f}{unit_pad}{unit}/s]{fill}'
)

# shelve database remembering which files were already handled
CACHE_FILE = "cache.db"

# Configure logging once, at import time: records at INFO and above are
# written both to the file "debug.log" (a relative path, so it is created in
# the current working directory) and to a console stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function and class in this script.
logger = logging.getLogger(__name__)
# Typer application; the `folder` and `file` commands register themselves on it.
app = typer.Typer()


@app.command()
def folder(
    folder: str = typer.Argument(
        default='.',
        exists=True,
        file_okay=True,
        dir_okay=True,
        readable=True,
        resolve_path=True
    )
) -> None:
    """
    Reduce the size of all videos on the given directory
    """
    # NOTE: the docstring above is shown by typer as the command's help text,
    # so implementation notes live in comments instead.
    #
    # Every file found under `folder` is wrapped in a Video; files that probe
    # as video are converted. Each file handled without an exception is
    # recorded in the shelve cache (CACHE_FILE) so later runs skip it.
    files = get_files_from(folder)
    manager = enlighten.get_manager()
    pbar = manager.counter(total=len(files), desc='Files', unit='files')
    # The context manager guarantees the cache is flushed and closed even if
    # the run is interrupted (e.g. Ctrl-C) half way through.
    with shelve.open(CACHE_FILE) as cache:
        for file in files:
            try:
                if file not in cache:
                    video = Video(file)
                    if video.is_video():
                        video.convert_video()
                    cache[file] = True
            except Exception:
                # Best effort: one bad file must not stop the batch, but keep
                # the traceback so the failure can be diagnosed from the log.
                logger.exception(f"Could not convert: {file}")
            # Advance for cached files too, so the bar reaches its total.
            pbar.update()


@app.command()
def file(
    path: Path = typer.Argument(
        # Required argument: with the previous default of None, running the
        # command without a path crashed inside Video() instead of producing
        # a usage error.
        ...,
        exists=True,
        file_okay=True,
        dir_okay=False,
        readable=True,
        resolve_path=True
    )
) -> None:
    """
    Reduce the size of a single file
    """
    # Video is annotated to take a str path (and writes it to a text file on
    # error), so convert the Path typer hands us.
    video = Video(str(path))
    if video.is_video():
        video.convert_video()

def get_files_from(folder: str) -> List[str]:
    """
    Collect the path of every file below `folder`, descending into
    sub-directories. Paths are built by joining each walked directory with
    the file name; a folder that yields nothing gives an empty list.
    """
    logger.info(f"get_files_from({folder})")
    return [
        os.path.join(dirpath, name)
        for dirpath, _subdirs, names in os.walk(folder)
        for name in names
    ]


# Suffixes that mark 10-bit / 12-bit little-endian pixel format names.
px10bit = re.compile('10le$')
px12bit = re.compile('12le$')


class PixelFormat:
    """Read-only wrapper around a pixel format name (e.g. "yuv420p10le").

    The bit depth is derived from the name alone: a name ending in "10le" is
    10-bit, one ending in "12le" is 12-bit, and every other name is reported
    as 8-bit.
    """

    __slots__ = ('_pix_fmt', '_is_10bit', '_is_12bit')

    def __init__(self, pix_fmt):
        self._pix_fmt = pix_fmt
        # A match object is always truthy, so bool() is True exactly on a match.
        self._is_10bit = bool(px10bit.search(pix_fmt))
        self._is_12bit = bool(px12bit.search(pix_fmt))

    @property
    def pixel_format(self):
        """The pixel format name exactly as given to the constructor."""
        return self._pix_fmt

    @property
    def is_10bit(self):
        """True when the name ends in "10le"."""
        return self._is_10bit

    @property
    def is_12bit(self):
        """True when the name ends in "12le"."""
        return self._is_12bit

    @property
    def is_8bit(self):
        """True when the name is neither 10-bit nor 12-bit."""
        return not self._is_10bit and not self._is_12bit

    def __str__(self):
        return self._pix_fmt


class Video():
    """A media file together with the ffprobe metadata used to re-encode it.

    Attributes:
        path: the source file, as passed to the constructor.
        video_metadata / audio_metadata: first video / audio stream dict
            returned by ffprobe, or None when probing failed.
        dest: output path (a Path ending in ".mkv"), or None when the source
            file does not exist.
        codec: video codec name, filled in while building the ffmpeg command.
        skip: True while nothing about the file requires re-encoding.
    """

    def __init__(self, path: str) -> None:
        """Probe `path`; raises IOError when the ffprobe binary is missing."""
        logger.info("Video.init()")
        self.path = path
        # Defaults first, so that an early return below still leaves a
        # fully-formed object that simply reports "not a video".
        self.video_metadata = None
        self.audio_metadata = None
        self.dest = None
        self.codec = None
        self.skip = True
        if not self.init_error_checking():
            return
        self.video_metadata = self.get_metadata("v")
        self.audio_metadata = self.get_metadata("a")
        self.dest = get_dest_path(Path(self.path))

    def init_error_checking(self) -> bool:
        """Check that ffprobe can be run and that the source file exists.

        Returns:
            True when the file exists, False (after logging) otherwise.

        Raises:
            IOError: the ffprobe executable could not be found.
        """
        logger.info("Video.init_error_checking()")
        try:
            subprocess.check_call(["ffprobe", "-h"],
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
        except FileNotFoundError as err:
            raise IOError('ffprobe not found.') from err

        if not os.path.isfile(self.path):
            logger.error('No such media file ' + str(self.path))
            return False
        return True

    def is_video(self) -> bool:
        """Return True when the first video stream has at least one frame.

        Missing metadata, a missing 'codec_type'/'nb_frames' key or a
        non-numeric frame count all count as "not a video".
        """
        try:
            res = (self.video_metadata['codec_type'] == 'video'
                   and int(self.video_metadata['nb_frames']) > 0)
        except (KeyError, TypeError, ValueError):
            return False
        logger.info(f"Video.is_video() -> {res}")
        return res

    def convert_video(self) -> None:
        """Re-encode the file unless it already satisfies every limit.

        After ffmpeg has run, remove_larger_file() keeps whichever of the
        source and the new file is smaller.
        """
        if self.dest is None or self.video_metadata is None:
            # Probing failed in __init__; there is nothing to build a command from.
            logger.error(f"Cannot convert, no usable metadata: {self.path}")
            return
        if self.dest.exists():
            os.remove(str(self.dest))
        try:
            # The push_* helpers flip this to False as soon as they find a
            # property that requires re-encoding.
            self.skip = True
            cmd = self.prepare_ffmpeg_command()
            if self.skip:
                logger.info(f"Skipped: {self.path}")
                print(f"Skipped: {self.path}")
                return
            logger.info(f"Processing: {self.path}")
            print(f"Processing: {self.path}")
            execute_ffmpeg(cmd, self.path)
            remove_larger_file(self.dest, Path(self.path))
        except ffmpeg.Error:
            print(f"ffmpeg could not process: {self.path}")
            traceback.print_exc()
            write_to_file("errors.txt", str(self.path))
        logger.info(f"Done processing: {self.path}")

    def prepare_ffmpeg_command(self) -> List[str]:
        """Build the complete ffmpeg argument list for this file."""
        # "-map 0" selects every input stream, "-map -v" then drops all video
        # streams and "-map V" adds back the video streams that are not
        # attached pictures / cover art (see ffmpeg stream specifiers).
        cmd = ["ffmpeg", "-progress", "pipe:1", "-i", str(self.path),
               "-map", "0", "-map", "-v", "-map", "V"]
        self.push_encode_video_args_to_command(cmd)
        self.push_change_frame_rate_args_to_command(cmd)
        if self.audio_metadata is None:
            logger.warning("No audio stream found")
        else:
            try:
                self.push_encode_audio_args_to_command(cmd)
            except (KeyError, TypeError, ValueError):
                logger.warning("No audio stream found")
        cmd.extend(["-y", str(self.dest)])  # "-y" overwrites the output without asking
        logger.info(f"Video.prepare_ffmpeg_command() -> {cmd}")
        return cmd

    def push_encode_video_args_to_command(self, cmd: List[str]) -> None:
        """Append encoder, quality, bitrate-cap and scaling arguments to `cmd`.

        Clears `self.skip` when the codec differs from VIDEO_CODEC, the
        bitrate exceeds MAX_VIDEO_BITRATE or the frame needs downscaling.
        """
        self.codec = get_codec(self.video_metadata)
        if self.codec != VIDEO_CODEC:
            self.skip = False

        encoder = choose_encoder(self.codec)
        crf = str(CRF)
        cmd.extend(["-c:v", encoder])
        if encoder == 'libx265':
            if self.get_bitdepth().is_10bit:
                cmd.extend(["-x265-params", f"crf={crf}:profile=main10"])
            else:
                cmd.extend(["-crf", crf])
        else:
            # libaom-av1: "-x265-params" is a private libx265 option, so the
            # quality target must be given through the generic "-crf".
            cmd.extend(["-cpu-used", "8", "-threads", "0", "-crf", crf])

        if ('bit_rate' in self.video_metadata
                and int(self.video_metadata['bit_rate']) > MAX_VIDEO_BITRATE):
            self.skip = False
            cmd.extend(["-b:v", str(MAX_VIDEO_BITRATE)])
        cmd.extend(["-maxrate", bitrate_to_string(MAX_VIDEO_BITRATE), "-preset", PRESET])

        if 'height' in self.video_metadata and 'width' in self.video_metadata:
            height = int(self.video_metadata['height'])
            width = int(self.video_metadata['width'])
            if height < width and height > MAX_HEIGHT:
                # Landscape: cap the height, let ffmpeg pick an even width.
                cmd.extend(["-vf", f"scale=-2:{MAX_HEIGHT}"])
                self.skip = False
            elif height > width and width > MAX_WIDTH:
                # Portrait: cap the width, let ffmpeg pick an even height.
                cmd.extend(["-vf", f"scale={MAX_WIDTH}:-2"])
                self.skip = False

    def push_change_frame_rate_args_to_command(self, cmd: List[str]) -> None:
        """Append "-r MAX_FRAME_RATE" when the source frame rate is higher."""
        if ('r_frame_rate' in self.video_metadata
                and convert_str_to_float(self.video_metadata['r_frame_rate']) > MAX_FRAME_RATE):
            cmd.extend(["-r", str(MAX_FRAME_RATE)])
            self.skip = False

    def push_encode_audio_args_to_command(self, cmd: List[str]) -> None:
        """Append the audio codec and bitrate arguments to `cmd`.

        Clears `self.skip` when the audio codec differs from AUDIO_CODEC or
        the stream's bit rate exceeds MAX_AUDIO_BITRATE.

        Raises:
            KeyError / TypeError / ValueError: unusable audio metadata. All
            metadata is read before `cmd` is touched, so a failure never
            leaves half of the audio arguments behind.
        """
        codec = get_codec(self.audio_metadata)
        # Compare the stream's *bit rate* with the bitrate limit (the previous
        # code compared the sample rate in Hz). A stream that does not report
        # a bit rate is treated as within the limit.
        bit_rate = int(self.audio_metadata.get("bit_rate", 0))
        if codec != AUDIO_CODEC or bit_rate > MAX_AUDIO_BITRATE:
            self.skip = False
        cmd.extend(["-c:a", AUDIO_CODEC])
        cmd.extend(["-b:a", bitrate_to_string(MAX_AUDIO_BITRATE)])

    def get_metadata(self, stream: str) -> Union[Dict, None]:
        """Return ffprobe's dict for the first stream matching `stream`.

        Args:
            stream: ffprobe stream selector, e.g. "v" or "a".

        Returns:
            The stream dict, or None when there is no such stream or probing
            failed (the failure is logged).
        """
        try:
            metadata = ffmpeg.probe(self.path, select_streams=stream)['streams'][0]
        except IndexError:
            logger.error(f"ffprobe failed for Video({self.path}).get_metadata({stream})")
            return None
        except ffmpeg.Error as e:
            # Use %s placeholders: passing the text as a bare extra argument
            # makes the logging module fail while formatting the record.
            logger.error('stdout: %s', e.stdout.decode('utf8', errors='replace'))
            logger.error('stderr: %s', e.stderr.decode('utf8', errors='replace'))
            return None
        except Exception:
            logger.exception(f"Unexpected error probing Video({self.path}).get_metadata({stream})")
            return None
        logger.info(f"Video({self.path}).get_metadata(stream='{stream}') ->\n{pprint.pformat(metadata)}")
        return metadata

    def get_bitdepth(self) -> "PixelFormat":
        """Wrap the video stream's 'pix_fmt' in a PixelFormat."""
        res = PixelFormat(self.video_metadata['pix_fmt'])
        logger.info(f"Video.get_bitdepth() -> {res}")
        return res

    def __repr__(self) -> str:
        # getattr keeps repr() usable on partially initialised instances.
        path = getattr(self, 'path', None)
        dest = getattr(self, 'dest', None)
        return f"<Video: path={path!r}, dest={dest!r}>"


def write_to_file(file_path: str, data: str) -> None:
    """Append `data` as one line to the text file `file_path`.

    The file is opened in append mode so that successive calls accumulate
    entries instead of each call wiping the previous one, and as UTF-8 so
    that non-ASCII file names can always be written.

    Args:
        file_path: file to append to; created when missing.
        data: text to record; a trailing newline is added when absent.
    """
    line = str(data)
    if not line.endswith("\n"):
        line += "\n"
    with open(file_path, 'a', encoding='utf-8') as f:
        f.write(line)


def get_dest_path(path: Path) -> Path:
    """Pick an unused ".mkv" path next to `path`.

    The first candidate is "<stem>.mkv"; while a candidate already exists
    the next one tried is "<stem>(2).mkv", "<stem>(3).mkv", and so on.
    """
    def candidate(attempt: int) -> str:
        # Attempt 1 carries no counter; later attempts embed their number.
        marker = "" if attempt == 1 else f"({attempt})"
        return str(path.parent / (path.stem + marker + ".mkv"))

    attempt = 1
    while os.path.exists(candidate(attempt)):
        attempt += 1
    dest = candidate(attempt)
    logger.info(f"get_dest_path({str(path)}) -> {dest}")
    return Path(dest)


def remove_larger_file(new: Path, old: Path) -> None:
    """Keep the smaller of two files and delete the other.

    Does nothing unless both files exist. When the sizes are equal the new
    file wins. An empty new file is never treated as "smaller": it is the
    result of an encode that produced no data, so it is deleted and the old
    file is kept.

    Args:
        new: the freshly encoded file.
        old: the original file.
    """
    logger.info(f"remove_larger_file({str(new)}, {str(old)})")
    if not (new.exists() and old.exists()):
        return
    new_size = os.stat(str(new)).st_size
    old_size = os.stat(str(old)).st_size
    if new_size == 0:
        # A zero-byte output would otherwise "win" and delete the original.
        logger.warning(f"Discarding empty output file: {str(new)}")
        os.remove(str(new))
    elif new_size <= old_size:
        os.remove(str(old))
    else:
        os.remove(str(new))


def _discard_partial_output(cmd: List[str], path: str) -> None:
    """Delete the output file named by the last element of `cmd`, if any."""
    if not cmd:
        return
    output = str(cmd[-1])
    # Never touch the source file, whatever the command looks like.
    if os.path.abspath(output) == os.path.abspath(str(path)):
        return
    if os.path.isfile(output):
        try:
            os.remove(output)
            logger.info(f"Removed incomplete output: {output}")
        except OSError:
            logger.exception(f"Could not remove incomplete output: {output}")


def execute_ffmpeg(cmd: List[str], path: str) -> bool:
    """Run the ffmpeg command `cmd` for the source file `path`.

    Failures are logged, never raised. When the command exits with a
    non-zero status, the file named by the last element of `cmd` (where
    ffmpeg expects its output) is deleted, so a truncated encode cannot be
    mistaken for a finished one and replace the original.

    Args:
        cmd: full argument list, output path last.
        path: the source file; used for log messages and never deleted.

    Returns:
        True when the command exited with status 0, False otherwise.
    """
    logger.info("execute_ffmpeg()")
    try:
        subprocess.check_output(cmd)
    except subprocess.CalledProcessError as err:
        logger.error(f"Could not process: {path}")
        logger.error("Command exited with status %s", err.returncode)
        _discard_partial_output(cmd, path)
        return False
    except Exception:
        # e.g. the executable is missing; nothing was written, so there is
        # nothing to clean up.
        logger.exception(f"Could not process: {path}")
        return False
    return True


def get_codec(metadata: Dict) -> str:
    """Return (and log) the 'codec_name' entry of a stream metadata dict."""
    name = metadata['codec_name']
    logger.info(f"get_codec() -> {name}")
    return name


def bitrate_to_string(bitrate: int) -> str:
    """Format `bitrate` as whole thousands with a "k" suffix (2000000 -> "2000k")."""
    thousands = bitrate // 1000
    return f"{thousands}k"


def choose_encoder(codec: str) -> str:
    """Return the ffmpeg encoder name for a source codec.

    'av1' sources map to 'libaom-av1'; every other codec maps to 'libx265'.
    """
    encoder = 'libaom-av1' if codec == 'av1' else 'libx265'
    logger.info(f"choose_encoder -> {encoder}")
    return encoder


def convert_str_to_float(s: str) -> float:
    """Convert a rational ("30000/1001") or decimal ("29.97") string to float.

    A rational with a zero denominator (e.g. "0/0", an undefined rate)
    yields 0.0 instead of raising ZeroDivisionError.

    Raises:
        ValueError: `s` is not a number or a single "num/denom" pair.
    """
    if '/' in s:
        num, denom = s.split('/')
        divisor = float(denom)
        if divisor == 0:
            return 0.0
        return float(num) / divisor
    return float(s)


# Run the typer application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app()

ghost avatar May 19 '23 10:05 ghost