CommunityScripts
CommunityScripts copied to clipboard
Convert videos to x265 to reduce video size and save space
This is a script that converts all videos to mkv to reduce the size. It could be modified to use the stash database instead.
import enlighten
import ffmpeg
import logging
import os
import pprint
import re
import shelve
import subprocess
import sys
import traceback
import typer
if sys.platform == 'win32':
import wexpect as expect
# patch windows console for scale correctly characters
import ansicon
ansicon.load()
else:
import pexpect as expect
from pathlib import Path
from typing import Dict, List, Union
VIDEO_CODEC = "hevc"
AUDIO_CODEC = "aac"
CRF = 26
PRESET = "veryfast"
MAX_HEIGHT = 720
MAX_WIDTH = 1280
MAX_VIDEO_BITRATE = 2000000
MAX_AUDIO_BITRATE = 64000
MAX_FRAME_RATE = 30
pattern_duration = re.compile(
'duration[ \t\r]?:[ \t\r]?(.+?),[ \t\r]?start', re.IGNORECASE)
pattern_progress = re.compile('time=(.+?)[ \t\r]?bitrate', re.IGNORECASE)
BAR_FMT = u'{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}.1f}/{total:.1f} ' + \
u'[{elapsed}<{eta}, {rate:.2f}{unit_pad}{unit}/s]'
COUNTER_FMT = u'{desc}{desc_pad}{count:.1f} {unit}{unit_pad}' + \
u'[{elapsed}, {rate:.2f}{unit_pad}{unit}/s]{fill}'
CACHE_FILE = "cache.db"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("debug.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = typer.Typer()
@app.command()
def folder(
folder: str = typer.Argument(
default='.',
exists=True,
file_okay=True,
dir_okay=True,
readable=True,
resolve_path=True
)) -> None:
"""
Reduce the size of all videos on the given directory
"""
files = get_files_from(folder)
manager = enlighten.get_manager()
pbar = manager.counter(total=len(files), desc='Files', unit='files')
cache = shelve.open(CACHE_FILE)
for file in files:
try:
if file in cache:
continue
else:
video = Video(file)
if video.is_video():
video.convert_video()
cache[file] = True
except Exception:
logger.error(f"Could not convert: {file}")
pbar.update()
cache.close()
@app.command()
def file(
path: Path = typer.Argument(
default=None,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True
)) -> None:
"""
Reduce the size of a single file
"""
video = Video(path)
if video.is_video():
video.convert_video()
def get_files_from(folder: str) -> List[str]:
"""
Walk directory recursively and get a list paths for the files
"""
logger.info(f"get_files_from({folder})")
res = []
for path, _, files in os.walk(folder):
for file in files:
res.append(os.path.join(path, file))
return res
px10bit = re.compile('10le$')
px12bit = re.compile('12le$')
class PixelFormat:
__slots__ = ('_pix_fmt', '_is_10bit', '_is_12bit')
def __init__(self, pix_fmt):
self._pix_fmt = pix_fmt
self._is_10bit = px10bit.search(pix_fmt) is not None
self._is_12bit = px12bit.search(pix_fmt) is not None
@property
def pixel_format(self):
return self._pix_fmt
@property
def is_10bit(self):
return self._is_10bit
@property
def is_12bit(self):
return self._is_12bit
@property
def is_8bit(self):
return not(self._is_10bit or self._is_12bit)
def __str__(self):
return self._pix_fmt
class Video():
def __init__(self, path: str) -> None:
logger.info("Video.init()")
self.path = path
if not self.init_error_checking():
return
self.video_metadata = self.get_metadata("v")
self.audio_metadata = self.get_metadata("a")
self.dest = get_dest_path(Path(self.path))
def init_error_checking(self) -> bool:
logger.info("Video.init_error_checking()")
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(["ffprobe", "-h"], stdout=devnull, stderr=devnull)
except FileNotFoundError:
raise IOError('ffprobe not found.')
if not os.path.isfile(self.path):
logger.error('No such media file ' + str(self.path))
return False
return True
def is_video(self) -> bool:
try:
res = (self.video_metadata['codec_type'] == 'video'
and int(self.video_metadata['nb_frames']) > 0)
logger.info(f"Video.is_video() -> {res}")
return res
except Exception:
return False
def convert_video(self) -> None:
if self.dest.exists():
os.remove(str(self.dest))
try:
self.skip = True
cmd = self.prepare_ffmpeg_command()
if self.skip:
logger.info(f"Skipped: {self.path}")
print(f"Skipped: {self.path}")
return
logger.info(f"Processing: {self.path}")
print(f"Processing: {self.path}")
execute_ffmpeg(cmd, self.path)
remove_larger_file(self.dest, Path(self.path))
except ffmpeg._run.Error:
print(f"ffmpeg could not process: {self.path}")
traceback.print_exc()
write_to_file("errors.txt", self.path)
logger.info(f"Done processing: {self.path}")
def prepare_ffmpeg_command(self) -> List[str]:
cmd = ["ffmpeg", "-progress", "pipe:1", "-i", str(self.path),
"-map", "0", "-map", "-v", "-map", "V"]
self.push_encode_video_args_to_command(cmd)
self.push_change_frame_rate_args_to_command(cmd)
try:
self.push_encode_audio_args_to_command(cmd)
except Exception:
logger.warning("No audio stream found")
cmd.extend(["-y", str(self.dest)]) # 'y' arg overwrites output files without asking.
logger.info(f"Video.prepare_ffmpeg_command() -> {cmd}")
return cmd
def push_encode_video_args_to_command(self, cmd: List[str]) -> None:
self.codec = get_codec(self.video_metadata)
if self.codec != VIDEO_CODEC:
self.skip = False
cmd.append("-c:v")
encoder = choose_encoder(self.codec)
crf = str(CRF)
if self.get_bitdepth().is_10bit:
if encoder == 'libx265':
cmd.extend(["libx265", "-x265-params", f"crf={crf}:profile=main10"])
else:
cmd.extend(["libaom-av1", "-cpu-used", "8", "-threads", "0", "-x265-params", f"crf={crf}:profile=main10"])
else:
if encoder == 'libx265':
cmd.extend(["libx265", "-crf", crf])
else:
cmd.extend(["libaom-av1", "-cpu-used", "8", "-threads", "0", "-crf", crf])
if ('bit_rate' in self.video_metadata
and int(self.video_metadata['bit_rate']) > MAX_VIDEO_BITRATE):
self.skip = False
cmd.extend(["-b:v", str(MAX_VIDEO_BITRATE)])
cmd.extend(["-maxrate", bitrate_to_string(MAX_VIDEO_BITRATE), "-preset", PRESET])
if ('height' in self.video_metadata
and 'width' in self.video_metadata):
if (int(self.video_metadata['height']) < int(self.video_metadata['width'])
and int(self.video_metadata['height']) > MAX_HEIGHT):
cmd.extend(["-vf", f"scale=-2:{MAX_HEIGHT}"])
self.skip = False
elif (int(self.video_metadata['height']) > int(self.video_metadata['width'])
and int(self.video_metadata['width']) > MAX_WIDTH):
cmd.extend(["-vf", f"scale={MAX_WIDTH}:-2"])
self.skip = False
def push_change_frame_rate_args_to_command(self, cmd: List[str]) -> None:
if ('r_frame_rate' in self.video_metadata
and convert_str_to_float(self.video_metadata['r_frame_rate']) > MAX_FRAME_RATE):
cmd.extend(["-r", str(MAX_FRAME_RATE)])
self.skip = False
def push_encode_audio_args_to_command(self, cmd: List[str]) -> None:
if get_codec(self.audio_metadata) != AUDIO_CODEC:
self.skip = False
cmd.extend(["-c:a", AUDIO_CODEC])
if int(self.audio_metadata["sample_rate"]) > MAX_AUDIO_BITRATE:
self.skip = False
cmd.extend(["-b:a", bitrate_to_string(MAX_AUDIO_BITRATE)])
def get_metadata(self, stream: str) -> Union[Dict, None]:
try:
metadata = ffmpeg.probe(self.path, select_streams = stream)['streams'][0]
except IndexError as e:
logger.error(f"ffprobe failed for Video({self.path}).get_metadata({stream})")
return None
except ffmpeg.Error as e:
logger.error('stdout:', e.stdout.decode('utf8'))
logger.error('stderr:', e.stderr.decode('utf8'))
return None
except Exception as e:
logger.exception("message")
return None
logger.info(f"Video({self.path}).get_metadata(stream='{stream}') ->\n{pprint.pformat(metadata)}")
return metadata
def get_bitdepth(self) -> PixelFormat:
res = PixelFormat(self.video_metadata['pix_fmt'])
logger.info(f"Video.get_bitdepth() -> {res}")
return res
def __repr__(self) -> str:
return "<FFprobe: {metadata}, {video}, {audio}, {subtitle}, {attachment}>".format(**vars(self))
def write_to_file(file_path: str, data: str) -> None:
with open(file_path, 'w') as f:
f.write(data)
def get_dest_path(path: Path) -> Path:
dest = str(path.parent / (path.stem + ".mkv"))
i = 2
while (os.path.exists(dest)):
dest = str(path.parent / (path.stem + f"({i}).mkv"))
i += 1
logger.info(f"get_dest_path({str(path)}) -> {dest}")
return Path(dest)
def remove_larger_file(new: Path, old: Path) -> None:
logger.info(f"remove_larger_file({str(new)}, {str(old)})")
if not (new.exists() and old.exists()):
return
if os.stat(str(new)).st_size <= os.stat(str(old)).st_size:
os.remove(str(old))
else:
os.remove(str(new))
def execute_ffmpeg(cmd: List[str], path: str) -> None:
logger.info("execute_ffmpeg()")
try:
subprocess.check_output(cmd)
except Exception:
logger.error(f"Could not process: {path}")
def get_codec(metadata: Dict) -> str:
codec = metadata['codec_name']
logger.info(f"get_codec() -> {codec}")
return codec
def bitrate_to_string(bitrate: int) -> str:
return str(bitrate // 1000) + "k"
def choose_encoder(codec: str) -> str:
if codec == 'av1':
encoder = 'libaom-av1'
else:
encoder = 'libx265'
logger.info(f"choose_encoder -> {encoder}")
return encoder
def convert_str_to_float(s: str) -> float:
"""Convert rational or decimal string to float
"""
if '/' in s:
num, denom = s.split('/')
return float(num) / float(denom)
return float(s)
if __name__ == "__main__":
app()