Refactor CLI module

This commit is contained in:
Rafael Moraes
2026-04-21 10:15:49 -03:00
parent 9b76ab90a7
commit 30b3f36905
4 changed files with 541 additions and 321 deletions
+117 -143
View File
@@ -5,11 +5,13 @@ from pathlib import Path
import click
import colorama
import structlog
from dataclass_click import dataclass_click
from httpx import ConnectError
from .. import __version__
from ..api import AppleMusicApi, ItunesApi
from ..api import AppleMusicApi
import traceback
from ..downloader import (
AppleMusicBaseDownloader,
AppleMusicDownloader,
@@ -17,23 +19,27 @@ from ..downloader import (
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
DownloadItem,
DownloadMode,
GamdlError,
RemuxMode,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
)
from ..interface import (
AppleMusicBaseInterface,
AppleMusicInterface,
AppleMusicMusicVideoInterface,
AppleMusicSongInterface,
AppleMusicUploadedVideoInterface,
SongCodec,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceMediaNotStreamableError,
)
from .cli_config import CliConfig
from .config_file import ConfigFile
from .constants import X_NOT_IN_PATH
from .utils import CustomLoggerFormatter, prompt_path
from .utils import custom_structlog_formatter, prompt_path
from .interactive_prompts import InteractivePrompts
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
def make_sync(func):
@@ -58,14 +64,22 @@ async def main(config: CliConfig):
root_logger.propagate = False
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomLoggerFormatter())
stream_handler.setFormatter(logging.Formatter("%(message)s"))
root_logger.addHandler(stream_handler)
if config.log_file:
file_handler = logging.FileHandler(config.log_file, encoding="utf-8")
file_handler.setFormatter(CustomLoggerFormatter(use_colors=False))
file_handler.setFormatter(logging.Formatter("%(message)s"))
root_logger.addHandler(file_handler)
structlog.configure(
processors=[
structlog.processors.add_log_level,
custom_structlog_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger.info(f"Starting Gamdl {__version__}")
if config.use_wrapper:
@@ -87,38 +101,73 @@ async def main(config: CliConfig):
language=config.language,
)
itunes_api = ItunesApi(
apple_music_api.storefront,
apple_music_api.language,
)
if not apple_music_api.active_subscription:
logger.critical(
"No active Apple Music subscription found, you won't be able to download"
" anything"
)
return
if apple_music_api.account_restrictions:
logger.warning(
"Your account has content restrictions enabled, some content may not be"
" downloadable"
)
interface = AppleMusicInterface(
apple_music_api,
itunes_api,
if (
any(not codec.is_legacy() for codec in config.song_codec_piority)
and not config.use_wrapper
):
logger.warning(
"You have chosen an experimental song codec "
"without enabling wrapper. "
"They're not guaranteed to work due to API limitations."
)
interactive_prompts = InteractivePrompts(
artist_auto_select=config.artist_auto_select,
)
base_interface = await AppleMusicBaseInterface.create(
apple_music_api=apple_music_api,
cover_format=config.cover_format,
cover_size=config.cover_size,
wvd_path=config.wvd_path,
)
song_interface = AppleMusicSongInterface(
base=base_interface,
synced_lyrics_format=config.synced_lyrics_format,
codec_priority=config.song_codec_piority,
use_album_date=config.use_album_date,
skip_decryption_key_non_legacy=config.use_wrapper,
ask_codec_function=interactive_prompts.ask_song_codec,
)
music_video_interface = AppleMusicMusicVideoInterface(
base=base_interface,
resolution=config.music_video_resolution,
codec_priority=config.music_video_codec_priority,
ask_video_codec_function=interactive_prompts.ask_music_video_video_codec_function,
ask_audio_codec_function=interactive_prompts.ask_music_video_audio_codec_function,
)
uploaded_video_interface = AppleMusicUploadedVideoInterface(
base=base_interface,
quality=config.uploaded_video_quality,
ask_quality_function=interactive_prompts.ask_uploaded_video_quality_function,
)
interface = AppleMusicInterface(
song=song_interface,
music_video=music_video_interface,
uploaded_video=uploaded_video_interface,
artist_select_media_type_function=interactive_prompts.ask_artist_media_type,
artist_select_items_function=interactive_prompts.ask_artist_select_items,
)
song_interface = AppleMusicSongInterface(interface)
music_video_interface = AppleMusicMusicVideoInterface(interface)
uploaded_video_interface = AppleMusicUploadedVideoInterface(interface)
base_downloader = AppleMusicBaseDownloader(
interface=interface,
output_path=config.output_path,
temp_path=config.temp_path,
wvd_path=config.wvd_path,
overwrite=config.overwrite,
save_cover=config.save_cover,
save_playlist=config.save_playlist,
nm3u8dlre_path=config.nm3u8dlre_path,
mp4decrypt_path=config.mp4decrypt_path,
ffmpeg_path=config.ffmpeg_path,
@@ -126,102 +175,42 @@ async def main(config: CliConfig):
use_wrapper=config.use_wrapper,
wrapper_decrypt_ip=config.wrapper_decrypt_ip,
download_mode=config.download_mode,
cover_format=config.cover_format,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
no_album_folder_template=config.no_album_folder_template,
playlist_folder_template=config.playlist_folder_template,
single_disc_file_template=config.single_disc_file_template,
multi_disc_file_template=config.multi_disc_file_template,
no_album_file_template=config.no_album_file_template,
playlist_file_template=config.playlist_file_template,
date_tag_template=config.date_tag_template,
exclude_tags=config.exclude_tags,
cover_size=config.cover_size,
truncate=config.truncate,
)
song_downloader = AppleMusicSongDownloader(
base_downloader=base_downloader,
interface=song_interface,
codec_priority=config.song_codec_piority,
synced_lyrics_format=config.synced_lyrics_format,
no_synced_lyrics=config.no_synced_lyrics,
synced_lyrics_only=config.synced_lyrics_only,
use_album_date=config.use_album_date,
fetch_extra_tags=config.fetch_extra_tags,
base=base_downloader,
)
music_video_downloader = AppleMusicMusicVideoDownloader(
base_downloader=base_downloader,
interface=music_video_interface,
codec_priority=config.music_video_codec_priority,
base=base_downloader,
remux_mode=config.music_video_remux_mode,
remux_format=config.music_video_remux_format,
resolution=config.music_video_resolution,
)
uploaded_video_downloader = AppleMusicUploadedVideoDownloader(
base_downloader=base_downloader,
interface=uploaded_video_interface,
quality=config.uploaded_video_quality,
base=base_downloader,
)
downloader = AppleMusicDownloader(
interface=interface,
base_downloader=base_downloader,
song_downloader=song_downloader,
music_video_downloader=music_video_downloader,
uploaded_video_downloader=uploaded_video_downloader,
artist_auto_select=config.artist_auto_select,
song=song_downloader,
music_video=music_video_downloader,
uploaded_video=uploaded_video_downloader,
overwrite=config.overwrite,
save_cover=config.save_cover,
save_playlist=config.save_playlist,
no_synced_lyrics=config.no_synced_lyrics,
synced_lyrics_only=config.synced_lyrics_only,
)
if not config.synced_lyrics_only:
if (
config.download_mode == DownloadMode.NM3U8DLRE
and not base_downloader.full_nm3u8dlre_path
):
logger.critical(X_NOT_IN_PATH.format("N_m3u8DL-RE", config.nm3u8dlre_path))
return
missing_music_video_paths = []
if not base_downloader.full_ffmpeg_path and (
config.music_video_remux_mode == RemuxMode.FFMPEG
or config.download_mode == DownloadMode.NM3U8DLRE
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("ffmpeg", config.ffmpeg_path)
)
if (
not base_downloader.full_mp4box_path
and config.music_video_remux_mode == RemuxMode.MP4BOX
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("MP4Box", config.mp4box_path)
)
if not base_downloader.full_mp4decrypt_path and (
config.song_codec_piority
not in (SongCodec.AAC_LEGACY, SongCodec.AAC_HE_LEGACY)
or config.music_video_remux_mode == RemuxMode.MP4BOX
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("mp4decrypt", config.mp4decrypt_path)
)
if missing_music_video_paths:
logger.warning(
"Music videos will not be downloaded due to missing dependencies:\n"
+ "\n".join(missing_music_video_paths)
)
if (
any(not codec.is_legacy() for codec in config.song_codec_piority)
and not config.use_wrapper
):
logger.warning(
"You have chosen an experimental song codec "
"without enabling wrapper. "
"They're not guaranteed to work due to API limitations."
)
if config.read_urls_as_txt:
urls_from_file = []
for url in config.urls:
@@ -239,68 +228,53 @@ async def main(config: CliConfig):
error_count = 0
for url_index, url in enumerate(urls, 1):
url_progress = click.style(f"[URL {url_index}/{len(urls)}]", dim=True)
logger.info(url_progress + f' Processing "{url}"')
download_queue = None
url_log = logger.bind(action=f"URL {url_index:>3}/{len(urls):<3}")
url_log.info(f'Processing "{url}"')
try:
url_info = downloader.get_url_info(url)
if not url_info:
logger.warning(
url_progress + f' Could not parse "{url}", skipping.',
)
continue
download_queue = await downloader.get_download_queue(url_info)
if not download_queue:
logger.warning(
url_progress
+ f' No downloadable media found for "{url}", skipping.',
)
continue
except KeyboardInterrupt:
exit(1)
download_queue: list[DownloadItem] = []
async for media in downloader.get_download_item_from_url(url):
download_queue.append(media)
except Exception as e:
url_log.error(f'Error processing "{url}": {e}')
error_count += 1
logger.error(
url_progress + f' Error processing "{url}"',
exc_info=not config.no_exceptions,
)
if not download_queue:
continue
for download_index, download_item in enumerate(
download_queue,
1,
):
download_queue_progress = click.style(
f"[Track {download_index}/{len(download_queue)}]",
dim=True,
track_log = logger.bind(
action=f"Track {download_index:>3}/{len(download_queue):<3}"
)
media_title = (
download_item.media_metadata["attributes"]["name"]
if isinstance(
download_item,
DownloadItem,
)
download_item.media.media_metadata["attributes"]["name"]
if download_item.media.media_metadata
and download_item.media.media_metadata.get("attributes", {}).get("name")
else "Unknown Title"
)
logger.info(download_queue_progress + f' Downloading "{media_title}"')
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
except GamdlError as e:
logger.warning(
download_queue_progress + f' Skipping "{media_title}": {e}'
)
except (
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceArtistMediaTypeError,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
except KeyboardInterrupt:
exit(1)
except Exception as e:
error_count += 1
logger.error(
download_queue_progress + f' Error downloading "{media_title}"',
exc_info=not config.no_exceptions,
)
track_log.error(f'Error downloading "{media_title}"')
if not config.no_exceptions:
traceback.print_exc()
logger.info(f"Finished with {error_count} error(s)")
+165 -155
View File
@@ -11,14 +11,17 @@ from ..downloader import (
AppleMusicBaseDownloader,
AppleMusicDownloader,
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
ArtistAutoSelect,
DownloadMode,
RemuxFormatMusicVideo,
RemuxMode,
)
from ..interface import (
AppleMusicBaseInterface,
AppleMusicInterface,
AppleMusicMusicVideoInterface,
AppleMusicSongInterface,
AppleMusicUploadedVideoInterface,
ArtistMediaType,
CoverFormat,
MusicVideoCodec,
MusicVideoResolution,
@@ -30,13 +33,18 @@ from .utils import Csv
api_from_cookies_sig = inspect.signature(AppleMusicApi.create_from_netscape_cookies)
api_from_wrapper_sig = inspect.signature(AppleMusicApi.create_from_wrapper)
api_sig = inspect.signature(AppleMusicApi.__init__)
api_create_sig = inspect.signature(AppleMusicApi.create)
base_interface_create_sig = inspect.signature(AppleMusicBaseInterface.create)
song_interface_sig = inspect.signature(AppleMusicSongInterface.__init__)
music_video_interface_sig = inspect.signature(AppleMusicMusicVideoInterface.__init__)
uploaded_video_interface_sig = inspect.signature(
AppleMusicUploadedVideoInterface.__init__
)
interface_create_sig = inspect.signature(AppleMusicInterface)
base_downloader_sig = inspect.signature(AppleMusicBaseDownloader.__init__)
music_video_downloader_sig = inspect.signature(AppleMusicMusicVideoDownloader.__init__)
song_downloader_sig = inspect.signature(AppleMusicSongDownloader.__init__)
uploaded_video_downloader_sig = inspect.signature(
AppleMusicUploadedVideoDownloader.__init__
)
downloader_sig = inspect.signature(AppleMusicDownloader.__init__)
@@ -105,6 +113,24 @@ class CliConfig:
is_flag=True,
),
]
artist_auto_select: Annotated[
ArtistMediaType | None,
option(
"--artist-auto-select",
help="Automatically select artist content to download (only for artist URLs)",
default=None,
type=ArtistMediaType,
),
]
no_config_file: Annotated[
bool,
option(
"--no-config-file",
"-n",
help="Don't use a config file",
is_flag=True,
),
]
# API specific options
cookies_path: Annotated[
str,
@@ -135,17 +161,95 @@ class CliConfig:
"--language",
"-l",
help="Metadata language",
default=api_sig.parameters["language"].default,
default=api_create_sig.parameters["language"].default,
),
]
# Downloader specific options
artist_auto_select: Annotated[
ArtistAutoSelect | None,
# Base Interface specific options
cover_format: Annotated[
CoverFormat,
option(
"--artist-auto-select",
help="Automatically select artist content to download (only for artist URLs)",
default=downloader_sig.parameters["artist_auto_select"].default,
type=ArtistAutoSelect,
"--cover-format",
help="Cover format",
default=base_interface_create_sig.parameters["cover_format"].default,
type=CoverFormat,
),
]
cover_size: Annotated[
int,
option(
"--cover-size",
help="Cover size in pixels",
default=base_interface_create_sig.parameters["cover_size"].default,
),
]
wvd_path: Annotated[
str | None,
option(
"--wvd-path",
help=".wvd file path",
default=base_interface_create_sig.parameters["wvd_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True,
),
),
]
# Song Interface Options
synced_lyrics_format: Annotated[
SyncedLyricsFormat,
option(
"--synced-lyrics-format",
help="Synced lyrics format",
default=song_interface_sig.parameters["synced_lyrics_format"].default,
type=SyncedLyricsFormat,
),
]
song_codec_piority: Annotated[
list[SongCodec],
option(
"--song-codec-priority",
help="Comma-separated codec priority",
default=song_interface_sig.parameters["codec_priority"].default,
type=Csv(SongCodec),
),
]
use_album_date: Annotated[
bool,
option(
"--use-album-date",
help="Use album release date for songs",
is_flag=True,
),
]
# Music Video Interface Options
music_video_resolution: Annotated[
MusicVideoResolution,
option(
"--music-video-resolution",
help="Max music video resolution",
default=music_video_interface_sig.parameters["resolution"].default,
type=MusicVideoResolution,
),
]
music_video_codec_priority: Annotated[
list[MusicVideoCodec],
option(
"--music-video-codec-priority",
help="Comma-separated codec priority",
default=music_video_interface_sig.parameters["codec_priority"].default,
type=Csv(MusicVideoCodec),
),
]
# Uploaded Video Interface Options
uploaded_video_quality: Annotated[
UploadedVideoQuality,
option(
"--uploaded-video-quality",
help="Post video quality",
default=uploaded_video_interface_sig.parameters["quality"].default,
type=UploadedVideoQuality,
),
]
# Base Downloader specific options
@@ -178,45 +282,6 @@ class CliConfig:
),
),
]
wvd_path: Annotated[
str,
option(
"--wvd-path",
help=".wvd file path",
default=base_downloader_sig.parameters["wvd_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True,
),
),
]
overwrite: Annotated[
bool,
option(
"--overwrite",
help="Overwrite existing files",
is_flag=True,
),
]
save_cover: Annotated[
bool,
option(
"--save-cover",
"-s",
help="Save cover as separate file",
is_flag=True,
),
]
save_playlist: Annotated[
bool,
option(
"--save-playlist",
help="Save M3U8 playlist file",
is_flag=True,
),
]
nm3u8dlre_path: Annotated[
str,
option(
@@ -274,15 +339,6 @@ class CliConfig:
type=DownloadMode,
),
]
cover_format: Annotated[
CoverFormat,
option(
"--cover-format",
help="Cover format",
default=base_downloader_sig.parameters["cover_format"].default,
type=CoverFormat,
),
]
album_folder_template: Annotated[
str,
option(
@@ -309,6 +365,14 @@ class CliConfig:
default=base_downloader_sig.parameters["no_album_folder_template"].default,
),
]
playlist_folder_template: Annotated[
str,
option(
"--playlist-folder-template",
help="Playlist folder template",
default=base_downloader_sig.parameters["playlist_folder_template"].default,
),
]
single_disc_file_template: Annotated[
str,
option(
@@ -358,14 +422,6 @@ class CliConfig:
type=Csv(str),
),
]
cover_size: Annotated[
int,
option(
"--cover-size",
help="Cover size in pixels",
default=base_downloader_sig.parameters["cover_size"].default,
),
]
truncate: Annotated[
int,
option(
@@ -374,67 +430,7 @@ class CliConfig:
default=base_downloader_sig.parameters["truncate"].default,
),
]
# DownloaderSong specific options
song_codec_piority: Annotated[
list[SongCodec],
option(
"--song-codec-priority",
help="Comma-separated codec priority",
default=song_downloader_sig.parameters["codec_priority"].default,
type=Csv(SongCodec),
),
]
synced_lyrics_format: Annotated[
SyncedLyricsFormat,
option(
"--synced-lyrics-format",
help="Synced lyrics format",
default=song_downloader_sig.parameters["synced_lyrics_format"].default,
type=SyncedLyricsFormat,
),
]
no_synced_lyrics: Annotated[
bool,
option(
"--no-synced-lyrics",
help="Don't download synced lyrics",
is_flag=True,
),
]
synced_lyrics_only: Annotated[
bool,
option(
"--synced-lyrics-only",
help="Download only synced lyrics",
is_flag=True,
),
]
use_album_date: Annotated[
bool,
option(
"--use-album-date",
help="Use album release date for songs",
is_flag=True,
),
]
fetch_extra_tags: Annotated[
bool,
option(
"--fetch-extra-tags",
help="Fetch extra tags from preview (normalization and smooth playback)",
is_flag=True,
),
]
# DownloaderMusicVideo specific options
music_video_codec_priority: Annotated[
list[MusicVideoCodec],
option(
"--music-video-codec-priority",
help="Comma-separated codec priority",
default=music_video_downloader_sig.parameters["codec_priority"].default,
type=Csv(MusicVideoCodec),
),
]
music_video_remux_mode: Annotated[
RemuxMode,
option(
@@ -453,31 +449,45 @@ class CliConfig:
type=RemuxFormatMusicVideo,
),
]
music_video_resolution: Annotated[
MusicVideoResolution,
option(
"--music-video-resolution",
help="Max music video resolution",
default=music_video_downloader_sig.parameters["resolution"].default,
type=MusicVideoResolution,
),
]
# DownloaderUploadedVideo specific options
uploaded_video_quality: Annotated[
UploadedVideoQuality,
option(
"--uploaded-video-quality",
help="Post video quality",
default=uploaded_video_downloader_sig.parameters["quality"].default,
type=UploadedVideoQuality,
),
]
no_config_file: Annotated[
# Downloader specific options
overwrite: Annotated[
bool,
option(
"--no-config-file",
"-n",
help="Don't use a config file",
"--overwrite",
help="Overwrite existing files",
is_flag=True,
),
]
save_cover: Annotated[
bool,
option(
"--save-cover",
"-s",
help="Save cover as separate file",
is_flag=True,
),
]
save_playlist: Annotated[
bool,
option(
"--save-playlist",
help="Save M3U8 playlist file",
is_flag=True,
),
]
no_synced_lyrics: Annotated[
bool,
option(
"--no-synced-lyrics",
help="Don't download synced lyrics",
is_flag=True,
),
]
synced_lyrics_only: Annotated[
bool,
option(
"--synced-lyrics-only",
help="Download only synced lyrics",
is_flag=True,
),
]
+232
View File
@@ -0,0 +1,232 @@
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
import m3u8
from ..interface import ArtistMediaType
class InteractivePrompts:
def __init__(
self,
artist_auto_select: ArtistMediaType | None = None,
):
self.artist_auto_select = artist_auto_select
@staticmethod
def millis_to_min_sec(millis) -> str:
minutes, seconds = divmod(millis // 1000, 60)
return f"{minutes:02}:{seconds:02}"
@staticmethod
async def ask_song_codec(
playlists: list[dict],
) -> dict:
choices = [
Choice(
name=playlist["stream_info"]["audio"],
value=playlist,
)
for playlist in playlists
]
return await inquirer.select(
message="Select which codec to download:",
choices=choices,
).execute_async()
@staticmethod
async def ask_music_video_video_codec_function(
playlists: list[m3u8.Playlist],
) -> dict:
choices = [
Choice(
name=" | ".join(
[
playlist.stream_info.codecs[:4],
"x".join(str(v) for v in playlist.stream_info.resolution),
str(playlist.stream_info.bandwidth),
]
),
value=playlist,
)
for playlist in playlists
]
return await inquirer.select(
message="Select which video codec to download: (Codec | Resolution | Bitrate)",
choices=choices,
).execute_async()
@staticmethod
async def ask_music_video_audio_codec_function(
playlists: list[dict],
) -> dict:
choices = [
Choice(
name=playlist["group_id"],
value=playlist,
)
for playlist in playlists
]
selected = await inquirer.select(
message="Select which audio codec to download:",
choices=choices,
).execute_async()
return selected
@staticmethod
async def ask_uploaded_video_quality_function(
available_qualities: dict[str, str],
) -> str:
qualities = list(available_qualities.keys())
choices = [
Choice(
name=quality,
value=quality,
)
for quality in qualities
]
selected = await inquirer.select(
message="Select which quality to download:",
choices=choices,
).execute_async()
return available_qualities[selected]
async def ask_artist_media_type(
self,
media_types: list[ArtistMediaType],
artist_metadata: dict,
) -> ArtistMediaType:
if self.artist_auto_select:
return self.artist_auto_select
available_choices = []
for media_types in media_types:
available_choices.append(
Choice(
name=str(media_types),
value=(media_types,),
),
)
(media_type,) = await inquirer.select(
message=f'Select which type to download for artist "{artist_metadata["attributes"]["name"]}":',
choices=available_choices,
validate=lambda result: artist_metadata.get(result[0].path_key[0], {})
.get(result[0].path_key[1], {})
.get("data"),
).execute_async()
return media_type
async def ask_artist_select_items(
self,
media_type: ArtistMediaType,
items: list[dict],
) -> list[dict]:
if media_type in {
ArtistMediaType.MAIN_ALBUMS,
ArtistMediaType.COMPILATION_ALBUMS,
ArtistMediaType.LIVE_ALBUMS,
ArtistMediaType.SINGLES_EPS,
ArtistMediaType.ALL_ALBUMS,
}:
return await self._ask_artist_select_albums(items)
elif media_type == ArtistMediaType.TOP_SONGS:
return await self._ask_artist_select_songs(
items,
)
elif media_type == ArtistMediaType.MUSIC_VIDEOS:
return await self._ask_artist_select_music_videos(items)
async def _ask_artist_select_albums(
self,
albums: list[dict],
) -> list[dict]:
if self.artist_auto_select:
return albums
choices = [
Choice(
name=" | ".join(
[
f'{album["attributes"]["trackCount"]:03d}',
f'{album["attributes"]["releaseDate"]:<10}',
f'{album["attributes"].get("contentRating", "None").title():<8}',
f'{album["attributes"]["name"]}',
]
),
value=album,
)
for album in albums
if album.get("attributes")
]
selected = await inquirer.select(
message="Select which albums to download: (Track Count | Release Date | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
return selected
async def _ask_artist_select_songs(
self,
songs: list[dict],
) -> list[dict]:
if self.artist_auto_select:
return songs
choices = [
Choice(
name=" | ".join(
[
self.millis_to_min_sec(song["attributes"]["durationInMillis"]),
f'{song["attributes"].get("contentRating", "None").title():<8}',
song["attributes"]["name"],
],
),
value=song,
)
for song in songs
if song.get("attributes")
]
selected = await inquirer.select(
message="Select which songs to download: (Duration | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
return selected
async def _ask_artist_select_music_videos(
self,
music_videos: list[dict],
) -> list[dict]:
if self.artist_auto_select:
return music_videos
choices = [
Choice(
name=" | ".join(
[
self.millis_to_min_sec(
music_video["attributes"]["durationInMillis"]
),
f'{music_video["attributes"].get("contentRating", "None").title():<8}',
music_video["attributes"]["name"],
],
),
value=music_video,
)
for music_video in music_videos
if music_video.get("attributes")
]
selected = await inquirer.select(
message="Select which music videos to download: (Duration | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
return selected
+27 -23
View File
@@ -1,6 +1,7 @@
import logging
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
import click
@@ -38,31 +39,34 @@ class Csv(click.ParamType):
return result
class CustomLoggerFormatter(logging.Formatter):
base_format = "[%(levelname)-8s %(asctime)s]"
format_colors = {
logging.DEBUG: dict(dim=True),
logging.INFO: dict(fg="green"),
logging.WARNING: dict(fg="yellow"),
logging.ERROR: dict(fg="red"),
logging.CRITICAL: dict(fg="red", bold=True),
def custom_structlog_formatter(
logger: Any,
name: str,
event_dict: dict[str, Any],
) -> str:
level = event_dict.get("level", "INFO").upper()
timestamp = datetime.now().strftime("%H:%M:%S")
level_colors = {
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "red",
}
date_format = "%H:%M:%S"
def __init__(self, use_colors: bool = True) -> None:
super().__init__()
self.use_colors = use_colors
color = level_colors.get(level, "white")
prefix = click.style(f"[{level:<8} {timestamp}]", fg=color)
def format(self, record: logging.LogRecord) -> str:
return logging.Formatter(
(
click.style(self.base_format, **self.format_colors.get(record.levelno))
if self.use_colors
else self.base_format
)
+ " %(message)s",
datefmt=self.date_format,
).format(record)
action = event_dict.pop("action", None)
if action:
prefix += click.style(f" [{action}]", dim=True)
if level in {"INFO", "WARNING", "ERROR", "CRITICAL"}:
message = event_dict.get("event", "")
return f"{prefix} {message}"
else:
return f"{prefix} {event_dict}"
def prompt_path(