From 50dcfa14e79e1321af12751c1028e275159d52c9 Mon Sep 17 00:00:00 2001 From: Rafael Moraes <50295204+glomatico@users.noreply.github.com> Date: Thu, 23 Oct 2025 11:54:39 -0300 Subject: [PATCH] Refactor CLI utility classes and functions to utils.py --- gamdl/cli/cli.py | 114 ++------------------------------------------- gamdl/cli/utils.py | 112 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 110 deletions(-) create mode 100644 gamdl/cli/utils.py diff --git a/gamdl/cli/cli.py b/gamdl/cli/cli.py index d655b92..ef7bf12 100644 --- a/gamdl/cli/cli.py +++ b/gamdl/cli/cli.py @@ -1,8 +1,5 @@ -import asyncio import inspect import logging -import typing -from functools import wraps from pathlib import Path import click @@ -31,9 +28,9 @@ from ..interface import ( SyncedLyricsFormat, UploadedVideoQuality, ) -from .config_file import ConfigFile from .constants import X_NOT_IN_PATH from .custom_logger_formatter import CustomLoggerFormatter +from .utils import Csv, PathPrompt, load_config_file, make_sync logger = logging.getLogger(__name__) @@ -46,112 +43,6 @@ uploaded_video_downloader_sig = inspect.signature( ) -class Csv(click.ParamType): - name = "csv" - - def __init__( - self, - subtype: typing.Any, - ) -> None: - self.subtype = subtype - - def convert( - self, - value: str | typing.Any, - param: click.Parameter, - ctx: click.Context, - ) -> list[typing.Any]: - if not isinstance(value, str): - return value - - items = [v.strip() for v in value.split(",") if v.strip()] - result = [] - - for item in items: - try: - result.append(self.subtype(item)) - except ValueError as e: - self.fail( - f"'{item}' is not a valid value for {self.subtype.__name__}", - param, - ctx, - ) - return result - - -class PathPrompt(click.ParamType): - name = "path" - - def __init__(self, is_file: bool = False) -> None: - self.is_file = is_file - - def convert( - self, - value: str | typing.Any, - param: click.Parameter, - ctx: click.Context, - ) -> str: - if not isinstance(value, str): - return value - - path_validator = click.Path( - exists=True, - file_okay=self.is_file, - dir_okay=not self.is_file, - ) - path_type = "file" if self.is_file else "directory" - while True: - try: - result = path_validator.convert(value, None, None) - break - except click.BadParameter as e: - value = click.prompt( - ( - f'{path_type.capitalize()} "{Path(value).absolute()}" does not exist. ' - f"Create the {path_type} at the specified path, " - f"type a new path or drag and drop the {path_type} here. " - "Then, press enter to continue" - ), - default=value, - show_default=False, - ) - value = value.strip('"') - return result - - -def load_config_file( - ctx: click.Context, - param: click.Parameter, - no_config_file: bool, -) -> click.Context: - if no_config_file: - return ctx - - config_file = ConfigFile(ctx.params["config_path"]) - config_file.add_params_default_to_config( - ctx.command.params, - ) - parsed_params = config_file.parse_params_from_config( - [ - param - for param in ctx.command.params - if ctx.get_parameter_source(param.name) - != click.core.ParameterSource.COMMANDLINE - ] - ) - ctx.params.update(parsed_params) - - return ctx - - -def make_sync(func): - @wraps(func) - def wrapper(*args, **kwargs): - return asyncio.run(func(*args, **kwargs)) - - return wrapper - - @click.command() @click.help_option("-h", "--help") @click.version_option(__version__, "-v", "--version") @@ -636,8 +527,10 @@ async def main( url_progress + f' Error processing "{url}"', exc_info=not no_exceptions, ) + if not download_queue: continue + for download_index, download_item in enumerate( download_queue, 1, @@ -655,6 +548,7 @@ async def main( else "Unknown Title" ) logger.info(download_queue_progress + f' Downloading "{media_title}"') + try: await downloader.download(download_item) except ( diff --git a/gamdl/cli/utils.py b/gamdl/cli/utils.py new file mode 100644 index 0000000..2e4b3ce --- /dev/null +++ b/gamdl/cli/utils.py @@ -0,0 +1,112 @@ +import asyncio +import typing +from functools import wraps +from pathlib import Path +import click +from .config_file import ConfigFile + + +class Csv(click.ParamType): + name = "csv" + + def __init__( + self, + subtype: typing.Any, + ) -> None: + self.subtype = subtype + + def convert( + self, + value: str | typing.Any, + param: click.Parameter, + ctx: click.Context, + ) -> list[typing.Any]: + if not isinstance(value, str): + return value + + items = [v.strip() for v in value.split(",") if v.strip()] + result = [] + + for item in items: + try: + result.append(self.subtype(item)) + except ValueError as e: + self.fail( + f"'{item}' is not a valid value for {self.subtype.__name__}", + param, + ctx, + ) + return result + + +class PathPrompt(click.ParamType): + name = "path" + + def __init__(self, is_file: bool = False) -> None: + self.is_file = is_file + + def convert( + self, + value: str | typing.Any, + param: click.Parameter, + ctx: click.Context, + ) -> str: + if not isinstance(value, str): + return value + + path_validator = click.Path( + exists=True, + file_okay=self.is_file, + dir_okay=not self.is_file, + ) + path_type = "file" if self.is_file else "directory" + while True: + try: + result = path_validator.convert(value, None, None) + break + except click.BadParameter as e: + value = click.prompt( + ( + f'{path_type.capitalize()} "{Path(value).absolute()}" does not exist. ' + f"Create the {path_type} at the specified path, " + f"type a new path or drag and drop the {path_type} here. " + "Then, press enter to continue" + ), + default=value, + show_default=False, + ) + value = value.strip('"') + return result + + +def load_config_file( + ctx: click.Context, + param: click.Parameter, + no_config_file: bool, +) -> click.Context: + if no_config_file: + return ctx + + config_file = ConfigFile(ctx.params["config_path"]) + config_file.add_params_default_to_config( + ctx.command.params, + ) + parsed_params = config_file.parse_params_from_config( + [ + param + for param in ctx.command.params + if ctx.get_parameter_source(param.name) + != click.core.ParameterSource.COMMANDLINE + ] + ) + ctx.params.update(parsed_params) + + return ctx + + +def make_sync(func): + @wraps(func) + def wrapper(*args, **kwargs): + return asyncio.run(func(*args, **kwargs)) + + return wrapper