edge-tts icon indicating copy to clipboard operation
edge-tts copied to clipboard

generate chinese subtitles function update

Open wh1te-moon opened this issue 2 years ago • 9 comments

It's makes many mistakes when generating chinese subtitle,like this

WEBVTT



00:00:00.083 --> 00:00:02.583

你 穿越 大明成 为 第一 贪官 入股 赌坊 兴办 青楼



00:00:02.708 --> 00:00:04.892

是 沛县 最大 的 保护伞 你 更 是 当众 受贿



00:00:05.125 --> 00:00:07.858

万两白银 打点 官职 就 连 沈安 的 县衙 前院 你



00:00:07.858 --> 00:00:09.833

都 毫无 避讳 地 摆 满 了 金尊 琉璃 可

Incorrect clauses and missing punctuation. I fix it in my fork,which creates a new function named generate_cn_subs. Should I make a pr request?

wh1te-moon avatar Nov 13 '23 08:11 wh1te-moon

I used your code, please don't request pr, your code will cause the subtitle and voice speed mismatch problem

Anning01 avatar Apr 10 '24 16:04 Anning01

What does "the subtitle and voice speed mismatch problem" mean? Could you give me an example? I update a new file in my fork named streaming_with_cn_subtitles.py. It tests the rate argument without problems.

But this cn_subtitle function is based on the punctuation of the text, which may be not beautiful.

wh1te-moon avatar Apr 19 '24 03:04 wh1te-moon

Unfortunately I don't speak Chinese so I will need some help with testing. The simplest solution I can think of is to match the input text against the subtitle word boundary on a best effort basis and with some fuzziness. However, any solution will need to be generic and work for all languages not just Chinese.

rany2 avatar Apr 29 '24 21:04 rany2

I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.

rany2 avatar Apr 29 '24 21:04 rany2

@wh1te-moon I’m curious about the effect of the repaired Chinese subtitles. Can you upload a comparison of the before and after effects?

Imfdj avatar Apr 30 '24 09:04 Imfdj

I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.

In fact,my solution is based on the correct punctuation of the input text,not the specific language,so it will take effect if the PUNCTUATION_LIST is set. I have only set it for Chinese and English punctuations. This PUNCTUATION_LIST is easy to modify if we can invite someone who wants their language to be surported.This solution may only be unsupported by a small number of right-to-left languages (e.g., Arabic).

But it's based on the correct punctuation of the input text.

wh1te-moon avatar Apr 30 '24 09:04 wh1te-moon

@wh1te-moon I’m curious about the effect of the repaired Chinese subtitles. Can you upload a comparison of the before and after effects?

input text:

"东风夜放花千树,更吹落、星如雨。宝马雕车香满路。"

vtt file:

00:00:00.083 --> 00:00:01.567

东风夜放花千树 更 吹



00:00:01.567 --> 00:00:03.742

落 星如雨 宝马雕车香满路

fixed version:

00:00:00.083 --> 00:00:01.092

东风夜放花千树,



00:00:01.267 --> 00:00:02.400

更吹落、星如雨。



00:00:02.767 --> 00:00:03.742

宝马雕车香满路。

wh1te-moon avatar Apr 30 '24 10:04 wh1te-moon

I'm thinking about doing something like set(list(output_wordboundaries)) and then filtering input on that set. This will allow me to figure out which index of the input I need to be on for the subtitle. I think it should work generically.

In fact,my solution is based on the correct punctuation of the input text,not the specific language,so it will take effect if the PUNCTUATION_LIST is set. I have only set it for Chinese and English punctuations. This PUNCTUATION_LIST is easy to modify if we can invite someone who wants their language to be surported.This solution may only be unsupported by a small number of right-to-left languages (e.g., Arabic).

But it's based on the correct punctuation of the input text.

I would like to give this a try. Please share how to apply your PUNCTUATION_LIST solution (e.g. files and code to be modified).

anartigone avatar May 06 '24 22:05 anartigone

It's simple.

def generate_subs_based_on_punc(self, text) -> str:
        PUNCTUATION = [',', '。', '!', '?', ';',
                       ':', '\n', '“', '”', ',', '!', '\\. ']

        def clause(self) -> list[str]:
            pattern = '(' + '|'.join(punc for punc in PUNCTUATION) + ')'
            text_list = re.split(pattern, text)

            index = 0
            pattern = '^[' + ''.join(p for p in PUNCTUATION) + ']+$'
            while (index < len(text_list)-1):
                if not text_list[index+1]:
                    text_list.pop(index+1)
                    continue
                if re.match(pattern, text_list[index+1]):
                    if (text_list[index+1] == '\n'):
                        text_list.pop(index+1)
                        continue
                    text_list[index] += text_list.pop(index+1)
                else:
                    index += 1

            return text_list

        self.text_list = clause(self)
        if len(self.subs) != len(self.offset):
            raise ValueError("subs and offset are not of the same length")
        data = "WEBVTT\r\n\r\n"
        j = 0
        for text in self.text_list:
            try:
                start_time = self.offset[j][0]
            except IndexError:
                return data
            try:
                while (self.subs[j + 1] in text):
                    j += 1
            except IndexError:
                pass
            data += formatter(start_time, self.offset[j][1], text)
            j += 1
        return data

The main function is the clause, using re to split the text with punctuation marks.

wh1te-moon avatar May 07 '24 03:05 wh1te-moon

It's simple.

def generate_subs_based_on_punc(self, text) -> str:
        PUNCTUATION = [',', '。', '!', '?', ';',
                       ':', '\n', '“', '”', ',', '!', '\\. ']

        def clause(self) -> list[str]:
            pattern = '(' + '|'.join(punc for punc in PUNCTUATION) + ')'
            text_list = re.split(pattern, text)

            index = 0
            pattern = '^[' + ''.join(p for p in PUNCTUATION) + ']+$'
            while (index < len(text_list)-1):
                if not text_list[index+1]:
                    text_list.pop(index+1)
                    continue
                if re.match(pattern, text_list[index+1]):
                    if (text_list[index+1] == '\n'):
                        text_list.pop(index+1)
                        continue
                    text_list[index] += text_list.pop(index+1)
                else:
                    index += 1

            return text_list

        self.text_list = clause(self)
        if len(self.subs) != len(self.offset):
            raise ValueError("subs and offset are not of the same length")
        data = "WEBVTT\r\n\r\n"
        j = 0
        for text in self.text_list:
            try:
                start_time = self.offset[j][0]
            except IndexError:
                return data
            try:
                while (self.subs[j + 1] in text):
                    j += 1
            except IndexError:
                pass
            data += formatter(start_time, self.offset[j][1], text)
            j += 1
        return data

The main function is the clause, using re to split the text with punctuation marks.

Awesome, after some struggle, the code works as expected!

Just for my personal record. I would like to share what I did.

  1. Get the code: git clone https://github.com/rany2/edge-tts.git
  2. Edit /edge-tts/src/edge_tts/submaker.py :
"""
SubMaker package for the Edge TTS project.

SubMaker is a package that makes the process of creating subtitles with
information provided by the service easier.
"""

import math
import re
from typing import List, Tuple
from xml.sax.saxutils import escape, unescape


def formatter(start_time: float, end_time: float, subdata: str) -> str:
    """
    formatter returns the timecode and the text of the subtitle.
    """
    return (
        f"{mktimestamp(start_time)} --> {mktimestamp(end_time)}\r\n"
        f"{escape(subdata)}\r\n\r\n"
    )


def mktimestamp(time_unit: float) -> str:
    """
    mktimestamp returns the timecode of the subtitle.

    The timecode is in the format of 00:00:00.000.

    Returns:
        str: The timecode of the subtitle.
    """
    hour = math.floor(time_unit / 10**7 / 3600)
    minute = math.floor((time_unit / 10**7 / 60) % 60)
    seconds = (time_unit / 10**7) % 60
    return f"{hour:02d}:{minute:02d}:{seconds:06.3f}"


class SubMaker:
    """
    SubMaker class
    """

    def __init__(self) -> None:
        """
        SubMaker constructor.
        """
        self.offset: List[Tuple[float, float]] = []
        self.subs: List[str] = []

    def create_sub(self, timestamp: Tuple[float, float], text: str) -> None:
        """
        create_sub creates a subtitle with the given timestamp and text
        and adds it to the list of subtitles

        Args:
            timestamp (tuple): The offset and duration of the subtitle.
            text (str): The text of the subtitle.

        Returns:
            None
        """
        self.offset.append((timestamp[0], timestamp[0] + timestamp[1]))
        self.subs.append(text)

    def generate_subs_based_on_punc(self, text) -> str:
        PUNCTUATION = [',', '。', '!', '?', ';',
                       ':', '\n', '“', '”', ',', '!', '\\. ']

        def clause(self) -> list[str]:
            pattern = '(' + '|'.join(punc for punc in PUNCTUATION) + ')'
            text_list = re.split(pattern, text)

            index = 0
            pattern = '^[' + ''.join(p for p in PUNCTUATION) + ']+$'
            while (index < len(text_list)-1):
                if not text_list[index+1]:
                    text_list.pop(index+1)
                    continue
                if re.match(pattern, text_list[index+1]):
                    if (text_list[index+1] == '\n'):
                        text_list.pop(index+1)
                        continue
                    text_list[index] += text_list.pop(index+1)
                else:
                    index += 1

            return text_list

        self.text_list = clause(self)
        if len(self.subs) != len(self.offset):
            raise ValueError("subs and offset are not of the same length")
        data = "WEBVTT\r\n\r\n"
        j = 0
        for text in self.text_list:
            try:
                start_time = self.offset[j][0]
            except IndexError:
                return data
            try:
                while (self.subs[j + 1] in text):
                    j += 1
            except IndexError:
                pass
            data += formatter(start_time, self.offset[j][1], text)
            j += 1
        return data
  1. Edit /edge-tts/src/edge_tts/util.py :
"""
Main package.
"""

import argparse
import asyncio
import sys
from io import TextIOWrapper
from typing import Any, TextIO, Union

from edge_tts import Communicate, SubMaker, list_voices


async def _print_voices(*, proxy: str) -> None:
    """Print all available voices."""
    voices = await list_voices(proxy=proxy)
    voices = sorted(voices, key=lambda voice: voice["ShortName"])
    for idx, voice in enumerate(voices):
        if idx != 0:
            print()

        for key in voice.keys():
            if key in (
                "SuggestedCodec",
                "FriendlyName",
                "Status",
                "VoiceTag",
                "Name",
                "Locale",
            ):
                continue
            pretty_key_name = key if key != "ShortName" else "Name"
            print(f"{pretty_key_name}: {voice[key]}")


async def _run_tts(args: Any) -> None:
    """Run TTS after parsing arguments from command line."""

    try:
        if sys.stdin.isatty() and sys.stdout.isatty() and not args.write_media:
            print(
                "Warning: TTS output will be written to the terminal. "
                "Use --write-media to write to a file.\n"
                "Press Ctrl+C to cancel the operation. "
                "Press Enter to continue.",
                file=sys.stderr,
            )
            input()
    except KeyboardInterrupt:
        print("\nOperation canceled.", file=sys.stderr)
        return

    tts: Communicate = Communicate(
        args.text,
        args.voice,
        proxy=args.proxy,
        rate=args.rate,
        volume=args.volume,
        pitch=args.pitch,
    )
    subs: SubMaker = SubMaker()
    with (
        open(args.write_media, "wb") if args.write_media else sys.stdout.buffer
    ) as audio_file:
        async for chunk in tts.stream():
            if chunk["type"] == "audio":
                audio_file.write(chunk["data"])
            elif chunk["type"] == "WordBoundary":
                subs.create_sub((chunk["offset"], chunk["duration"]), chunk["text"])

    sub_file: Union[TextIOWrapper, TextIO] = (
        open(args.write_subtitles, "w", encoding="utf-8")
        if args.write_subtitles
        else sys.stderr
    )
    with sub_file:
        sub_file.write(subs.generate_subs_based_on_punc(args.text))


async def amain() -> None:
    """Async main function"""
    parser = argparse.ArgumentParser(description="Microsoft Edge TTS")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-t", "--text", help="what TTS will say")
    group.add_argument("-f", "--file", help="same as --text but read from file")
    parser.add_argument(
        "-v",
        "--voice",
        help="voice for TTS. Default: en-US-AriaNeural",
        default="en-US-AriaNeural",
    )
    group.add_argument(
        "-l",
        "--list-voices",
        help="lists available voices and exits",
        action="store_true",
    )
    parser.add_argument("--rate", help="set TTS rate. Default +0%%.", default="+0%")
    parser.add_argument("--volume", help="set TTS volume. Default +0%%.", default="+0%")
    parser.add_argument("--pitch", help="set TTS pitch. Default +0Hz.", default="+0Hz")
    parser.add_argument(
        "--words-in-cue",
        help="number of words in a subtitle cue. Default: 10.",
        default=10,
        type=float,
    )
    parser.add_argument(
        "--write-media", help="send media output to file instead of stdout"
    )
    parser.add_argument(
        "--write-subtitles",
        help="send subtitle output to provided file instead of stderr",
    )
    parser.add_argument("--proxy", help="use a proxy for TTS and voice list.")
    args = parser.parse_args()

    if args.list_voices:
        await _print_voices(proxy=args.proxy)
        sys.exit(0)

    if args.file is not None:
        # we need to use sys.stdin.read() because some devices
        # like Windows and Termux don't have a /dev/stdin.
        if args.file == "/dev/stdin":
            args.text = sys.stdin.read()
        else:
            with open(args.file, "r", encoding="utf-8") as file:
                args.text = file.read()

    if args.text is not None:
        await _run_tts(args)


def main() -> None:
    """Run the main function using asyncio."""
    asyncio.run(amain())


if __name__ == "__main__":
    main()
  1. Save both file and install the package with pip install -e /path/to/edge-tts/
  2. Use edge-tts command with --write-subtitles, the results is fixed.

Thank you again, @wh1te-moon, for your awesome help.

anartigone avatar May 08 '24 00:05 anartigone

Why not choose to merge my pull request? Should I modify the target branch to a non-master branch? Or is there something else I should be aware of? This is my first successful involvement in an open-source project,so thank you very much too.

wh1te-moon avatar May 08 '24 01:05 wh1te-moon

This is a helpful function to have. I have tested it works in both Chinese and English. I agree it should be merged for a good reason.

anartigone avatar May 08 '24 02:05 anartigone