Convert videos to x265 to reduce video size and save space
This is a script that converts all videos to mkv to reduce the size. It could be modified to use the stash database instead.
import enlighten
import ffmpeg
import logging
import os
import pprint
import re
import shelve
import subprocess
import sys
import traceback
import typer
if sys.platform == 'win32':
import wexpect as expect
# patch windows console for scale correctly characters
import ansicon
ansicon.load()
else:
import pexpect as expect
from pathlib import Path
from typing import Dict, List, Union
# --- Encoding targets -------------------------------------------------------
# Codec names compared against ffprobe's ``codec_name`` to decide whether a
# stream needs re-encoding.
VIDEO_CODEC = "hevc"
AUDIO_CODEC = "aac"
# Passed to the encoder as its CRF value.
CRF = 26
# Passed to ffmpeg as ``-preset``.
PRESET = "veryfast"
# Upper bounds; a source exceeding one of them is flagged for conversion.
MAX_HEIGHT = 720
MAX_WIDTH = 1280
MAX_VIDEO_BITRATE = 2000000
MAX_AUDIO_BITRATE = 64000
MAX_FRAME_RATE = 30
# Regexes for a "duration: ..., start" span and a "time=... bitrate" span.
# NOTE(review): neither pattern is referenced by the code visible in this
# file; presumably meant for parsing ffmpeg console output - confirm.
pattern_duration = re.compile(
    'duration[ \t\r]?:[ \t\r]?(.+?),[ \t\r]?start', re.IGNORECASE)
pattern_progress = re.compile('time=(.+?)[ \t\r]?bitrate', re.IGNORECASE)
# Progress-bar format strings.
# NOTE(review): not referenced by the code visible in this file; the field
# names look like enlighten counter fields - confirm before relying on them.
BAR_FMT = u'{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}.1f}/{total:.1f} ' + \
    u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
COUNTER_FMT = u'{desc}{desc_pad}{count:.1f} {unit}{unit_pad}' + \
    u'[{elapsed}, {rate:.2f}{unit_pad}{unit}/s]{fill}'
# shelve database recording which files the ``folder`` command already handled.
CACHE_FILE = "cache.db"
# Log INFO and above both to ``debug.log`` and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Typer application; the ``folder`` and ``file`` commands register on it.
app = typer.Typer()
@app.command()
def folder(
    folder: str = typer.Argument(
        default='.',
        exists=True,
        file_okay=True,
        dir_okay=True,
        readable=True,
        resolve_path=True
    )) -> None:
    """
    Reduce the size of all videos on the given directory

    Every file found below ``folder`` is wrapped in a ``Video``; files that
    ``Video.is_video()`` accepts are converted. Each file that was handled
    without raising is recorded in the ``CACHE_FILE`` shelve so later runs
    skip it. A failure on one file is logged with its traceback and does not
    stop the remaining files.
    """
    files = get_files_from(folder)
    manager = enlighten.get_manager()
    pbar = manager.counter(total=len(files), desc='Files', unit='files')
    # The context manager closes (and flushes) the cache even when the run is
    # interrupted, so files that were already converted stay recorded.
    with shelve.open(CACHE_FILE) as cache:
        for file in files:
            try:
                if file not in cache:
                    video = Video(file)
                    if video.is_video():
                        video.convert_video()
                    cache[file] = True
            except Exception:
                # Per-file boundary: keep going, but log the traceback so the
                # actual cause (missing ffmpeg, wrong package, ...) is visible.
                logger.exception(f"Could not convert: {file}")
            finally:
                # Advance for cached and failed files too, so the bar can
                # reach its total.
                pbar.update()
@app.command()
def file(
    path: Path = typer.Argument(
        default=None,
        exists=True,
        file_okay=True,
        dir_okay=False,
        readable=True,
        resolve_path=True
    )) -> None:
    """
    Reduce the size of a single file

    Raises:
        typer.BadParameter: if no path was given on the command line.
    """
    # default=None makes the argument optional, so an omitted path arrives
    # here as None; report it as a usage error instead of letting Video()
    # fail with a TypeError.
    if path is None:
        raise typer.BadParameter("A path to a video file is required.")
    # Video is annotated to take the path as a string.
    video = Video(str(path))
    if video.is_video():
        video.convert_video()
def get_files_from(folder: str) -> List[str]:
    """
    Collect the path of every file found below ``folder``, recursively.

    Paths are produced in ``os.walk`` order, each joined with the directory
    it was found in.
    """
    logger.info(f"get_files_from({folder})")
    return [
        os.path.join(root, name)
        for root, _, names in os.walk(folder)
        for name in names
    ]
# Pixel-format names ending in "10le" / "12le" are treated as 10-bit / 12-bit.
px10bit = re.compile('10le$')
px12bit = re.compile('12le$')


class PixelFormat:
    """A pixel-format name plus bit-depth flags derived from its suffix."""

    __slots__ = ('_pix_fmt', '_is_10bit', '_is_12bit')

    def __init__(self, pix_fmt):
        self._pix_fmt = pix_fmt
        # Flags are computed once, at construction time.
        self._is_10bit = bool(px10bit.search(pix_fmt))
        self._is_12bit = bool(px12bit.search(pix_fmt))

    @property
    def pixel_format(self):
        """The pixel-format name this object was built from."""
        return self._pix_fmt

    @property
    def is_10bit(self):
        """True when the name ends in ``10le``."""
        return self._is_10bit

    @property
    def is_12bit(self):
        """True when the name ends in ``12le``."""
        return self._is_12bit

    @property
    def is_8bit(self):
        """True when the name matches neither the 10-bit nor the 12-bit suffix."""
        return not self._is_10bit and not self._is_12bit

    def __str__(self):
        return self._pix_fmt
class Video():
    """One media file: its ffprobe metadata and the ffmpeg command to shrink it.

    Attributes:
        path: Source file path as given by the caller.
        video_metadata: First video stream dict returned by ffprobe, or None.
        audio_metadata: First audio stream dict returned by ffprobe, or None.
        dest: Output ``.mkv`` path chosen by ``get_dest_path``, or None when
            the source file does not exist.
        skip: True while no argument builder has found a reason to re-encode.
    """

    def __init__(self, path: str) -> None:
        logger.info("Video.init()")
        self.path = path
        # Defaults so every attribute exists even when the early return below
        # is taken (the file is missing).
        self.video_metadata = None
        self.audio_metadata = None
        self.dest = None
        self.skip = True
        if not self.init_error_checking():
            return
        self.video_metadata = self.get_metadata("v")
        self.audio_metadata = self.get_metadata("a")
        self.dest = get_dest_path(Path(self.path))

    def init_error_checking(self) -> bool:
        """Check that ffprobe can be started and that ``self.path`` is a file.

        Returns:
            False (after logging) when the media file does not exist,
            True otherwise.

        Raises:
            IOError: if the ``ffprobe`` executable cannot be found.
        """
        logger.info("Video.init_error_checking()")
        try:
            subprocess.check_call(["ffprobe", "-h"],
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
        except FileNotFoundError:
            raise IOError('ffprobe not found.')
        if not os.path.isfile(self.path):
            logger.error('No such media file ' + str(self.path))
            return False
        return True

    def is_video(self) -> bool:
        """Return True when the probed stream is a video with at least one frame.

        Missing metadata, or metadata without ``codec_type`` / ``nb_frames``,
        yields False.
        """
        metadata = self.video_metadata
        if not metadata:
            return False
        try:
            res = (metadata['codec_type'] == 'video'
                   and int(metadata['nb_frames']) > 0)
        except (KeyError, TypeError, ValueError):
            return False
        logger.info(f"Video.is_video() -> {res}")
        return res

    def convert_video(self) -> None:
        """Re-encode the file to ``self.dest`` unless nothing needs changing.

        After ffmpeg ran, ``remove_larger_file`` keeps whichever of the source
        and the output is smaller.
        """
        if self.dest is None:
            # __init__ bailed out early (source file missing): nothing to do.
            logger.error(f"No destination available, cannot convert: {self.path}")
            return
        if self.dest.exists():
            os.remove(str(self.dest))
        try:
            # The argument builders flip this to False when they find a
            # reason to re-encode.
            self.skip = True
            cmd = self.prepare_ffmpeg_command()
            if self.skip:
                logger.info(f"Skipped: {self.path}")
                print(f"Skipped: {self.path}")
                return
            logger.info(f"Processing: {self.path}")
            print(f"Processing: {self.path}")
            execute_ffmpeg(cmd, self.path)
            remove_larger_file(self.dest, Path(self.path))
        except ffmpeg.Error:
            print(f"ffmpeg could not process: {self.path}")
            traceback.print_exc()
            write_to_file("errors.txt", str(self.path))
        logger.info(f"Done processing: {self.path}")

    def prepare_ffmpeg_command(self) -> List[str]:
        """Build the full ffmpeg argument list for this file.

        Side effect: ``self.skip`` becomes False if any builder finds the
        source exceeding a limit or using a different codec.
        """
        # "-map 0 -map -v -map V": take every stream, drop all video streams,
        # then add back the video streams selected by the "V" specifier.
        cmd = ["ffmpeg", "-progress", "pipe:1", "-i", str(self.path),
               "-map", "0", "-map", "-v", "-map", "V"]
        self.push_encode_video_args_to_command(cmd)
        self.push_change_frame_rate_args_to_command(cmd)
        try:
            self.push_encode_audio_args_to_command(cmd)
        except Exception:
            # Best effort: a missing or incomplete audio stream must not
            # prevent the video conversion.
            logger.warning("No audio stream found")
        cmd.extend(["-y", str(self.dest)])  # 'y' arg overwrites output files without asking.
        logger.info(f"Video.prepare_ffmpeg_command() -> {cmd}")
        return cmd

    def push_encode_video_args_to_command(self, cmd: List[str]) -> None:
        """Append video encoder, rate-control and scaling arguments to ``cmd``."""
        self.codec = get_codec(self.video_metadata)
        if self.codec != VIDEO_CODEC:
            self.skip = False
        # The encoder is always named explicitly so that ffmpeg never falls
        # back to the container's default video encoder.
        cmd.append("-c:v")
        encoder = choose_encoder(self.codec)
        crf = str(CRF)
        if encoder == 'libx265':
            if self.get_bitdepth().is_10bit:
                cmd.extend(["libx265", "-x265-params", f"crf={crf}:profile=main10"])
            else:
                cmd.extend(["libx265", "-crf", crf])
        else:
            # "-x265-params" is a private libx265 option; passing it to
            # libaom-av1 left the CRF unapplied, so "-crf" is used for every
            # bit depth here.
            cmd.extend(["libaom-av1", "-cpu-used", "8", "-threads", "0", "-crf", crf])
        if ('bit_rate' in self.video_metadata
                and int(self.video_metadata['bit_rate']) > MAX_VIDEO_BITRATE):
            self.skip = False
            cmd.extend(["-b:v", str(MAX_VIDEO_BITRATE)])
        cmd.extend(["-maxrate", bitrate_to_string(MAX_VIDEO_BITRATE), "-preset", PRESET])
        if ('height' in self.video_metadata
                and 'width' in self.video_metadata):
            height = int(self.video_metadata['height'])
            width = int(self.video_metadata['width'])
            if height < width and height > MAX_HEIGHT:
                # Landscape: cap the height; -2 keeps the aspect ratio with
                # an even width.
                cmd.extend(["-vf", f"scale=-2:{MAX_HEIGHT}"])
                self.skip = False
            elif height > width and width > MAX_WIDTH:
                # Portrait: cap the width instead.
                cmd.extend(["-vf", f"scale={MAX_WIDTH}:-2"])
                self.skip = False

    def push_change_frame_rate_args_to_command(self, cmd: List[str]) -> None:
        """Append ``-r MAX_FRAME_RATE`` when the source frame rate exceeds it."""
        if ('r_frame_rate' in self.video_metadata
                and convert_str_to_float(self.video_metadata['r_frame_rate']) > MAX_FRAME_RATE):
            cmd.extend(["-r", str(MAX_FRAME_RATE)])
            self.skip = False

    def push_encode_audio_args_to_command(self, cmd: List[str]) -> None:
        """Append audio codec and bitrate arguments to ``cmd``.

        Raises:
            TypeError, KeyError: when no usable audio metadata is available
                (handled by ``prepare_ffmpeg_command``).
        """
        if get_codec(self.audio_metadata) != AUDIO_CODEC:
            self.skip = False
        # Always name the audio encoder so a bitrate-only change does not
        # fall back to the container's default audio encoder.
        cmd.extend(["-c:a", AUDIO_CODEC])
        # The cap is a bitrate, so compare against the stream's bit_rate; the
        # previous check compared the sample rate (Hz) against it. Streams
        # without a reported bit_rate are left alone.
        audio_bitrate = self.audio_metadata.get("bit_rate")
        if audio_bitrate is not None and int(audio_bitrate) > MAX_AUDIO_BITRATE:
            self.skip = False
            cmd.extend(["-b:a", bitrate_to_string(MAX_AUDIO_BITRATE)])

    def get_metadata(self, stream: str) -> Union[Dict, None]:
        """Probe the file and return the first stream matching ``stream``.

        Args:
            stream: ffprobe stream selector, e.g. ``"v"`` or ``"a"``.

        Returns:
            The stream's metadata dict, or None when there is no such stream
            or probing failed (the failure is logged).
        """
        try:
            metadata = ffmpeg.probe(str(self.path), select_streams=stream)['streams'][0]
        except IndexError:
            logger.error(f"ffprobe failed for Video({self.path}).get_metadata({stream})")
            return None
        except ffmpeg.Error as e:
            # Use %s placeholders: extra logging arguments without one make
            # the logging call itself fail. stdout/stderr may be None.
            logger.error('stdout: %s', (e.stdout or b'').decode('utf8', errors='replace'))
            logger.error('stderr: %s', (e.stderr or b'').decode('utf8', errors='replace'))
            return None
        except Exception:
            logger.exception(f"Unexpected error while probing: {self.path}")
            return None
        logger.info(f"Video({self.path}).get_metadata(stream='{stream}') ->\n{pprint.pformat(metadata)}")
        return metadata

    def get_bitdepth(self) -> PixelFormat:
        """Wrap the video stream's ``pix_fmt`` in a ``PixelFormat``."""
        res = PixelFormat(self.video_metadata['pix_fmt'])
        logger.info(f"Video.get_bitdepth() -> {res}")
        return res

    def __repr__(self) -> str:
        # The previous template referenced keys that are not attributes of
        # this class and raised KeyError.
        return f"<Video: {self.path}>"
def write_to_file(file_path: str, data: str) -> None:
    """Replace the contents of ``file_path`` with ``data`` (text mode)."""
    handle = open(file_path, 'w')
    try:
        handle.write(data)
    finally:
        handle.close()
def get_dest_path(path: Path) -> Path:
    """Pick a not-yet-existing ``.mkv`` path next to ``path``.

    Tries ``<stem>.mkv`` first, then ``<stem>(2).mkv``, ``<stem>(3).mkv``, ...
    until a name is found that does not exist on disk.
    """
    def candidate(tag: str) -> str:
        # Same directory and stem as the source, optional "(n)" tag, .mkv suffix.
        return str(path.parent / (path.stem + tag + ".mkv"))

    dest = candidate("")
    attempt = 2
    while os.path.exists(dest):
        dest = candidate(f"({attempt})")
        attempt += 1
    logger.info(f"get_dest_path({str(path)}) -> {dest}")
    return Path(dest)
def remove_larger_file(new: Path, old: Path) -> None:
    """Delete the larger of two files; on a size tie ``old`` is deleted.

    Does nothing unless both files exist.
    """
    logger.info(f"remove_larger_file({str(new)}, {str(old)})")
    if not new.exists() or not old.exists():
        return
    new_size = os.stat(str(new)).st_size
    old_size = os.stat(str(old)).st_size
    # Keep the new file when it is no bigger than the old one.
    to_delete = old if new_size <= old_size else new
    os.remove(str(to_delete))
def execute_ffmpeg(cmd: List[str], path: str) -> None:
    """Run the ffmpeg command ``cmd`` for the source file ``path``.

    Failures are logged (with traceback) rather than raised. On failure the
    partially written output file is removed: it is usually smaller than the
    source, so leaving it behind would let a later size comparison
    (``remove_larger_file``) delete the intact original.

    Args:
        cmd: Full ffmpeg argument list. The output file is assumed to be the
            last element, as built by ``Video.prepare_ffmpeg_command``.
        path: Source file path, used for log messages and as a guard so the
            source itself is never removed.
    """
    logger.info("execute_ffmpeg()")
    try:
        subprocess.check_output(cmd)
    except Exception:
        logger.exception(f"Could not process: {path}")
        output = cmd[-1] if cmd else None
        if (output
                and not output.startswith("-")
                and output != str(path)
                and os.path.isfile(output)):
            try:
                os.remove(output)
            except OSError:
                logger.warning(f"Could not remove partial output: {output}")
def get_codec(metadata: Dict) -> str:
    """Return (and log) the ``codec_name`` entry of a stream metadata dict.

    Raises:
        KeyError: if ``metadata`` has no ``codec_name`` entry.
        TypeError: if ``metadata`` is not subscriptable (e.g. None).
    """
    name = metadata['codec_name']
    logger.info(f"get_codec() -> {name}")
    return name
def bitrate_to_string(bitrate: int) -> str:
    """Format a bitrate in bit/s as whole kilobits with a ``k`` suffix."""
    kilobits = bitrate // 1000
    return f"{kilobits}k"
def choose_encoder(codec: str) -> str:
    """Map a source codec name to the encoder to use.

    ``av1`` sources get ``libaom-av1``; everything else gets ``libx265``.
    """
    selected = 'libaom-av1' if codec == 'av1' else 'libx265'
    logger.info(f"choose_encoder -> {selected}")
    return selected
def convert_str_to_float(s: str) -> float:
    """Convert rational or decimal string to float

    Accepts either a plain number (``"25"``, ``"29.97"``) or a rational such
    as ``"30000/1001"``. A rational with a zero denominator (e.g. ``"0/0"``,
    an unknown rate) yields ``0.0`` instead of raising ZeroDivisionError.

    Raises:
        ValueError: if a part of ``s`` is not a valid number.
    """
    numerator, slash, denominator = s.partition('/')
    if not slash:
        return float(s)
    divisor = float(denominator)
    if divisor == 0:
        return 0.0
    return float(numerator) / divisor
# Run the Typer application only when the file is executed as a script.
if __name__ == "__main__":
    app()
Can you give an example of how to run this? Will it mess up already matched videos, or will it clear out the existing metadata for those files?
To run it, save it to a file (for example `test.py`) and then call it like `python test.py folder '/path/to/folder'`. After that, run a scan and move the metadata from the original to the x265 version while deduplicating.
I like the idea of this, and it could remain a standalone script, or it could become a stash plugin, perhaps with the new settings to make it easy to configure. I'd say this is really just redoing the current transcoding already in code. But maybe that's ok, and maybe we should just give a better UI to making custom transcodes using the current code?
Might be worth doing as a short-term script, as a poor man's transcoder, but I'm unsure of the level of support. Is it worth adding this, or should we just point people to HandBrake or Tdarr?
Feedback welcomed.
Hello, I'm trying to use this on macOS, but I'm getting this error every time: [ERROR] Could not Convert "X". Am I missing some prerequisites? I had to install a few of the packages imported at the top of the script, so I did that.