audible-cli icon indicating copy to clipboard operation
audible-cli copied to clipboard

Bug: Download Hangs

Open devvythelopper opened this issue 4 months ago • 38 comments

With the current HEAD of master https://github.com/mkb79/audible-cli/commit/6e497fb865178cd81fb9465b230108c01286ba53, I am using the following command:

audible download --output-dir . --pdf --cover --annotation --chapter --chapter-type Flat --quality best --aaxc --all

to update my local copy of my library. I get a lot of "Skip download" and then suddenly audible hangs. Left it running over night, so I am sure it is hanging.

Within the log there are also two errors:

error: Server disconnected without sending a response.
...
error: Server disconnected without sending a response.

When running the command in an empty directory (so that the whole library starts to redownload), downloads work as they should. So the error is thrown only for specific audiobooks I have in my library. To find out which audiobook is a culprit, I modified audible-cli and added some logging (see pull request #234). It happens when downloading the "annotation" for the audiobook "Fooled by Randomness" (ASIN: B004V03DW2).

Audible does not hang if I use the command line flag --ignore-errors. So apparently when errors are raised in cmd_download.py...

async def consume(ignore_errors):
    while True:
        item, entity_type, cmd, kwargs = await QUEUE.get()
        try:
            logger.info(f"--- Downloading {entity_type} for: {item.asin}: {item.title}")
            await cmd(**kwargs)
        except Exception as e:
            logger.error(e)
            if not ignore_errors:
                raise
        finally:
            QUEUE.task_done()

... the raised exception seems not to be handled correctly.

Also, I suppose there should not be an exception in the first place but the annotation downloading command should handle whatever happens gracefully.

devvythelopper avatar Aug 15 '25 13:08 devvythelopper

Apparently this happens for many of my audiobooks, and it is always the annotation that fails to download.

I also subscribe to a podcast: audible-cli downloads an mp3 file and then complains that it has the "Wrong content type. [...]". This however only happens for some of the podcast's mp3 episodes, not for all of the mp3 episodes. I have to use --ignore-errors to even get to the podcast episodes, so I do not know if the content type error is handled gracefully without --ignore-errors.

devvythelopper avatar Aug 15 '25 13:08 devvythelopper

@devvythelopper Thank you for your issue and pr. Because adding new options to the download command is a cruel, I will refactor the module. My plan is to implement an options and download item class. I will also take your changes into account!

mkb79 avatar Aug 15 '25 13:08 mkb79

Good idea. A rewrite will change everything, and then this will no longer be relevant. Until then, I'll leave the issue open, so that people will figure out that they need to use --ignore-errors

Updating my library in this way, with --ignore-errors, results in audible-cli re-downloading the failed .mp3 episodes btw. and then deleting them. It also tries to download "annotation" every time.

devvythelopper avatar Aug 15 '25 13:08 devvythelopper

Since the annotation issue might be API related, I verified that it fails for books where I have annotations and where I have none. It also succeeds for books where I have annotations and where I have none.

devvythelopper avatar Aug 15 '25 13:08 devvythelopper

I've did a fast rework of the download command. Can you please add the code from my next message into a file named "cmd_donwload2.py" into your audible-cli plugin directory?

mkb79 avatar Aug 23 '25 19:08 mkb79

import asyncio
import json
import logging
import pathlib
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from typing import Any

import aiofiles
import click
import httpx
import questionary
from audible import AsyncClient
from audible.exceptions import NotFoundError, RequestError
from click import echo

from ..config import Session
from ..decorators import (
    bunch_size_option,
    end_date_option,
    pass_client,
    pass_session,
    start_date_option,
    timeout_option,
)
from ..downloader import Downloader as NewDownloader
from ..downloader import Status
from ..exceptions import (
    AudibleCliException,
    DirectoryDoesNotExists,
    DownloadUrlExpired,
    NotDownloadableAsAAX,
    VoucherNeedRefresh,
)
from ..models import Library, LibraryItem
from ..utils import Downloader, datetime_type


logger = logging.getLogger(__name__)

CLIENT_HEADERS = {"User-Agent": "Audible/671 CFNetwork/1240.0.4 Darwin/20.6.0"}

COUNTER_NAME_LABEL_MAP = {
    "aax": "aax files",
    "aaxc": "aaxc files",
    "annotation": "annotations",
    "aycl": "aycl files",
    "aycl_voucher": "aycl voucher files",
    "chapter": "chapters",
    "cover": "covers",
    "pdf": "PDFs",
    "voucher": "voucher files",
    "voucher_saved": "saved voucher files"
}


class DownloadCounter:
    __slots__ = ("_count_data",)

    def __init__(self) -> None:
        self._count_data = {}

    def __getattr__(self, item) -> int:
        if item not in COUNTER_NAME_LABEL_MAP.keys():
            raise AttributeError(f"Invalid attribute: {item}")

        try:
            return self._count_data[item]
        except KeyError:
            return 0

    def count(self, name: str) -> int:
        new_val = getattr(self, name) + 1
        self._count_data[name] = new_val
        label = COUNTER_NAME_LABEL_MAP[name]
        logger.debug("Currently downloaded %s: %s", label, new_val)
        return new_val

    def as_dict(self) -> dict:
        return self._count_data

    def has_downloads(self) -> bool:
        return bool(self._count_data)


def display_counter(counter: DownloadCounter) -> None:
    if not counter.has_downloads():
        echo("No new files downloaded.")
        return None

    echo("The download ended with the following result:")

    data = counter.as_dict()
    for k, v in data.items():
        if k == "voucher_saved":
            key_label = "voucher"
        elif k == "voucher":
            unsaved_vouchers = counter.voucher - counter.voucher_saved
            if unsaved_vouchers > 0:
                echo(f"Unsaved voucher: {unsaved_vouchers}")
            continue
        else:
            key_label = COUNTER_NAME_LABEL_MAP[k]

        echo(f"New {key_label}: {v}")
    return None


@dataclass
class DownloadOptions:
    # Directory settings
    output_dir: pathlib.Path

    # Selection options
    all: bool
    asins: tuple[str, ...]
    titles: tuple[str, ...]

    # Content type options
    aax: bool
    aaxc: bool
    aax_fallback: bool
    annotation: bool
    chapters: bool
    cover: bool
    pdf: bool

    # Quality and formatting options
    quality: str
    cover_sizes: list[str]
    chapter_type: str
    filename_mode: str
    filename_length: int

    # Processing options
    sim_jobs: int
    overwrite_existing: bool
    ignore_errors: bool
    no_confirm: bool

    # Podcast handling
    resolve_podcasts: bool
    ignore_podcasts: bool

    # Date filtering
    start_date: datetime | None = None
    end_date: datetime | None = None

    # Additional options
    bunch_size: int | None = None

    def copy_with(self, **overrides: Any) -> "DownloadOptions":
        """Return a new DownloadOptions instance with the given fields overridden."""
        # Validate override keys
        valid_fields = {f.name for f in fields(self)}
        invalid = [k for k in overrides.keys() if k not in valid_fields]
        if invalid:
            invalid_list = ", ".join(invalid)
            raise TypeError(f"Invalid field(s) for copy_with: {invalid_list}")

        # Build base kwargs from current instance
        kwargs: dict[str, Any] = {}
        for name in valid_fields:
            value = getattr(self, name)
            # Shallow-copy lists to avoid shared mutation
            if isinstance(value, list):
                value = list(value)
            kwargs[name] = value

        # Apply overrides (ignore None -> keep original value)
        for k, v in overrides.items():
            if v is not None:
                kwargs[k] = v

        # Create a new instance (this will run __post_init__ validations)
        return type(self)(**kwargs)

    def __post_init__(self):
        """Validates options after initialization."""
        self._validate_selection_options()
        self._validate_download_options()
        self._validate_podcast_options()
        self._validate_date_options()
        self._handle_aax_fallback()

    def _validate_selection_options(self):
        """Ensure the item selection options are valid."""
        if self.all and any([self.asins, self.titles]):
            raise click.BadOptionUsage(
                "--all",
                "The --all option cannot be used together with --asin or --title options"
            )

    def _validate_download_options(self):
        """Ensure at least one download option is selected."""
        if not any([
            self.aax, self.aax_fallback, self.aaxc,
            self.annotation, self.chapters, self.cover, self.pdf
        ]):
            raise click.BadOptionUsage(
                "download_option",
                "Please select at least one option for what you want to download."
            )

    def _validate_podcast_options(self):
        """Ensure podcast options are not conflicting."""
        if self.resolve_podcasts and self.ignore_podcasts:
            raise click.BadOptionUsage(
                "podcast_option",
                "Do not mix --ignore-podcasts with --resolve-podcasts option."
            )

    def _validate_date_options(self):
        """Ensure date options are valid if provided."""
        if (self.start_date and self.end_date and
                self.start_date > self.end_date):
            raise click.BadOptionUsage(
                "date_option",
                "Start date must be before or equal to the end date"
            )

        if self.start_date is not None:
            logger.info("Selected start date: %s",
                        self.start_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
        if self.end_date is not None:
            logger.info("Selected end date: %s",
                        self.end_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))

    def _handle_aax_fallback(self):
        """Handle the  aax_fallback option."""
        if not self.aax_fallback:
            return

        if self.aax:
            logger.info("Using --aax is redundant and can be left when using --aax-fallback")

        # aax_fallback implies aax
        self.aax = False

        if self.aaxc:
            logger.warning("Do not mix --aaxc with the --aax-fallback option.")

        self.aaxc = False


def parse_options(session: Session, options: dict[str, Any]) -> DownloadOptions:
    """Parse CLI options into a structured DownloadOptions object."""
    output_dir = pathlib.Path(options.get("output_dir")).resolve()

    # Resolve chapter_type and filename_mode from config if needed
    chapter_type = options.get("chapter_type")
    if chapter_type == "config":
        chapter_type = session.config.get_profile_option(
            session.selected_profile, "chapter_type", "Tree")

    filename_mode = options.get("filename_mode")
    if filename_mode == "config":
        filename_mode = session.config.get_profile_option(
            session.selected_profile, "filename_mode", "ascii")

    return DownloadOptions(
        # Directory settings
        output_dir=output_dir,

        # Selection options
        all=options.get("all"),
        asins=tuple(options.get("asin")),
        titles=tuple(options.get("title")),

        # Content type options
        aax=options.get("aax"),
        aaxc=options.get("aaxc"),
        aax_fallback=options.get("aax_fallback"),
        annotation=options.get("annotation"),
        chapters=options.get("chapter"),
        cover=options.get("cover"),
        pdf=options.get("pdf"),

        # Quality and formatting options
        quality=options.get("quality"),
        cover_sizes=list(set(options.get("cover_size"))),
        chapter_type=chapter_type,
        filename_mode=filename_mode,
        filename_length=options.get("filename_length"),

        # Processing options
        sim_jobs=options.get("jobs"),
        overwrite_existing=options.get("overwrite"),
        ignore_errors=options.get("ignore_errors"),
        no_confirm=options.get("no_confirm"),

        # Podcast handling
        resolve_podcasts=options.get("resolve_podcasts"),
        ignore_podcasts=options.get("ignore_podcasts"),

        # Date and batch options
        start_date=session.params.get("start_date"),
        end_date=session.params.get("end_date"),
        bunch_size=session.params.get("bunch_size")
    )


async def fetch_library(api_client: AsyncClient, options: DownloadOptions) -> Library:
    # fetch the user library
    library = await Library.from_api_full_sync(
        api_client,
        image_sizes=", ".join(options.cover_sizes),
        bunch_size=options.bunch_size,
        response_groups=(
            "product_desc, media, product_attrs, relationships, "
            "series, customer_rights, pdf_url"
        ),
        start_date=options.start_date,
        end_date=options.end_date,
        status="Active",
    )

    if options.resolve_podcasts:
        await library.resolve_podcasts(start_date=options.start_date, end_date=options.end_date)
        [library.data.remove(i) for i in library if i.is_parent_podcast()]

    return library


def collect_items_by_asin(
        library: Library,
        asins: tuple[str, ...],
        ignore_errors: bool
) -> list[LibraryItem]:
    """Collect library items based on provided ASINs."""
    items = []

    for asin in asins:
        if library.has_asin(asin):
            item = library.get_item_by_asin(asin)
            items.append(item)
        else:
            if not ignore_errors:
                logger.error("Asin %s: Not found in library.", asin)
                raise click.Abort()
            logger.error("Skip asin %s: Not found in library.", asin)

    return items


async def collect_items_by_title(
        library: Library,
        titles: tuple[str, ...],
        no_confirm: bool
) -> list[LibraryItem]:
    """Collect library items based on provided titles."""
    items = []

    for title in titles:
        match = library.search_item_by_title(title)
        full_match = [i for i in match if i[1] == 100]

        if not match:
            logger.error("Skip title %s: Not found in library.", title)
            continue

        if no_confirm:
            # Add all matches automatically if no confirmation needed
            items.extend([i[0] for i in full_match or match])
        else:
            # Ask user to select which items to download
            choices = [
                questionary.Choice(
                    title=f"{i[0].asin} # {i[0].full_title}",
                    value=i[0].asin
                ) for i in full_match or match
            ]

            answer = await questionary.checkbox(
                f"Found the following matches for '{title}'. Which you want to download?",
                choices=choices
            ).unsafe_ask_async()

            if answer:
                items.extend([library.get_item_by_asin(i) for i in answer])

    return items


@dataclass
class DownloadJob:
    item: LibraryItem
    options: DownloadOptions
    client: httpx.AsyncClient
    queue: asyncio.Queue
    counter: DownloadCounter

    def create_base_filename(self) -> str:
        """Create a base filename for the item."""
        return self.item.create_base_filename(
            self.options.filename_mode, self.options.filename_length
        )

    async def add_to_queue(self) -> None:
        if self.options.cover:
            cmd = download_covers
            await self.queue.put((cmd, self))

        if self.options.pdf:
            cmd = download_pdf
            await self.queue.put((cmd, self))

        if self.options.chapters:
            cmd = download_chapters
            await self.queue.put((cmd, self))

        if self.options.annotation:
            cmd = download_annotations
            await self.queue.put((cmd, self))

        if self.options.aax or self.options.aax_fallback:
            cmd = download_aax
            await self.queue.put((cmd, self))

        if self.options.aaxc:
            cmd = download_aaxc
            await self.queue.put((cmd, self))


async def create_download_jobs(
    items: list[LibraryItem],
    options: DownloadOptions,
    client: httpx.AsyncClient,
    queue: asyncio.Queue,
    counter: DownloadCounter,
) -> list[DownloadJob]:
    """Process library items and prepare them for download.

    Handles parent podcasts by fetching their child items and creating appropriate
    directories for podcast content.
    """
    processed_items = []

    # Process all items in the list
    for item in items:
        # Skip already processed items
        if item.asin in [i.item.asin for i in processed_items]:
            continue

        # Handle parent podcasts
        if item.is_parent_podcast():
            if options.ignore_podcasts:
                continue

            # Fetch child items if needed
            if item._children is None:
                await item.get_child_items(
                    start_date=options.start_date,
                    end_date=options.end_date
                )

            # Create dedicated directory for podcast
            podcast_dir = item.create_base_filename(options.filename_mode)
            output_dir = options.output_dir / podcast_dir
            if not output_dir.is_dir():
                output_dir.mkdir(parents=True)

            # Set up custom options for podcast children
            options_for_children = options.copy_with(output_dir=output_dir)

            # Add child items to processing queue with custom output directory
            for child_item in item._children:
                # Add to the process queue if not already included
                if child_item.asin not in [i.asin for i in processed_items]:
                    download_job = DownloadJob(child_item, options_for_children, client, queue, counter)
                    processed_items.append(download_job)
        else:
            download_job = DownloadJob(item, options, client, queue, counter)
            processed_items.append(download_job)

    return processed_items


async def download_covers(job: DownloadJob) -> None:
    base_filename = job.create_base_filename()

    for cover_size in job.options.cover_sizes:
        filename = f"{base_filename}_({cover_size!s}).jpg"
        filepath = job.options.output_dir / filename

        url = job.item.get_cover_url(cover_size)
        if url is None:
            logger.error(
                "Cover size %s notfound for %s}", cover_size, job.item.full_title
            )
            return None

        dl = Downloader(url, filepath, job.client, job.options.overwrite_existing, "image/jpeg")
        downloaded = await dl.run(stream=False, pb=False)
        if downloaded:
            job.counter.count("cover")
    return None


async def download_pdf(job: DownloadJob) -> None:
    url = job.item.get_pdf_url()
    if url is None:
        logger.info("No PDF found for %s", job.item.full_title)
        return None

    base_filename = job.create_base_filename()
    filename = base_filename + ".pdf"
    filepath = job.options.output_dir / filename
    dl = Downloader(
        url, filepath, job.client, job.options.overwrite_existing,
        ["application/octet-stream", "application/pdf"]
    )
    downloaded = await dl.run(stream=False, pb=False)

    if downloaded:
        job.counter.count("pdf")
    return None


async def download_chapters(job: DownloadJob) -> None:
    options = job.options
    if not options.output_dir.is_dir():
        raise DirectoryDoesNotExists(options.output_dir)

    base_filename = job.create_base_filename()
    filename = base_filename + "-chapters.json"
    file = options.output_dir / filename
    if file.exists() and not options.overwrite_existing:
        logger.info("File %s already exists. Skip saving chapters", file)
        return None

    try:
        metadata = await job.item.get_content_metadata(job.options.quality, chapter_type=options.chapter_type)
    except NotFoundError:
        logger.info("No chapters found for %s", job.item.full_title)
        return None

    metadata = json.dumps(metadata, indent=4)
    async with aiofiles.open(file, "w") as f:
        await f.write(metadata)
    logger.info("Chapter file saved in style '%s' to %s.", options.chapter_type.upper(), file)
    job.counter.count("chapter")
    return None


async def download_annotations(job: DownloadJob) -> None:
    options = job.options
    if not options.output_dir.is_dir():
        raise DirectoryDoesNotExists(options.output_dir)

    base_filename = job.create_base_filename()
    filename = base_filename + "-annotations.json"
    file = options.output_dir / filename
    if file.exists() and not options.overwrite_existing:
        logger.info("File %s already exists. Skip saving annotations", file)
        return None

    try:
        annotation = await job.item.get_annotations()
    except NotFoundError:
        logger.info("No annotations found for %s.", job.item.full_title)
        return None
    except RequestError:
        logger.error("Failed to get annotations for %s.", job.item.full_title)
        return None

    annotation = json.dumps(annotation, indent=4)
    async with aiofiles.open(file, "w") as f:
        await f.write(annotation)
    logger.info("Annotation file saved to %s.", file)
    job.counter.count("annotation")
    return None


async def _get_audioparts(job: DownloadJob) -> list[LibraryItem]:
    parts = []
    child_library: Library = await job.item.get_child_items()
    if child_library is not None:
        for child in child_library:
            if (
                child.content_delivery_type is not None
                and child.content_delivery_type == "AudioPart"
            ):
                parts.append(child)

    return parts


async def _add_audioparts_to_queue(job: DownloadJob, download_mode: str) -> None:
    parts = await _get_audioparts(job)

    jobs = []
    for part in parts:
        logger.info("Item %s has audio parts. Adding parts to queue.",part.full_title)

        if download_mode == "aax":
            options = job.options.copy_with(
                aax=True,
                aax_fallback=False,
                aaxc=False,
                annotation=False,
                chapters=False,
                cover=False,
                pdf=False
            )

        else:
            options = job.options.copy_with(
                aax=False,
                aax_fallback=False,
                aaxc=True,
                annotation=False,
                chapters=False,
                cover=False,
                pdf=False
            )

        part_job = DownloadJob(
            item=part,
            options=options,
            client=job.client,
            queue=job.queue,
            counter=job.counter
        )
        jobs.append(part_job)

    # Set max_workers to 0 to avoid put None into queue which will close the consumer task
    max_workers = 0
    await produce_jobs(jobs, job.queue, max_workers)


async def download_aax(job: DownloadJob) -> None:
    # url, codec = await item.get_aax_url(quality)
    options = job.options
    try:
        url, codec = await job.item.get_aax_url_old(options.quality)
    except NotDownloadableAsAAX:
        if options.aax_fallback:
            logger.info("Fallback to aaxc for %s", job.item.full_title)
            return await download_aaxc(job)
        raise

    base_filename = job.create_base_filename()
    filename = base_filename + f"-{codec}.aax"
    filepath = options.output_dir / filename

    dl = NewDownloader(
        source=url,
        client=job.client,
        expected_types=[
            "audio/aax", "audio/vnd.audible.aax", "audio/audible"
        ]
    )
    downloaded = await dl.run(target=filepath, force_reload=options.overwrite_existing)

    if downloaded.status == Status.Success:
        job.counter.count("aax")
    elif downloaded.status == Status.DownloadIndividualParts:
        logger.info("Item %s must be downloaded in parts. Adding parts to queue", filepath)
        await _add_audioparts_to_queue(
            job,
            download_mode="aax"
        )
    return None


async def _reuse_voucher(lr_file, job: DownloadJob) -> tuple[dict, httpx.URL, str]:
    logger.info("Loading data from voucher file %s.", lr_file)
    async with aiofiles.open(lr_file) as f:
        lr = await f.read()
    lr = json.loads(lr)
    content_license = lr["content_license"]

    if not content_license["status_code"] == "Granted":
        raise AudibleCliException("License not granted")

    # try to get the user id
    user_id = None
    if job.item._client is not None:
        auth = job.item._client.auth
        if auth.customer_info is not None:
            user_id = auth.customer_info.get("user_id")

    # Verification of allowed user
    if user_id is None:
        logger.debug("No user id found. Skip user verification.")
    elif "allowed_users" in content_license:
        allowed_users = content_license["allowed_users"]
        if allowed_users and user_id not in allowed_users:
            # Don't proceed here to prevent an overwriting voucher file
            msg = f"The current user is not entitled to use the voucher {lr_file}."
            raise AudibleCliException(msg)
    else:
        logger.debug("%s does not contain allowed users key.", lr_file)

    # Verification of voucher validity
    if "refresh_date" in content_license:
        refresh_date = content_license["refresh_date"]
        refresh_date = datetime_type.convert(refresh_date, None, None)
        if refresh_date < datetime.now(timezone.utc):
            raise VoucherNeedRefresh(lr_file)

    content_metadata = content_license["content_metadata"]
    url = httpx.URL(content_metadata["content_url"]["offline_url"])
    codec = content_metadata["content_reference"]["content_format"]

    expires = url.params.get("Expires")
    if expires:
        expires = datetime.fromtimestamp(int(expires), timezone.utc)
        now = datetime.now(timezone.utc)
        if expires < now:
            raise DownloadUrlExpired(lr_file)

    return lr, url, codec


async def download_aaxc(job: DownloadJob) -> None:
    lr, url, codec = None, None, None
    options = job.options
    base_filename = job.create_base_filename()

    # https://github.com/mkb79/audible-cli/issues/60
    if not options.overwrite_existing:
        codec, _ = job.item._get_codec(options.quality)
        if codec is not None:
            filepath = pathlib.Path(
                options.output_dir) / f"{base_filename}-{codec}.aaxc"
            lr_file = filepath.with_suffix(".voucher")

            if lr_file.is_file():
                if filepath.is_file():
                    logger.info("File %s already exists. Skip download.", lr_file)
                    logger.info("File %s already exists. Skip download.", filepath)
                    return None

                try:
                    lr, url, codec = await _reuse_voucher(lr_file, job)
                except DownloadUrlExpired:
                    logger.debug("Download url in %s is expired. Refreshing license.", lr_file)
                except VoucherNeedRefresh:
                    logger.debug("Refresh date for voucher %s reached. Refreshing license.", lr_file)

    is_aycl = job.item.benefit_id == "AYCL"

    new_license = False
    if lr is None or url is None or codec is None:
        url, codec, lr = await job.item.get_aaxc_url(options.quality)
        new_license = True
        job.counter.count("voucher")
        if is_aycl:
            job.counter.count("aycl_voucher")

    if codec.lower() == "mpeg":
        ext = "mp3"
    else:
        ext = "aaxc"

    filepath = pathlib.Path(
        options.output_dir) / f"{base_filename}-{codec}.{ext}"
    lr_file = filepath.with_suffix(".voucher")

    if lr_file.is_file() and not new_license:
        logger.info("File %s already exists. Skip download.", lr_file)
    else:
        lr = json.dumps(lr, indent=4)
        async with aiofiles.open(lr_file, "w") as f:
            await f.write(lr)
        logger.info("Voucher file saved to %s.", lr_file)
        job.counter.count("voucher_saved")

    dl = NewDownloader(
        source=url,
        client=job.client,
        expected_types=[
            "audio/aax", "audio/vnd.audible.aax", "audio/mpeg", "audio/x-m4a",
            "audio/audible"
        ],
    )
    downloaded = await dl.run(target=filepath, force_reload=options.overwrite_existing)

    if downloaded.status == Status.Success:
        job.counter.count("aaxc")
        if is_aycl:
            job.counter.count("aycl")
    elif downloaded.status == Status.DownloadIndividualParts:
        logger.info("Item %s must be downloaded in parts. Adding parts to queue", filepath)
        await _add_audioparts_to_queue(
            job,
            download_mode="aaxc"
        )
    return None


async def produce_jobs(jobs: list[DownloadJob], queue: asyncio.Queue, max_workers: int) -> None:
    """Add a download job to the queue with appropriate options."""
    for job in jobs:
        await job.add_to_queue()

    for _ in range(max_workers):
        await queue.put(None)


async def consume_jobs(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break
        cmd, job = item

        try:
            result = cmd(job)
            await result
        except Exception as e:
            if not job.options.ignore_errors:
                raise
            else:
                logger.error(e)

        finally:
            queue.task_done()


@click.command("download2")
@click.option(
    "--output-dir", "-o",
    type=click.Path(exists=True, dir_okay=True),
    default=pathlib.Path().cwd(),
    help="Directory where downloaded files will be saved (defaults to current working directory)"
)
@click.option(
    "--all",
    is_flag=True,
    help="Download all books from your library (overrides the --asin and --title options)"
)
@click.option(
    "--asin", "-a",
    multiple=True,
    help="ASIN(s) of the audiobook(s) to download (can be specified multiple times)"
)
@click.option(
    "--title", "-t",
    multiple=True,
    help="tile of the audiobook (partial search)"
)
@click.option(
    "--aax",
    is_flag=True,
    help="Download book in aax format"
)
@click.option(
    "--aaxc",
    is_flag=True,
    help="Download book in aaxc format incl. voucher file"
)
@click.option(
    "--aax-fallback",
    is_flag=True,
    help="Download book in aax format and fallback to aaxc, if former is not supported."
)
@click.option(
    "--quality", "-q",
    default="best",
    show_default=True,
    type=click.Choice(["best", "high", "normal"]),
    help="download quality"
)
@click.option(
    "--pdf",
    is_flag=True,
    help="downloads the pdf in addition to the audiobook"
)
@click.option(
    "--cover",
    is_flag=True,
    help="downloads the cover in addition to the audiobook"
)
@click.option(
    "--cover-size",
    type=click.Choice(["252", "315", "360", "408", "500", "558", "570", "882",
                       "900", "1215"]),
    default=["500"],
    multiple=True,
    help="The cover pixel size. This option can be provided multiple times."
)
@click.option(
    "--chapter",
    is_flag=True,
    help="Saves chapter metadata as JSON file."
)
@click.option(
    "--chapter-type",
    default="config",
    type=click.Choice(["Flat", "Tree", "config"], case_sensitive=False),
    help="The chapter type."
)
@click.option(
    "--annotation",
    is_flag=True,
    help="saves the annotations (e.g. bookmarks, notes) as JSON file"
)
@start_date_option
@end_date_option
@click.option(
    "--no-confirm", "-y",
    is_flag=True,
    help="start without confirm"
)
@click.option(
    "--overwrite",
    is_flag=True,
    help="rename existing files"
)
@click.option(
    "--ignore-errors",
    is_flag=True,
    help="ignore errors and continue with the rest"
)
@click.option(
    "--jobs", "-j",
    type=int,
    default=3,
    show_default=True,
    help="number of simultaneous downloads"
)
@click.option(
    "--filename-mode", "-f",
    type=click.Choice(
        ["config", "ascii", "asin_ascii", "unicode", "asin_unicode", "asin_only"]
    ),
    default="config",
    help="Filename mode to use. [default: config]"
)
@click.option(
    "--filename-length",
    "-l",
    default=230,
    show_default=True,
    help="Maximum filename length.",
)
@timeout_option
@click.option(
    "--resolve-podcasts",
    is_flag=True,
    help="Resolve podcasts to download a single episode via asin or title"
)
@click.option(
    "--ignore-podcasts",
    is_flag=True,
    help="Ignore a podcast if it have episodes"
)
@bunch_size_option
@pass_session
@pass_client(headers=CLIENT_HEADERS)
async def cli(session: Session, api_client: AsyncClient, **params: Any):
    """Download audiobook(s) from an Audible library."""
    options = parse_options(session, params)

    library = await fetch_library(api_client, options)

    # collect items to download
    items = []

    if options.all:
        items = list(library)
    else:
        # Collect items by ASIN
        asin_items = collect_items_by_asin(library, options.asins, options.ignore_errors)
        items.extend(asin_items)

        # Collect items by title
        title_items = await collect_items_by_title(library, options.titles, options.no_confirm)
        items.extend(title_items)

    queue = asyncio.Queue(options.sim_jobs)
    counter = DownloadCounter()
    max_workers = options.sim_jobs

    download_jobs = await create_download_jobs(items, options, api_client.session, queue, counter)

    consumers = [asyncio.create_task(consume_jobs(queue)) for _ in range(max_workers)]
    producer = asyncio.create_task(produce_jobs(download_jobs, queue, max_workers))
    tasks = [producer, *consumers]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in done:
        try:
            task.result()
        except Exception as e:
            logger.exception(e)
            break

    # Now cancel any remaining tasks:
    if pending:
        for task in pending:
            task.cancel()

        # wait for all canceled tasks to finish
        await asyncio.wait(pending)

    display_counter(counter)

mkb79 avatar Aug 23 '25 19:08 mkb79

You can run the command with audible download2 .... The rework fix issues with hanging jobs (if not ignore errors) and annotations, which are not available.

mkb79 avatar Aug 23 '25 19:08 mkb79

I've linked a branch to this issue which will solve your issue and more. Can you try it out please? For me it runs fine. Any suggestions are welcome!

mkb79 avatar Aug 24 '25 21:08 mkb79

So I've checked out the branch, and I am getting AttributeError: 'DownloadJob' object has no attribute 'asin'. The download2 command seems not to be available in the branch.

devvythelopper avatar Aug 25 '25 12:08 devvythelopper

@devvythelopper Can you give me the full command prompt please?

The command name is download in the branch and overriding the old download command.

mkb79 avatar Aug 25 '25 12:08 mkb79

A branch is a good idea. If you put whatever I can test inside that branch, so that it always has the newest code, I can comment on it, or add suggestions (pull requests?). I don't even know, can you grant commit rights for branches?

The command is:

audible -v DEBUG download --output-dir /#/big/media/audiobooks/#audible-cli/ --pdf --cover --annotation --chapter --chapter-type Flat --quality best --aaxc --all

One suggestion I have is to put the log entries into the job instead of into the place where the entry is added to the queue. The reason why I added the log entries in my previous PR was so that I could track the audiobook where I got the error. If the log entry is before the queue is even started and the jobs are being run, then the error message will not be close to the log message, and so tracking is impossible.

Also, since downloading is the main job of the download command anyways, I would recommend using info instead of debug:

logger.info("Adding annotations download job for %s", job.item.full_title)

Or, if you do not want to use the info log-level, I recommend adding another log level, VERBOSE.

This is something I've done for one of my own projects, and which I will do for every project in the future. The reason is that I feel that the available log-levels are insufficient. There is CRITICAL, ERROR, WARNING, INFO, and DEBUG. So any normal program message needs to be info, for it is neither an error or a warning of some kind. Debug messages are meant to debug issues, and I use them for finegrained things.

But for many command line utilities a VERBOSE switch makes sense, i.e. messages that are not yet debug level messages, but would clutter things for the user if they were part of the normal info log. So I add a VERBOSE log level to my loggers. (And if DEBUG is not finegrained I sometimes add TRACE as well).

It's really easy if yo use standard logging, not sure about the AudibleCliLogHelper though:

import logging

# python: DEBUG: 10, INFO: 20, WARNING: 30, ERROR: 40: CRITICAL: 50
# mine: TRACE: 5, VERBOSE: 15
VERBOSE = 15

logging.addLevelName(VERBOSE, "VERBOSE")

def verbose(self, message, *args, **kwargs):
    if self.isEnabledFor(VERBOSE):
        self._log(VERBOSE, message, args, **kwargs)

logging.Logger.notice = notice

logger = logging.getLogger(__name__)
logger.setLevel(VERBOSE)

logger.verbose("This is a notice message.")

I would put most of the log entries in cmd_download.py into the VERBOSE category, i.e.:

logger.verbose("Loading data from voucher file %s.", lr_file)

devvythelopper avatar Aug 25 '25 12:08 devvythelopper

I've made a pull request for the logging within the job (not the new log level): #236 Untested of course, since I still get the error.

devvythelopper avatar Aug 25 '25 12:08 devvythelopper

Here is the full stacktracke:

Uncaught Exception
Traceback (most recent call last):
  File "/#/projects/new/audible-cli/src/audible_cli/cli.py", line 62, in main
    sys.exit(cli(*args, **kwargs))
             ^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 1442, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 1363, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 1830, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 1226, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 794, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/decorators.py", line 93, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 794, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/decorators.py", line 93, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/.venv/lib/python3.12/site-packages/click/core.py", line 794, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/src/audible_cli/decorators.py", line 23, in wrapper
    return asyncio.run(f(*args, ** kwargs))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/h097imm3w6dpx10qynrd2sz9fks2wbq8-python3-3.12.11/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/nix/store/h097imm3w6dpx10qynrd2sz9fks2wbq8-python3-3.12.11/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nix/store/h097imm3w6dpx10qynrd2sz9fks2wbq8-python3-3.12.11/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/src/audible_cli/decorators.py", line 49, in wrapper
    return await f(*args, client, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/src/audible_cli/cmds/cmd_download.py", line 1057, in cli
    download_jobs = await create_download_jobs(items, options, api_client.session, queue, counter)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/#/projects/new/audible-cli/src/audible_cli/cmds/cmd_download.py", line 512, in create_download_jobs
    if child_item.asin not in [i.asin for i in processed_items]:
                               ^^^^^^
AttributeError: 'DownloadJob' object has no attribute 'asin'

devvythelopper avatar Aug 25 '25 12:08 devvythelopper

never mind, the error was just a typo in the code. instead of i.asin it has to be i.item.asin. Fixed it in my branch (#236).

Everything seems to work fine now. By fine I mean, that errors (and error messages) do not halt download progress anymore.

devvythelopper avatar Aug 25 '25 13:08 devvythelopper

@devvythelopper Yeah, I saw my mistake as I saw your traceback. I'm on the go and will back to you later.

mkb79 avatar Aug 25 '25 13:08 mkb79

I found another issue. When proving --aax, --aaxc, --pdf, --cover, --annotation and --chapter together the download command will quit sometime without error message but nothing is downloaded.

mkb79 avatar Aug 25 '25 13:08 mkb79

Actually I've just run into that as well. I'm only using --aaxc and not --aax though. It only seems to download my newest additions to my library, and then just fails without any notice. Maybe running it from the debugger will give you more insights?

devvythelopper avatar Aug 25 '25 13:08 devvythelopper

The following, too: I just realized I got an

error: Failed to get annotations for: Die Wunderübung.

But on the next run I got:

File /#/big/media/audiobooks/#audible-cli/Die_Wunderubung-annotations.json already exists. Skip saving annotations

And the weirdest thing is, that the -annotations.json file actually contains some json with some data. But then I deleted it, and it fails to download it again without any error messsage, it just says "No new files downloaded" and exits...

And when I run audible in a completely empty directory to download my whole library again, it seems to download much more and does not crash so soon...

devvythelopper avatar Aug 25 '25 13:08 devvythelopper

Here are some relevant log entries:

Checking cover for: [B00HVWBHG6] Die Wunderübung
File /#/big/media/audiobooks/#audible-cli/Die_Wunderubung_(500).jpg already exists. Skip download
debug: Adding chapters download job for Die Wunderübung
debug: Adding annotations download job for Die Wunderübung
debug: Adding AAXC download job for Die Wunderübung
Checking PDF for: [B00HVWBHG6] Die Wunderübung
No PDF found for Die Wunderübung
Checking chapters for: [B00HVWBHG6] Die Wunderübung
File /#/big/media/audiobooks/#audible-cli/Die_Wunderubung-chapters.json already exists. Skip saving chapters
Checking annotations for: [B00HVWBHG6] Die Wunderübung
debug: Adding cover download job for Saint Thomas Aquinas
debug: Adding PDF download job for Saint Thomas Aquinas
debug: Adding chapters download job for Saint Thomas Aquinas
Checking aaxc for: [B00HVWBHG6] Die Wunderübung
File /#/big/media/audiobooks/#audible-cli/Die_Wunderubung-AAX_44_128.voucher already exists. Skip download.
File /#/big/media/audiobooks/#audible-cli/Die_Wunderubung-AAX_44_128.aaxc already exists. Skip download.
Checking cover for: [B07JKTKRF9] Saint Thomas Aquinas
File /#/big/media/audiobooks/#audible-cli/Saint_Thomas_Aquinas_(500).jpg already exists. Skip download
Checking PDF for: [B07JKTKRF9] Saint Thomas Aquinas
No PDF found for Saint Thomas Aquinas
No new files downloaded.

The Checking ... log entries are from my branch, they happen at the beginning of the download_xyz function. As you can see it begins downloading the annotations, and I can confirm that it executes:

annotation = await job.item.get_annotations()

but this await call does not return before audible-cli exits.

devvythelopper avatar Aug 25 '25 13:08 devvythelopper

Need to correct my previous statement, the await call does "return", it throws

asyncio.exceptions.CancelledError

devvythelopper avatar Aug 25 '25 14:08 devvythelopper

Maybe the queue is closed because the queue is empty before the producer(s) can put another item into the queue?

I'll test this out by inserting await asyncio.sleep(0.1) in the produce_jobs function. To give up the control to the event loop for the next coro. And adding some debug log messages into the consumer jobs to signal creation and destroying of the consumers.

Maybe raise the number of items in the queue will help though. Currently, the size of the queue is equal to the count of consumers. When I change queue = SmartQueue(options.sim_jobs) to queue = SmartQueue(options.sim_jobs + 3) this could help.

Test this out.

mkb79 avatar Aug 25 '25 14:08 mkb79

Did you commit this into the branch? And does "Test this out." mean I should test it?

I recommend merging my PR #236 with this branch before continuing testing as it is mostly about improved logging.

devvythelopper avatar Aug 25 '25 14:08 devvythelopper

Did you commit this into the branch? And does "Test this out." mean I should test it?

I recommend merging my PR #236 with this branch before continuing testing as it is mostly about improved logging.

No I will.

mkb79 avatar Aug 25 '25 15:08 mkb79

@devvythelopper I've reworked the download queue, moved the queue to utils module and add some debug log messages.

Maybe this logging is enough for you? Otherwise feel free to add more commits to your logging pr!

If you are interested to participate on my project, feel free to accept my invitation as Collaborator!

Except the master branch feel free to push any commits directly to my repo. For integration in the master branch please open a pr. I'll look into this as fast as possible and will merge this.

mkb79 avatar Aug 25 '25 19:08 mkb79

I like the verbosity of the logging messages now. I think I might add one more thing, namely that at least some log messages contain both ASIN and title so that backtracking is easy (if I find the time soon).

I'll test whether it works (over night) loading the new items in my library. Looks good though.

Thank you for inviting my collaboration. I'll be happy to help when an issue comes up and I can make the time.

devvythelopper avatar Aug 27 '25 21:08 devvythelopper

This morning the following error was reported:

error: Task <Task finished name='Task-4' coro=<consume_jobs() done, defined at /#/projects/new/audible-cli/src/audible_cli/cmds/cmd_download.py:892> exception=AudibleCliException('The Complete Book of Five Rings is not downloadable.')> failed with: AudibleCliException('The Complete Book of Five Rings is not downloadable.')
error: Traceback (most recent call last):                                                                                                                                                                           
error:   File "/#/projects/new/audible-cli/src/audible_cli/cmds/cmd_download.py", line 903, in consume_jobs
error:     await cmd(job, *args)
error:   File "/#/projects/new/audible-cli/src/audible_cli/cmds/cmd_download.py", line 805, in download_aaxc
error:     url, codec, lr = await job.item.get_aaxc_url(options.quality)
error:                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error:   File "/#/projects/new/audible-cli/src/audible_cli/models.py", line 322, in get_aaxc_url
error:     raise AudibleCliException(
error: audible_cli.exceptions.AudibleCliException: The Complete Book of Five Rings is not downloadable.

I'll look into it myself and report back.

By the way, in case you find some of my programming style "odd" lets say, the reason is, that I have a background in functional programming (think Haskell). And I have come to believe that functional programming warps your thinking, and thus enables you to be a better programmer. And functional programming allows you to create more reusable code. It also makes you separate code into smaller functions, that only serve one purpose.

devvythelopper avatar Aug 28 '25 09:08 devvythelopper

Maybe we should insert a try/except block on line 805. But this does not resolve the underlying issue why it does not download the book.

Will look later, too.

mkb79 avatar Aug 28 '25 09:08 mkb79

I think it is fine that the exception is thrown. If I use --ignore-errors downloads continue as expected.

The only thing that feels inconsistent, is that some errors stop audible-cli while others it continues automatically (without --ignore-errors).

devvythelopper avatar Aug 28 '25 10:08 devvythelopper

Also I have the following error

error: Error downloading /#/big/media/audiobooks/#audible-cli/The_New_Testament.pdf. Wrong content type. Expected type(s): ['application/octet-stream', 'application/pdf']; Got: text/html;charset=UTF-8; Message:  
error:
...

And then it dumps a whole html page to the error logs. The relevant message on that page (in german) is: "We can currently not access these additional contents".

I'll try and investigate the annotations download issue that keeps persisting.

devvythelopper avatar Aug 28 '25 10:08 devvythelopper

My guess is, that the annotations download issue is that for some of my audiobooks, the URL that is being used (https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/sidecar) does not work. I am not quite sure how to figure out the correct url, but I can give you a list of audiobooks for which it fails. Or, if you give me a hint, I can also test REST API calls.

devvythelopper avatar Aug 28 '25 10:08 devvythelopper