Compare commits

...

52 Commits

Author SHA1 Message Date
Rafael Moraes 9b701e8ee8 Update license format and add setuptools packages 2026-01-15 22:58:33 -03:00
Rafael Moraes f4e6069e69 Bump version to 2.8.3 2026-01-15 22:50:41 -03:00
Rafael Moraes 841b1edb64 Fix import location for CliConfig in config_file.py 2026-01-15 22:48:58 -03:00
Rafael Moraes ef4b34f3d2 Add pathlib and Csv import to cli_config.py 2026-01-15 22:48:53 -03:00
Rafael Moraes 98980fc130 Refactor CliConfig to separate module 2026-01-15 22:47:51 -03:00
Rafael Moraes 6c84651770 Fix config value check to distinguish None from falsy values 2026-01-15 22:44:08 -03:00
Rafael Moraes f9d3d0a97e Refactor playlist file path formatting logic 2026-01-15 22:41:33 -03:00
Rafael Moraes 9a879c0857 Refactor template variable names for clarity 2026-01-15 22:30:09 -03:00
Rafael Moraes d0ab35383b Remove unnecessary strip() after regex substitution 2026-01-15 22:28:54 -03:00
Rafael Moraes b14004f3e3 Update installation instructions to use pip 2026-01-15 22:26:46 -03:00
Rafael Moraes a6e409d98d Update template variable documentation in README 2026-01-15 22:25:06 -03:00
Rafael Moraes d1c9aea874 Skip config updates for command-line parameters 2026-01-15 22:15:55 -03:00
Rafael Moraes 8c110b4fb9 Refactor file template selection logic 2026-01-15 22:12:03 -03:00
Rafael Moraes e1c8cb51ad Refactor path sanitization and formatting logic 2026-01-15 22:11:14 -03:00
Rafael Moraes 52324d519c Refactor CLI to use dataclass-based config and options 2026-01-15 03:12:39 -03:00
Rafael Moraes 057315524f Add dataclass-click to project dependencies 2026-01-15 03:12:18 -03:00
Rafael Moraes 446636166e Update README with new CLI options 2026-01-04 15:11:09 -03:00
Rafael Moraes 7199cac179 Add support for fetching and applying extra tags 2026-01-04 15:11:02 -03:00
Rafael Moraes be4f30cb54 Add method to extract extra tags from Apple Music previews 2026-01-04 15:10:54 -03:00
Rafael Moraes 83ca91e91c Refactor cover image handling to interface layer 2026-01-03 15:08:35 -03:00
Rafael Moraes 6ed596ca42 Add option to use album release date for songs 2026-01-03 14:43:55 -03:00
Rafael Moraes 414ce749d6 Remove unused httpx import from downloader_base.py 2026-01-03 14:24:06 -03:00
Rafael Moraes 17863b500a Add UnsupportedMediaType exception and checks for downloaders 2026-01-02 13:31:24 -03:00
Rafael Moraes 5e48032f34 Remove redundant error handling in downloaders 2026-01-02 13:23:56 -03:00
Rafael Moraes e2ed443253 Add unified error handling to get_download_item 2026-01-02 13:19:13 -03:00
Rafael Moraes ade78ad7b3 Bump version to 2.8.2 2025-12-21 16:17:01 -03:00
Rafael Moraes 054f636434 Bump version to 2.8.2 2025-12-21 16:07:48 -03:00
Rafael Moraes bf9c74d9d8 Increase concurrency limit in safe_gather to 10 2025-12-21 16:07:33 -03:00
Rafael Moraes 3c48618e84 Remove custom transport retries from AppleMusicApi 2025-12-21 15:56:30 -03:00
Rafael Moraes c940ee2f47 Replace sequential_gather with safe_gather in downloader 2025-12-21 15:55:07 -03:00
Rafael Moraes 7f56dfd0c8 Remove retry logic from safe_gather utility 2025-12-21 15:54:52 -03:00
Rafael Moraes 7c3112421d Refactor AppleMusicApi.create_from_wrapper to use get_response utility 2025-12-21 01:48:16 -03:00
Rafael Moraes 55ce7555a9 Add timeout to iTunes API search request 2025-12-21 01:36:03 -03:00
Rafael Moraes 9c4adbb2c1 Refactor HTTP response handling for m3u8 and cover fetch 2025-12-21 01:34:09 -03:00
Rafael Moraes 1591f0daf2 Set httpx.AsyncClient timeout to 60 seconds 2025-12-18 14:21:56 -03:00
Rafael Moraes 25d028bea4 Add colorama for improved Windows console support 2025-12-14 19:13:30 -03:00
Rafael Moraes ebc28a019e Bump version to 2.8.1 2025-12-10 01:23:32 -03:00
Rafael Moraes 690df6e9d7 Update README example for AppleMusicApi usage 2025-12-10 01:12:52 -03:00
Rafael Moraes 8039c7c86f Reorder error check in AppleMusicDownloader 2025-12-10 01:08:13 -03:00
Rafael Moraes f67ba37d19 Check streamability before downloading media 2025-12-09 23:26:53 -03:00
Rafael Moraes 59f247a90f Fix default language option in CLI 2025-12-06 15:41:44 -03:00
Rafael Moraes 181bdb198d Refactor AppleMusicApi init and factory methods 2025-12-06 15:40:45 -03:00
Rafael Moraes 1945342adc Improve audio track validation in AppleMusicDownloader 2025-12-05 01:11:31 -03:00
Rafael Moraes f19ef4d6dd Fix audio track validation in AppleMusicDownloader 2025-12-05 01:05:44 -03:00
Rafael Moraes 1ceb7fcf46 Instantiate ItunesApi directly in CLI 2025-12-04 17:28:23 -03:00
Rafael Moraes 23ed14ca04 Refactor ItunesApi instantiation and initialization 2025-12-04 17:27:59 -03:00
Rafael Moraes 3e3939d0ee Refactor downloader setup to initialization method 2025-12-04 17:26:35 -03:00
Rafael Moraes 780261a9c8 Update API instantiation to use async factory methods 2025-12-04 17:24:41 -03:00
Rafael Moraes 80cb80e9a2 Refactor AppleMusicApi and ItunesApi initialization 2025-12-04 17:24:32 -03:00
Rafael Moraes f3b7adaad3 Replace safe_gather with sequential_gather in downloader 2025-12-04 16:52:34 -03:00
Rafael Moraes fe6a6e308d Refactor mp4decrypt and amdecrypt path checks in CLI 2025-11-29 14:27:28 -03:00
Rafael Moraes b08bf98759 Reduce retry count in safe_gather utility 2025-11-29 14:24:58 -03:00
24 changed files with 1238 additions and 831 deletions
+32 -36
View File
@@ -41,12 +41,10 @@ Add these tools to your system PATH for additional features:
## 📦 Installation
**Install Gamdl via pipx:**
[pipx](https://pipx.pypa.io/stable/installation/) is recommended for installing Gamdl to avoid dependency conflicts, but you can also use pip.
**Install Gamdl via pip:**
```bash
pipx install gamdl
pip install gamdl
```
**Setup cookies:**
@@ -159,6 +157,8 @@ The file is created automatically on first run. Command-line arguments override
| `--synced-lyrics-format` | Synced lyrics format | `lrc` |
| `--no-synced-lyrics` | Don't download synced lyrics | `false` |
| `--synced-lyrics-only` | Download only synced lyrics | `false` |
| `--use-album-date` | Use album release date for songs | `false` |
| `--fetch-extra-tags` | Fetch extra tags from preview (normalization and smooth playback) | `false` |
| **Music Video Options** | | |
| `--music-video-codec-priority` | Comma-separated codec priority | `h264,h265` |
| `--music-video-remux-format` | Music video remux format | `m4v` |
@@ -168,22 +168,23 @@ The file is created automatically on first run. Command-line arguments override
### Template Variables
Use these variables in folder/file templates or `--exclude-tags`:
**Tags for templates and exclude-tags:**
| Variable | Description |
| ---------------------------------------------------------------------------- | --------------------------------------------- |
| `{album}`, `{album_artist}`, `{album_id}`, `{album_sort}` | Album info |
| `{artist}`, `{artist_id}`, `{artist_sort}` | Artist info |
| `{title}`, `{title_id}`, `{title_sort}` | Title info |
| `{composer}`, `{composer_id}`, `{composer_sort}` | Composer info |
| `{track}`, `{track_total}`, `{disc}`, `{disc_total}` | Track numbers |
| `{genre}`, `{genre_id}` | Genre info |
| `{date}` | Release date (supports strftime: `{date:%Y}`) |
| `{playlist_artist}`, `{playlist_id}`, `{playlist_title}`, `{playlist_track}` | Playlist info |
| `{compilation}`, `{gapless}`, `{rating}` | Media properties |
| `{comment}`, `{copyright}`, `{lyrics}`, `{cover}` | Additional metadata |
| `{media_type}`, `{storefront}`, `{xid}` | Technical info |
| `all` | Special: Skip all tagging |
- `album`, `album_artist`, `album_id`
- `artist`, `artist_id`
- `composer`, `composer_id`
- `date` (supports strftime format: `{date:%Y}`)
- `disc`, `disc_total`
- `media_type`
- `playlist_artist`, `playlist_id`, `playlist_title`, `playlist_track`
- `title`, `title_id`
- `track`, `track_total`
**Tags for exclude-tags only:**
- `album_sort`, `artist_sort`, `composer_sort`, `title_sort`
- `comment`, `compilation`, `copyright`, `cover`, `gapless`, `genre`, `genre_id`, `lyrics`, `rating`, `storefront`, `xid`
- `all` (special: skip all tagging)
### Logging Level
@@ -292,29 +293,27 @@ from gamdl.interface import (
AppleMusicUploadedVideoInterface,
)
async def main():
# Initialize APIs
apple_music_api = AppleMusicApi.from_netscape_cookies(cookies_path="cookies.txt")
await apple_music_api.setup()
# Create AppleMusicApi instance (from cookies or wrapper)
apple_music_api = await AppleMusicApi.create_from_netscape_cookies(
cookies_path="cookies.txt",
)
itunes_api = ItunesApi(
apple_music_api.storefront,
apple_music_api.language,
)
itunes_api.setup()
# Initialize interfaces
# Check subscription
assert apple_music_api.active_subscription
# Set up interfaces
interface = AppleMusicInterface(apple_music_api, itunes_api)
song_interface = AppleMusicSongInterface(interface)
music_video_interface = AppleMusicMusicVideoInterface(interface)
uploaded_video_interface = AppleMusicUploadedVideoInterface(interface)
# Initialize base downloader
# Set up base downloader and specialized downloaders
base_downloader = AppleMusicBaseDownloader()
base_downloader.setup()
# Initialize specialized downloaders
song_downloader = AppleMusicSongDownloader(
base_downloader=base_downloader,
interface=song_interface,
@@ -328,7 +327,7 @@ async def main():
interface=uploaded_video_interface,
)
# Create main downloader
# Main downloader
downloader = AppleMusicDownloader(
interface=interface,
base_downloader=base_downloader,
@@ -338,10 +337,8 @@ async def main():
)
# Download a song
url_info = downloader.get_url_info(
"https://music.apple.com/us/album/never-gonna-give-you-up-2022-remaster/1624945511?i=1624945512"
)
url = "https://music.apple.com/us/album/never-gonna-give-you-up-2022-remaster/1624945511?i=1624945512"
url_info = downloader.get_url_info(url)
if url_info:
download_queue = await downloader.get_download_queue(url_info)
if download_queue:
@@ -360,4 +357,3 @@ MIT License - see [LICENSE](LICENSE) file for details
## 🤝 Contributing
Currently, I'm not interested in reviewing pull requests that change or add features. Only critical bug fixes will be considered. However, feel free to open issues for bugs or feature requests.
+1 -1
View File
@@ -1 +1 @@
__version__ = "2.8"
__version__ = "2.8.3"
+60 -24
View File
@@ -6,7 +6,7 @@ from urllib.parse import parse_qs, urlparse
import httpx
from ..utils import raise_for_status, safe_json
from ..utils import get_response, raise_for_status, safe_json
from .constants import (
AMP_API_URL,
APPLE_MUSIC_COOKIE_DOMAIN,
@@ -22,20 +22,21 @@ class AppleMusicApi:
def __init__(
self,
storefront: str = "us",
media_user_token: str | None = None,
token: str | None = None,
language: str = "en-US",
media_user_token: str | None = None,
developer_token: str | None = None,
) -> None:
self.storefront = storefront
self.media_user_token = media_user_token
self.token = token
self.language = language
self.media_user_token = media_user_token
self.token = developer_token
@classmethod
def from_netscape_cookies(
async def create_from_netscape_cookies(
cls,
cookies_path: str = "./cookies.txt",
language: str = "en-US",
*args,
**kwargs,
) -> "AppleMusicApi":
cookies = MozillaCookieJar(cookies_path)
cookies.load(ignore_discard=True, ignore_expires=True)
@@ -56,35 +57,55 @@ class AppleMusicApi:
"and are logged in with an active subscription."
)
return cls(
return await cls.create(
storefront=None,
media_user_token=media_user_token,
language=language,
developer_token=None,
*args,
**kwargs,
)
@classmethod
def from_wrapper(
async def create_from_wrapper(
cls,
wrapper_account_url: str = "http://127.0.0.1:30020/",
language: str = "en-US",
*args,
**kwargs,
) -> "AppleMusicApi":
wrapper_account_response = httpx.get(wrapper_account_url)
raise_for_status(wrapper_account_response)
wrapper_account_response = await get_response(wrapper_account_url)
wrapper_account_info = safe_json(wrapper_account_response)
return cls(
return await cls.create(
storefront=None,
media_user_token=wrapper_account_info["music_token"],
token=wrapper_account_info["dev_token"],
language=language,
developer_token=wrapper_account_info["dev_token"],
*args,
**kwargs,
)
async def setup(self) -> None:
await self._setup_client()
await self._setup_token()
await self._setup_account_info()
@classmethod
async def create(
cls,
storefront: str | None = "us",
language: str = "en-US",
media_user_token: str | None = None,
developer_token: str | None = None,
) -> "AppleMusicApi":
api = cls(
storefront=storefront,
language=language,
media_user_token=media_user_token,
developer_token=developer_token,
)
await api.initialize()
return api
async def _setup_client(self) -> None:
async def initialize(self) -> None:
await self._initialize_client()
await self._initialize_token()
await self._initialize_account_info()
async def _initialize_client(self) -> None:
self.client = httpx.AsyncClient(
headers={
"accept": "*/*",
@@ -104,7 +125,6 @@ class AppleMusicApi:
"l": self.language,
},
follow_redirects=True,
transport=httpx.AsyncHTTPTransport(retries=10),
timeout=60.0,
)
@@ -133,11 +153,11 @@ class AppleMusicApi:
logger.debug(f"Token: {token}")
return token
async def _setup_token(self) -> None:
async def _initialize_token(self) -> None:
self.token = self.token or await self._get_token()
self.client.headers.update({"authorization": f"Bearer {self.token}"})
async def _setup_account_info(self) -> None:
async def _initialize_account_info(self) -> None:
if not self.media_user_token:
return
@@ -150,6 +170,22 @@ class AppleMusicApi:
self.account_info = await self.get_account_info()
self.storefront = self.account_info["meta"]["subscription"]["storefront"]
@property
def active_subscription(self) -> bool:
return (
getattr(self, "account_info", {})
.get("meta", {})
.get("subscription", {})
.get("active", False)
)
@property
def account_restrictions(self) -> dict | None:
data = getattr(self, "account_info", {}).get("data", [])
if not data:
return None
return data[0].get("attributes", {}).get("restrictions")
async def get_account_info(self, meta: str | None = "subscription") -> dict:
response = await self.client.get(
f"{AMP_API_URL}/v1/me/account",
+7 -5
View File
@@ -16,18 +16,19 @@ class ItunesApi:
) -> None:
self.storefront = storefront
self.language = language
self.initialize()
def setup(self) -> None:
self._setup_storefront_id()
self._setup_session()
def initialize(self) -> None:
self._initialize_storefront_id()
self._initialize_client()
def _setup_storefront_id(self) -> None:
def _initialize_storefront_id(self) -> None:
try:
self.storefront_id = STOREFRONT_IDS[self.storefront.upper()]
except KeyError:
raise Exception(f"No storefront id for {self.storefront}")
def _setup_session(self) -> None:
def _initialize_client(self) -> None:
self.client = httpx.AsyncClient(
params={
"country": self.storefront,
@@ -36,6 +37,7 @@ class ItunesApi:
headers={
"X-Apple-Store-Front": f"{self.storefront_id} t:music31",
},
timeout=60.0,
)
async def get_lookup_result(
+89 -453
View File
@@ -1,10 +1,11 @@
import asyncio
import inspect
import logging
from functools import wraps
from pathlib import Path
import click
import colorama
from dataclass_click import dataclass_click
from .. import __version__
from ..api import AppleMusicApi, ItunesApi
@@ -14,11 +15,9 @@ from ..downloader import (
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
CoverFormat,
DownloadItem,
DownloadMode,
GamdlError,
RemuxFormatMusicVideo,
RemuxMode,
)
from ..interface import (
@@ -26,53 +25,15 @@ from ..interface import (
AppleMusicMusicVideoInterface,
AppleMusicSongInterface,
AppleMusicUploadedVideoInterface,
MusicVideoCodec,
MusicVideoResolution,
SongCodec,
SyncedLyricsFormat,
UploadedVideoQuality,
)
from .cli_config import CliConfig
from .config_file import ConfigFile
from .constants import X_NOT_IN_PATH
from .utils import Csv, CustomLoggerFormatter, prompt_path
from .utils import CustomLoggerFormatter, prompt_path
logger = logging.getLogger(__name__)
api_from_cookies_sig = inspect.signature(AppleMusicApi.from_netscape_cookies)
api_from_wrapper_sig = inspect.signature(AppleMusicApi.from_wrapper)
base_downloader_sig = inspect.signature(AppleMusicBaseDownloader.__init__)
music_video_downloader_sig = inspect.signature(AppleMusicMusicVideoDownloader.__init__)
song_downloader_sig = inspect.signature(AppleMusicSongDownloader.__init__)
uploaded_video_downloader_sig = inspect.signature(
AppleMusicUploadedVideoDownloader.__init__
)
def load_config_file(
ctx: click.Context,
param: click.Parameter,
no_config_file: bool,
) -> click.Context:
if no_config_file:
return ctx
config_file = ConfigFile(ctx.params["config_path"])
config_file.cleanup_unknown_params(ctx.command.params)
config_file.add_params_default_to_config(
ctx.command.params,
)
parsed_params = config_file.parse_params_from_config(
[
param
for param in ctx.command.params
if ctx.get_parameter_source(param.name)
!= click.core.ParameterSource.COMMANDLINE
]
)
ctx.params.update(parsed_params)
return ctx
def make_sync(func):
@wraps(func)
@@ -85,377 +46,56 @@ def make_sync(func):
@click.command()
@click.help_option("-h", "--help")
@click.version_option(__version__, "-v", "--version")
# CLI specific options
@click.argument(
"urls",
nargs=-1,
type=str,
required=True,
)
@click.option(
"--read-urls-as-txt",
"-r",
is_flag=True,
help="Read URLs from text files",
)
@click.option(
"--config-path",
type=click.Path(file_okay=True, dir_okay=False, writable=True, resolve_path=True),
default=str(Path.home() / ".gamdl" / "config.ini"),
help="Config file path",
)
@click.option(
"--log-level",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]),
default="INFO",
help="Logging level",
)
@click.option(
"--log-file",
type=click.Path(file_okay=True, dir_okay=False, writable=True, resolve_path=True),
default=None,
help="Log file path",
)
@click.option(
"--no-exceptions",
is_flag=True,
help="Don't print exceptions",
)
# API specific options
@click.option(
"--cookies-path",
"-c",
type=click.Path(file_okay=True, dir_okay=False, readable=True, resolve_path=True),
default=api_from_cookies_sig.parameters["cookies_path"].default,
help="Cookies file path",
)
@click.option(
"--wrapper-account-url",
type=str,
default=api_from_wrapper_sig.parameters["wrapper_account_url"].default,
help="Wrapper account URL",
)
@click.option(
"--language",
"-l",
type=str,
default=api_from_cookies_sig.parameters["language"].default,
help="Metadata language",
)
# Base Downloader specific options
@click.option(
"--output-path",
"-o",
type=click.Path(file_okay=False, dir_okay=True, writable=True, resolve_path=True),
default=base_downloader_sig.parameters["output_path"].default,
help="Output directory path",
)
@click.option(
"--temp-path",
type=click.Path(file_okay=False, dir_okay=True, writable=True, resolve_path=True),
default=base_downloader_sig.parameters["temp_path"].default,
help="Temporary directory path",
)
@click.option(
"--wvd-path",
type=click.Path(file_okay=False, dir_okay=True, writable=True, resolve_path=True),
default=base_downloader_sig.parameters["wvd_path"].default,
help=".wvd file path",
)
@click.option(
"--overwrite",
is_flag=True,
help="Overwrite existing files",
default=base_downloader_sig.parameters["overwrite"].default,
)
@click.option(
"--save-cover",
"-s",
is_flag=True,
help="Save cover as separate file",
default=base_downloader_sig.parameters["save_cover"].default,
)
@click.option(
"--save-playlist",
is_flag=True,
help="Save M3U8 playlist file",
default=base_downloader_sig.parameters["save_playlist"].default,
)
@click.option(
"--nm3u8dlre-path",
type=str,
default=base_downloader_sig.parameters["nm3u8dlre_path"].default,
help="N_m3u8DL-RE executable path",
)
@click.option(
"--mp4decrypt-path",
type=str,
default=base_downloader_sig.parameters["mp4decrypt_path"].default,
help="mp4decrypt executable path",
)
@click.option(
"--ffmpeg-path",
type=str,
default=base_downloader_sig.parameters["ffmpeg_path"].default,
help="FFmpeg executable path",
)
@click.option(
"--mp4box-path",
type=str,
default=base_downloader_sig.parameters["mp4box_path"].default,
help="MP4Box executable path",
)
@click.option(
"--amdecrypt-path",
type=str,
default=base_downloader_sig.parameters["amdecrypt_path"].default,
help="amdecrypt executable path",
)
@click.option(
"--use-wrapper",
is_flag=True,
help="Use wrapper and amdecrypt for decrypting songs",
default=False,
)
@click.option(
"--wrapper-decrypt-ip",
type=str,
default=base_downloader_sig.parameters["wrapper_decrypt_ip"].default,
help="IP address and port for wrapper decryption",
)
@click.option(
"--download-mode",
type=DownloadMode,
default=base_downloader_sig.parameters["download_mode"].default,
help="Download mode",
)
@click.option(
"--remux-mode",
type=RemuxMode,
default=base_downloader_sig.parameters["remux_mode"].default,
help="Remux mode",
)
@click.option(
"--cover-format",
type=CoverFormat,
default=base_downloader_sig.parameters["cover_format"].default,
help="Cover format",
)
@click.option(
"--album-folder-template",
type=str,
default=base_downloader_sig.parameters["album_folder_template"].default,
help="Album folder template",
)
@click.option(
"--compilation-folder-template",
type=str,
default=base_downloader_sig.parameters["compilation_folder_template"].default,
help="Compilation folder template",
)
@click.option(
"--no-album-folder-template",
type=str,
default=base_downloader_sig.parameters["no_album_folder_template"].default,
help="No album folder template",
)
@click.option(
"--single-disc-file-template",
type=str,
default=base_downloader_sig.parameters["single_disc_file_template"].default,
help="Single disc file template",
)
@click.option(
"--multi-disc-file-template",
type=str,
default=base_downloader_sig.parameters["multi_disc_file_template"].default,
help="Multi disc file template",
)
@click.option(
"--no-album-file-template",
type=str,
default=base_downloader_sig.parameters["no_album_file_template"].default,
help="No album file template",
)
@click.option(
"--playlist-file-template",
type=str,
default=base_downloader_sig.parameters["playlist_file_template"].default,
help="Playlist file template",
)
@click.option(
"--date-tag-template",
type=str,
default=base_downloader_sig.parameters["date_tag_template"].default,
help="Date tag template",
)
@click.option(
"--exclude-tags",
type=Csv(str),
default=base_downloader_sig.parameters["exclude_tags"].default,
help="Comma-separated tags to exclude",
)
@click.option(
"--cover-size",
type=int,
default=base_downloader_sig.parameters["cover_size"].default,
help="Cover size in pixels",
)
@click.option(
"--truncate",
type=int,
default=base_downloader_sig.parameters["truncate"].default,
help="Max filename length",
)
# DownloaderSong specific options
@click.option(
"--song-codec",
type=SongCodec,
default=song_downloader_sig.parameters["codec"].default,
help="Song codec",
)
@click.option(
"--synced-lyrics-format",
type=SyncedLyricsFormat,
default=song_downloader_sig.parameters["synced_lyrics_format"].default,
help="Synced lyrics format",
)
@click.option(
"--no-synced-lyrics",
is_flag=True,
help="Don't download synced lyrics",
default=song_downloader_sig.parameters["no_synced_lyrics"].default,
)
@click.option(
"--synced-lyrics-only",
is_flag=True,
help="Download only synced lyrics",
default=song_downloader_sig.parameters["synced_lyrics_only"].default,
)
# DownloaderMusicVideo specific options
@click.option(
"--music-video-codec-priority",
type=Csv(MusicVideoCodec),
default=music_video_downloader_sig.parameters["codec_priority"].default,
help="Comma-separated codec priority",
)
@click.option(
"--music-video-remux-format",
type=RemuxFormatMusicVideo,
default=music_video_downloader_sig.parameters["remux_format"].default,
help="Music video remux format",
)
@click.option(
"--music-video-resolution",
type=MusicVideoResolution,
default=music_video_downloader_sig.parameters["resolution"].default,
help="Max music video resolution",
)
# DownloaderUploadedVideo specific options
@click.option(
"--uploaded-video-quality",
type=UploadedVideoQuality,
default=uploaded_video_downloader_sig.parameters["quality"].default,
help="Post video quality",
)
# This option should always be last
@click.option(
"--no-config-file",
"-n",
is_flag=True,
callback=load_config_file,
help="Don't use a config file",
)
@dataclass_click(CliConfig)
@make_sync
async def main(
urls: list[str],
read_urls_as_txt: bool,
config_path: str,
log_level: str,
log_file: str,
no_exceptions: bool,
cookies_path: str,
wrapper_account_url: str,
language: str,
output_path: str,
temp_path: str,
wvd_path: str,
overwrite: bool,
save_cover: bool,
save_playlist: bool,
nm3u8dlre_path: str,
mp4decrypt_path: str,
ffmpeg_path: str,
mp4box_path: str,
amdecrypt_path: str,
use_wrapper: bool,
wrapper_decrypt_ip: str,
download_mode: DownloadMode,
remux_mode: RemuxMode,
cover_format: CoverFormat,
album_folder_template: str,
compilation_folder_template: str,
no_album_folder_template: str,
single_disc_file_template: str,
multi_disc_file_template: str,
no_album_file_template: str,
playlist_file_template: str,
date_tag_template: str,
exclude_tags: list[str],
cover_size: int,
truncate: int,
song_codec: SongCodec,
synced_lyrics_format: SyncedLyricsFormat,
no_synced_lyrics: bool,
synced_lyrics_only: bool,
music_video_codec_priority: list[MusicVideoCodec],
music_video_remux_format: RemuxFormatMusicVideo,
music_video_resolution: MusicVideoResolution,
uploaded_video_quality: UploadedVideoQuality,
*args,
**kwargs,
):
async def main(config: CliConfig):
colorama.just_fix_windows_console()
root_logger = logging.getLogger(__name__.split(".")[0])
root_logger.setLevel(log_level)
root_logger.setLevel(config.log_level)
root_logger.propagate = False
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomLoggerFormatter())
root_logger.addHandler(stream_handler)
if log_file:
file_handler = logging.FileHandler(log_file, encoding="utf-8")
if config.log_file:
file_handler = logging.FileHandler(config.log_file, encoding="utf-8")
file_handler.setFormatter(CustomLoggerFormatter(use_colors=False))
root_logger.addHandler(file_handler)
logger.info(f"Starting Gamdl {__version__}")
if use_wrapper:
apple_music_api = AppleMusicApi.from_wrapper(
wrapper_account_url=wrapper_account_url,
language=language,
if not config.no_config_file:
config_file = ConfigFile(config.config_path)
config_file.cleanup_unknown_params()
config_file.add_params_default_to_config()
config = config_file.update_params_from_config(config)
if config.use_wrapper:
apple_music_api = await AppleMusicApi.create_from_wrapper(
wrapper_account_url=config.wrapper_account_url,
language=config.language,
)
else:
cookies_path = prompt_path(cookies_path)
apple_music_api = AppleMusicApi.from_netscape_cookies(
cookies_path = prompt_path(config.cookies_path)
apple_music_api = await AppleMusicApi.create_from_netscape_cookies(
cookies_path=cookies_path,
language=language,
language=config.language,
)
await apple_music_api.setup()
itunes_api = ItunesApi(
apple_music_api.storefront,
apple_music_api.language,
)
itunes_api.setup()
if not apple_music_api.account_info["meta"]["subscription"]["active"]:
if not apple_music_api.active_subscription:
logger.critical(
"No active Apple Music subscription found, you won't be able to download"
" anything"
)
return
if apple_music_api.account_info["data"][0]["attributes"].get("restrictions"):
if apple_music_api.account_restrictions:
logger.warning(
"Your account has content restrictions enabled, some content may not be"
" downloadable"
@@ -470,54 +110,55 @@ async def main(
uploaded_video_interface = AppleMusicUploadedVideoInterface(interface)
base_downloader = AppleMusicBaseDownloader(
output_path=output_path,
temp_path=temp_path,
wvd_path=wvd_path,
overwrite=overwrite,
save_cover=save_cover,
save_playlist=save_playlist,
nm3u8dlre_path=nm3u8dlre_path,
mp4decrypt_path=mp4decrypt_path,
ffmpeg_path=ffmpeg_path,
mp4box_path=mp4box_path,
amdecrypt_path=amdecrypt_path,
use_wrapper=use_wrapper,
wrapper_decrypt_ip=wrapper_decrypt_ip,
download_mode=download_mode,
remux_mode=remux_mode,
cover_format=cover_format,
album_folder_template=album_folder_template,
compilation_folder_template=compilation_folder_template,
no_album_folder_template=no_album_folder_template,
single_disc_file_template=single_disc_file_template,
multi_disc_file_template=multi_disc_file_template,
no_album_file_template=no_album_file_template,
playlist_file_template=playlist_file_template,
date_tag_template=date_tag_template,
exclude_tags=exclude_tags,
cover_size=cover_size,
truncate=truncate,
output_path=config.output_path,
temp_path=config.temp_path,
wvd_path=config.wvd_path,
overwrite=config.overwrite,
save_cover=config.save_cover,
save_playlist=config.save_playlist,
nm3u8dlre_path=config.nm3u8dlre_path,
mp4decrypt_path=config.mp4decrypt_path,
ffmpeg_path=config.ffmpeg_path,
mp4box_path=config.mp4box_path,
amdecrypt_path=config.amdecrypt_path,
use_wrapper=config.use_wrapper,
wrapper_decrypt_ip=config.wrapper_decrypt_ip,
download_mode=config.download_mode,
remux_mode=config.remux_mode,
cover_format=config.cover_format,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
no_album_folder_template=config.no_album_folder_template,
single_disc_file_template=config.single_disc_file_template,
multi_disc_file_template=config.multi_disc_file_template,
no_album_file_template=config.no_album_file_template,
playlist_file_template=config.playlist_file_template,
date_tag_template=config.date_tag_template,
exclude_tags=config.exclude_tags,
cover_size=config.cover_size,
truncate=config.truncate,
)
base_downloader.setup()
song_downloader = AppleMusicSongDownloader(
base_downloader=base_downloader,
interface=song_interface,
codec=song_codec,
synced_lyrics_format=synced_lyrics_format,
no_synced_lyrics=no_synced_lyrics,
synced_lyrics_only=synced_lyrics_only,
codec=config.song_codec,
synced_lyrics_format=config.synced_lyrics_format,
no_synced_lyrics=config.no_synced_lyrics,
synced_lyrics_only=config.synced_lyrics_only,
use_album_date=config.use_album_date,
fetch_extra_tags=config.fetch_extra_tags,
)
music_video_downloader = AppleMusicMusicVideoDownloader(
base_downloader=base_downloader,
interface=music_video_interface,
codec_priority=music_video_codec_priority,
remux_format=music_video_remux_format,
resolution=music_video_resolution,
codec_priority=config.music_video_codec_priority,
remux_format=config.music_video_remux_format,
resolution=config.music_video_resolution,
)
uploaded_video_downloader = AppleMusicUploadedVideoDownloader(
base_downloader=base_downloader,
interface=uploaded_video_interface,
quality=uploaded_video_quality,
quality=config.uploaded_video_quality,
)
downloader = AppleMusicDownloader(
interface=interface,
@@ -527,56 +168,49 @@ async def main(
uploaded_video_downloader=uploaded_video_downloader,
)
if not synced_lyrics_only:
if not config.synced_lyrics_only:
if not base_downloader.full_ffmpeg_path and (
remux_mode == RemuxMode.FFMPEG or download_mode == DownloadMode.NM3U8DLRE
config.remux_mode == RemuxMode.FFMPEG
or config.download_mode == DownloadMode.NM3U8DLRE
):
logger.critical(X_NOT_IN_PATH.format("ffmpeg", ffmpeg_path))
return
if not base_downloader.full_mp4box_path and remux_mode == RemuxMode.MP4BOX:
logger.critical(X_NOT_IN_PATH.format("MP4Box", mp4box_path))
logger.critical(X_NOT_IN_PATH.format("ffmpeg", config.ffmpeg_path))
return
if (
not base_downloader.full_mp4decrypt_path
and song_codec
not in (
SongCodec.AAC_LEGACY,
SongCodec.AAC_HE_LEGACY,
)
or (
remux_mode == RemuxMode.MP4BOX
and not base_downloader.full_mp4decrypt_path
)
not base_downloader.full_mp4box_path
and config.remux_mode == RemuxMode.MP4BOX
):
logger.critical(X_NOT_IN_PATH.format("mp4decrypt", mp4decrypt_path))
logger.critical(X_NOT_IN_PATH.format("MP4Box", config.mp4box_path))
return
if not base_downloader.full_mp4decrypt_path and (
config.song_codec not in (SongCodec.AAC_LEGACY, SongCodec.AAC_HE_LEGACY)
or config.remux_mode == RemuxMode.MP4BOX
):
logger.critical(X_NOT_IN_PATH.format("mp4decrypt", config.mp4decrypt_path))
return
if (
download_mode == DownloadMode.NM3U8DLRE
config.download_mode == DownloadMode.NM3U8DLRE
and not base_downloader.full_nm3u8dlre_path
):
logger.critical(X_NOT_IN_PATH.format("N_m3u8DL-RE", nm3u8dlre_path))
logger.critical(X_NOT_IN_PATH.format("N_m3u8DL-RE", config.nm3u8dlre_path))
return
if not base_downloader.full_mp4decrypt_path:
logger.warning(
X_NOT_IN_PATH.format("mp4decrypt", mp4decrypt_path)
+ ", music videos will not be downloaded"
)
downloader.skip_music_videos = True
if config.use_wrapper and not base_downloader.full_amdecrypt_path:
logger.critical(X_NOT_IN_PATH.format("amdecrypt", config.amdecrypt_path))
return
if not song_codec.is_legacy() and not use_wrapper:
if not config.song_codec.is_legacy() and not config.use_wrapper:
logger.warning(
"You have chosen an experimental song codec"
" without enabling wrapper."
"They're not guaranteed to work due to API limitations."
)
if read_urls_as_txt:
if config.read_urls_as_txt:
urls_from_file = []
for url in urls:
for url in config.urls:
if Path(url).is_file() and Path(url).exists():
urls_from_file.extend(
[
@@ -586,6 +220,8 @@ async def main(
]
)
urls = urls_from_file
else:
urls = config.urls
error_count = 0
for url_index, url in enumerate(urls, 1):
@@ -613,7 +249,7 @@ async def main(
error_count += 1
logger.error(
url_progress + f' Error processing "{url}"',
exc_info=not no_exceptions,
exc_info=not config.no_exceptions,
)
if not download_queue:
@@ -650,7 +286,7 @@ async def main(
error_count += 1
logger.error(
download_queue_progress + f' Error downloading "{media_title}"',
exc_info=not no_exceptions,
exc_info=not config.no_exceptions,
)
logger.info(f"Finished with {error_count} error(s)")
+519
View File
@@ -0,0 +1,519 @@
import inspect
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated
import click
from click.types import BoolParamType, FuncParamType, IntParamType, StringParamType
from dataclass_click import argument, option
from ..api import AppleMusicApi
from ..downloader import (
AppleMusicBaseDownloader,
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
DownloadMode,
RemuxFormatMusicVideo,
RemuxMode,
)
from ..interface import (
CoverFormat,
MusicVideoCodec,
MusicVideoResolution,
SongCodec,
SyncedLyricsFormat,
UploadedVideoQuality,
)
from .utils import Csv
api_from_cookies_sig = inspect.signature(AppleMusicApi.create_from_netscape_cookies)
api_from_wrapper_sig = inspect.signature(AppleMusicApi.create_from_wrapper)
api_sig = inspect.signature(AppleMusicApi.__init__)
base_downloader_sig = inspect.signature(AppleMusicBaseDownloader.__init__)
music_video_downloader_sig = inspect.signature(AppleMusicMusicVideoDownloader.__init__)
song_downloader_sig = inspect.signature(AppleMusicSongDownloader.__init__)
uploaded_video_downloader_sig = inspect.signature(
AppleMusicUploadedVideoDownloader.__init__
)
@dataclass
class CliConfig:
# CLI specific options
urls: Annotated[
list[str],
argument(
nargs=-1,
type=str,
required=True,
),
]
read_urls_as_txt: Annotated[
bool,
option(
"--read-urls-as-txt",
"-r",
help="Read URLs from text files",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
config_path: Annotated[
str,
option(
"--config-path",
help="Config file path",
default=str(Path.home() / ".gamdl" / "config.ini"),
type=click.Path(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
),
]
log_level: Annotated[
str,
option(
"--log-level",
help="Logging level",
default="INFO",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]),
),
]
log_file: Annotated[
str,
option(
"--log-file",
help="Log file path",
default=None,
type=click.Path(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
),
]
no_exceptions: Annotated[
bool,
option(
"--no-exceptions",
help="Don't print exceptions",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
# API specific options
cookies_path: Annotated[
str,
option(
"--cookies-path",
"-c",
help="Cookies file path",
default=api_from_cookies_sig.parameters["cookies_path"].default,
type=click.Path(
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
),
),
]
wrapper_account_url: Annotated[
str,
option(
"--wrapper-account-url",
help="Wrapper account URL",
default=api_from_wrapper_sig.parameters["wrapper_account_url"].default,
type=StringParamType(),
),
]
language: Annotated[
str,
option(
"--language",
"-l",
help="Metadata language",
default=api_sig.parameters["language"].default,
type=StringParamType(),
),
]
# Base Downloader specific options
output_path: Annotated[
str,
option(
"--output-path",
"-o",
help="Output directory path",
default=base_downloader_sig.parameters["output_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True,
),
),
]
temp_path: Annotated[
str,
option(
"--temp-path",
help="Temporary directory path",
default=base_downloader_sig.parameters["temp_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True,
),
),
]
wvd_path: Annotated[
str,
option(
"--wvd-path",
help=".wvd file path",
default=base_downloader_sig.parameters["wvd_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
resolve_path=True,
),
),
]
overwrite: Annotated[
bool,
option(
"--overwrite",
help="Overwrite existing files",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
save_cover: Annotated[
bool,
option(
"--save-cover",
"-s",
help="Save cover as separate file",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
save_playlist: Annotated[
bool,
option(
"--save-playlist",
help="Save M3U8 playlist file",
type=BoolParamType(),
default=False,
is_flag=True,
),
]
nm3u8dlre_path: Annotated[
str,
option(
"--nm3u8dlre-path",
help="N_m3u8DL-RE executable path",
default=base_downloader_sig.parameters["nm3u8dlre_path"].default,
type=StringParamType(),
),
]
mp4decrypt_path: Annotated[
str,
option(
"--mp4decrypt-path",
help="mp4decrypt executable path",
default=base_downloader_sig.parameters["mp4decrypt_path"].default,
type=StringParamType(),
),
]
ffmpeg_path: Annotated[
str,
option(
"--ffmpeg-path",
help="FFmpeg executable path",
default=base_downloader_sig.parameters["ffmpeg_path"].default,
type=StringParamType(),
),
]
mp4box_path: Annotated[
str,
option(
"--mp4box-path",
help="MP4Box executable path",
default=base_downloader_sig.parameters["mp4box_path"].default,
type=StringParamType(),
),
]
amdecrypt_path: Annotated[
str,
option(
"--amdecrypt-path",
help="amdecrypt executable path",
default=base_downloader_sig.parameters["amdecrypt_path"].default,
type=StringParamType(),
),
]
use_wrapper: Annotated[
bool,
option(
"--use-wrapper",
help="Use wrapper and amdecrypt for decrypting songs",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
wrapper_decrypt_ip: Annotated[
str,
option(
"--wrapper-decrypt-ip",
help="IP address and port for wrapper decryption",
default=base_downloader_sig.parameters["wrapper_decrypt_ip"].default,
type=StringParamType(),
),
]
download_mode: Annotated[
DownloadMode,
option(
"--download-mode",
help="Download mode",
default=base_downloader_sig.parameters["download_mode"].default,
type=FuncParamType(DownloadMode),
),
]
remux_mode: Annotated[
RemuxMode,
option(
"--remux-mode",
help="Remux mode",
default=base_downloader_sig.parameters["remux_mode"].default,
type=FuncParamType(RemuxMode),
),
]
cover_format: Annotated[
CoverFormat,
option(
"--cover-format",
help="Cover format",
default=base_downloader_sig.parameters["cover_format"].default,
type=FuncParamType(CoverFormat),
),
]
album_folder_template: Annotated[
str,
option(
"--album-folder-template",
help="Album folder template",
default=base_downloader_sig.parameters["album_folder_template"].default,
type=StringParamType(),
),
]
compilation_folder_template: Annotated[
str,
option(
"--compilation-folder-template",
help="Compilation folder template",
default=base_downloader_sig.parameters[
"compilation_folder_template"
].default,
type=StringParamType(),
),
]
no_album_folder_template: Annotated[
str,
option(
"--no-album-folder-template",
help="No album folder template",
default=base_downloader_sig.parameters["no_album_folder_template"].default,
type=StringParamType(),
),
]
single_disc_file_template: Annotated[
str,
option(
"--single-disc-file-template",
help="Single disc file template",
default=base_downloader_sig.parameters["single_disc_file_template"].default,
type=StringParamType(),
),
]
multi_disc_file_template: Annotated[
str,
option(
"--multi-disc-file-template",
help="Multi disc file template",
default=base_downloader_sig.parameters["multi_disc_file_template"].default,
type=StringParamType(),
),
]
no_album_file_template: Annotated[
str,
option(
"--no-album-file-template",
help="No album file template",
default=base_downloader_sig.parameters["no_album_file_template"].default,
type=StringParamType(),
),
]
playlist_file_template: Annotated[
str,
option(
"--playlist-file-template",
help="Playlist file template",
default=base_downloader_sig.parameters["playlist_file_template"].default,
type=StringParamType(),
),
]
date_tag_template: Annotated[
str,
option(
"--date-tag-template",
help="Date tag template",
default=base_downloader_sig.parameters["date_tag_template"].default,
type=StringParamType(),
),
]
exclude_tags: Annotated[
list[str],
option(
"--exclude-tags",
help="Comma-separated tags to exclude",
default=base_downloader_sig.parameters["exclude_tags"].default,
type=Csv(str),
),
]
cover_size: Annotated[
int,
option(
"--cover-size",
help="Cover size in pixels",
default=base_downloader_sig.parameters["cover_size"].default,
type=IntParamType(),
),
]
truncate: Annotated[
int,
option(
"--truncate",
help="Max filename length",
default=base_downloader_sig.parameters["truncate"].default,
type=IntParamType(),
),
]
# DownloaderSong specific options
song_codec: Annotated[
SongCodec,
option(
"--song-codec",
help="Song codec",
default=song_downloader_sig.parameters["codec"].default,
type=FuncParamType(SongCodec),
),
]
synced_lyrics_format: Annotated[
SyncedLyricsFormat,
option(
"--synced-lyrics-format",
help="Synced lyrics format",
default=song_downloader_sig.parameters["synced_lyrics_format"].default,
type=FuncParamType(SyncedLyricsFormat),
),
]
no_synced_lyrics: Annotated[
bool,
option(
"--no-synced-lyrics",
help="Don't download synced lyrics",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
synced_lyrics_only: Annotated[
bool,
option(
"--synced-lyrics-only",
help="Download only synced lyrics",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
use_album_date: Annotated[
bool,
option(
"--use-album-date",
help="Use album release date for songs",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
fetch_extra_tags: Annotated[
bool,
option(
"--fetch-extra-tags",
help="Fetch extra tags from preview (normalization and smooth playback)",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
# DownloaderMusicVideo specific options
music_video_codec_priority: Annotated[
list[MusicVideoCodec],
option(
"--music-video-codec-priority",
help="Comma-separated codec priority",
default=music_video_downloader_sig.parameters["codec_priority"].default,
type=Csv(MusicVideoCodec),
),
]
music_video_remux_format: Annotated[
RemuxFormatMusicVideo,
option(
"--music-video-remux-format",
help="Music video remux format",
default=music_video_downloader_sig.parameters["remux_format"].default,
type=FuncParamType(RemuxFormatMusicVideo),
),
]
music_video_resolution: Annotated[
MusicVideoResolution,
option(
"--music-video-resolution",
help="Max music video resolution",
default=music_video_downloader_sig.parameters["resolution"].default,
type=FuncParamType(MusicVideoResolution),
),
]
# DownloaderUploadedVideo specific options
uploaded_video_quality: Annotated[
UploadedVideoQuality,
option(
"--uploaded-video-quality",
help="Post video quality",
default=uploaded_video_downloader_sig.parameters["quality"].default,
type=FuncParamType(UploadedVideoQuality),
),
]
no_config_file: Annotated[
bool,
option(
"--no-config-file",
"-n",
help="Don't use a config file",
default=False,
type=BoolParamType(),
is_flag=True,
),
]
+96 -37
View File
@@ -1,14 +1,25 @@
import configparser
import typing
from dataclasses import dataclass
from pathlib import Path
from typing import get_type_hints
import click
from click.types import BoolParamType, FuncParamType
import click.types as click_types
from dataclass_click.dataclass_click import _DelayedCall
from .cli_config import CliConfig
from .constants import EXCLUDED_CONFIG_FILE_PARAMS
from .utils import Csv
@dataclass
class ParameterInfo:
name: str
default: typing.Any
type: typing.Any
class ConfigFile:
def __init__(
self,
@@ -17,9 +28,34 @@ class ConfigFile:
) -> None:
self.config_path = config_path
self.section_name = section_name
self.parameters = self._extract_parameters_from_cli_config()
self._read_config_file()
def _extract_parameters_from_cli_config(self) -> dict[str, ParameterInfo]:
parameters = {}
hints = get_type_hints(CliConfig, include_extras=True)
for field_name, hint in hints.items():
if hasattr(hint, "__metadata__"):
for metadata in hint.__metadata__:
if isinstance(metadata, _DelayedCall):
param_type = metadata.kwargs.get("type")
if param_type is None:
raise ValueError(
f"Parameter type for field '{field_name}' "
"could not be determined."
)
parameters[field_name] = ParameterInfo(
name=field_name,
default=metadata.kwargs.get("default"),
type=param_type,
)
break
return parameters
def _read_config_file(self) -> None:
self.config = configparser.ConfigParser(interpolation=None)
@@ -35,64 +71,81 @@ class ConfigFile:
with open(self.config_path, "w", encoding="utf-8") as config_file:
self.config.write(config_file)
def _serialize_param_default(self, param: click.Parameter) -> str:
if param.default is None:
def _serialize_param_default(self, param_info: ParameterInfo) -> str:
if param_info.default is None:
return "null"
if isinstance(param.type, Csv):
return ",".join(item.value for item in param.default)
if isinstance(param_info.type, Csv):
return ",".join(
item.value if hasattr(item, "value") else str(item)
for item in param_info.default
)
if isinstance(param.type, BoolParamType):
return str(param.default).lower()
if isinstance(param_info.type, click_types.FuncParamType):
return param_info.default.value
if isinstance(param.type, FuncParamType):
return param.default.value
if isinstance(param_info.type, click_types.BoolParamType):
return "true" if param_info.default else "false"
return str(param.default)
if isinstance(
param_info.type,
click_types.Choice
| click_types.Path
| click_types.StringParamType
| click_types.IntParamType,
):
return str(param_info.default)
raise NotImplementedError(
f"Serialization for parameter '{param_info.name}' of type "
f"'{type(param_info.type)}' is not implemented."
)
def _add_param_default_to_config(
self,
param: click.Parameter,
param_info: ParameterInfo,
) -> bool:
if self.config.has_option(self.section_name, param.name):
if self.config.has_option(self.section_name, param_info.name):
return False
value = self._serialize_param_default(param)
self.config.set(self.section_name, param.name, value)
value = self._serialize_param_default(param_info)
self.config.set(self.section_name, param_info.name, value)
return True
def _parse_param_from_config(
self,
param: click.Parameter,
param_info: ParameterInfo,
) -> typing.Any:
value = self.config[self.section_name].get(param.name)
value = self.config[self.section_name].get(param_info.name)
if value is None:
return param_info.default
if value == "null":
return None
return param.type_cast_value(None, value)
if not isinstance(param_info.type, click_types.ParamType):
raise NotImplementedError(
f"Parsing for parameter '{param_info.name}' of type "
f"'{type(param_info.type)}' is not implemented."
)
def add_params_default_to_config(
self,
params: list[click.Parameter],
) -> None:
return param_info.type.convert(value, None, None)
def add_params_default_to_config(self) -> None:
has_changes = False
for param in params:
if param.name in EXCLUDED_CONFIG_FILE_PARAMS:
for param_info in self.parameters.values():
if param_info.name in EXCLUDED_CONFIG_FILE_PARAMS:
continue
has_changes = self._add_param_default_to_config(param) or has_changes
has_changes = self._add_param_default_to_config(param_info) or has_changes
if has_changes:
self._write_config_file()
def cleanup_unknown_params(
self,
params: list[click.Parameter],
) -> None:
param_names = {param.name for param in params}
def cleanup_unknown_params(self) -> None:
param_names = {info.name for info in self.parameters.values()}
has_changes = False
for key in list(self.config[self.section_name].keys()):
@@ -103,13 +156,19 @@ class ConfigFile:
if has_changes:
self._write_config_file()
def parse_params_from_config(
self,
params: list[click.Parameter],
) -> dict[str, typing.Any]:
parsed_params = {}
def update_params_from_config(self, config: CliConfig) -> CliConfig:
updates = {}
click_context = click.get_current_context()
for param_info in self.parameters.values():
if (
click_context.get_parameter_source(param_info.name)
== click.core.ParameterSource.COMMANDLINE
):
continue
for param in params:
parsed_params[param.name] = self._parse_param_from_config(param)
if self.config.has_option(self.section_name, param_info.name):
updates[param_info.name] = self._parse_param_from_config(param_info)
return parsed_params
config_dict = config.__dict__.copy()
config_dict.update(updates)
return CliConfig(**config_dict)
-4
View File
@@ -1,10 +1,6 @@
import re
DEFAULT_SONG_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
TEMP_PATH_TEMPLATE = "gamdl_temp_{}"
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
+62 -50
View File
@@ -24,9 +24,10 @@ from .enums import DownloadMode, RemuxMode
from .exceptions import (
ExecutableNotFound,
FormatNotAvailable,
MediaFileExists,
NotStreamable,
SyncedLyricsOnly,
MediaFileExists,
UnsupportedMediaType,
)
from .types import DownloadItem, UrlInfo
@@ -79,23 +80,42 @@ class AppleMusicDownloader:
media_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = None
if media_metadata["type"] in SONG_MEDIA_TYPE:
download_item = await self.song_downloader.get_download_item(
try:
if not self.base_downloader.is_media_streamable(
media_metadata,
playlist_metadata,
)
):
raise NotStreamable(media_metadata["id"])
if media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
download_item = await self.music_video_downloader.get_download_item(
media_metadata,
playlist_metadata,
)
if media_metadata["type"] in SONG_MEDIA_TYPE:
if not self.song_downloader:
raise UnsupportedMediaType(media_metadata["type"])
if media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE:
download_item = await self.uploaded_video_downloader.get_download_item(
media_metadata,
download_item = await self.song_downloader.get_download_item(
media_metadata,
playlist_metadata,
)
if media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
if not self.music_video_downloader:
raise UnsupportedMediaType(media_metadata["type"])
download_item = await self.music_video_downloader.get_download_item(
media_metadata,
playlist_metadata,
)
if media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE:
if not self.uploaded_video_downloader:
raise UnsupportedMediaType(media_metadata["type"])
download_item = await self.uploaded_video_downloader.get_download_item(
media_metadata,
)
except Exception as e:
download_item = DownloadItem(
media_metadata=media_metadata,
playlist_metadata=playlist_metadata,
error=e,
)
return download_item
@@ -111,15 +131,13 @@ class AppleMusicDownloader:
tracks_metadata.extend(extended_data["data"])
tasks = [
asyncio.create_task(
self.get_single_download_item(
media_metadata,
(
collection_metadata
if collection_metadata["type"] in PLAYLIST_MEDIA_TYPE
else None
),
)
self.get_single_download_item(
media_metadata,
(
collection_metadata
if collection_metadata["type"] in PLAYLIST_MEDIA_TYPE
else None
),
)
for media_metadata in tracks_metadata
]
@@ -196,17 +214,13 @@ class AppleMusicDownloader:
download_items = []
album_tasks = [
asyncio.create_task(
self.interface.apple_music_api.get_album(album_metadata["id"])
)
self.interface.apple_music_api.get_album(album_metadata["id"])
for album_metadata in selected
]
album_responses = await safe_gather(*album_tasks)
track_tasks = [
asyncio.create_task(
self.get_collection_download_items(album_response["data"][0])
)
self.get_collection_download_items(album_response["data"][0])
for album_response in album_responses
]
track_results = await safe_gather(*track_tasks)
@@ -243,10 +257,8 @@ class AppleMusicDownloader:
).execute_async()
music_video_tasks = [
asyncio.create_task(
self.get_single_download_item(
music_video_metadata,
)
self.get_single_download_item(
music_video_metadata,
)
for music_video_metadata in selected
]
@@ -368,15 +380,15 @@ class AppleMusicDownloader:
download_item: DownloadItem,
) -> DownloadItem:
try:
if download_item.error:
raise download_item.error
if download_item.flat_filter_result:
download_item = await self.get_single_download_item_no_filter(
download_item.media_metadata,
download_item.playlist_metadata,
)
if download_item.error:
raise download_item.error
await self._initial_processing(download_item)
await self._download(download_item)
await self._final_processing(download_item)
@@ -399,11 +411,6 @@ class AppleMusicDownloader:
if self.song_downloader.synced_lyrics_only:
return
if not self.base_downloader.is_media_streamable(
download_item.media_metadata,
):
raise NotStreamable(download_item.media_metadata["id"])
if (
Path(download_item.final_path).exists()
and not self.base_downloader.overwrite
@@ -448,10 +455,18 @@ class AppleMusicDownloader:
raise ExecutableNotFound("N_m3u8DL-RE")
if (
not download_item.decryption_key
or not download_item.decryption_key.audio_track
or not download_item.decryption_key.audio_track.key
) and not self.base_downloader.use_wrapper:
not download_item.stream_info
or not download_item.stream_info.audio_track
or not download_item.stream_info.audio_track.stream_url
or (
(
not download_item.decryption_key
or not download_item.decryption_key.audio_track
or not download_item.decryption_key.audio_track.key
)
and not self.base_downloader.use_wrapper
)
):
raise FormatNotAvailable(download_item.media_metadata["id"])
if download_item.media_metadata["type"] in SONG_MEDIA_TYPE:
@@ -471,10 +486,7 @@ class AppleMusicDownloader:
return
if download_item.cover_path and self.base_downloader.save_cover:
cover_url = self.base_downloader.get_cover_url(
download_item.cover_url_template,
)
cover_bytes = await self.base_downloader.get_cover_bytes(cover_url)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
if cover_bytes and (
self.base_downloader.overwrite
or not Path(download_item.cover_path).exists()
+102 -137
View File
@@ -2,25 +2,17 @@ import asyncio
import re
import shutil
import uuid
from io import BytesIO
from pathlib import Path
import httpx
from async_lru import alru_cache
from mutagen.mp4 import MP4, MP4Cover
from PIL import Image
from pywidevine import Cdm, Device
from yt_dlp import YoutubeDL
from ..interface.enums import CoverFormat
from ..interface.types import MediaTags, PlaylistTags
from ..utils import async_subprocess, raise_for_status
from .constants import (
ILLEGAL_CHAR_REPLACEMENT,
ILLEGAL_CHARS_RE,
IMAGE_FILE_EXTENSION_MAP,
TEMP_PATH_TEMPLATE,
)
from .enums import CoverFormat, DownloadMode, RemuxMode
from ..utils import CustomStringFormatter, async_subprocess
from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
@@ -84,19 +76,20 @@ class AppleMusicBaseDownloader:
self.cover_size = cover_size
self.truncate = truncate
self.silent = silent
self.initialize()
def setup(self):
self._setup_binary_paths()
self._setup_cdm()
def initialize(self):
self._initialize_binary_paths()
self._initialize_cdm()
def _setup_binary_paths(self):
def _initialize_binary_paths(self):
self.full_nm3u8dlre_path = shutil.which(self.nm3u8dlre_path)
self.full_mp4decrypt_path = shutil.which(self.mp4decrypt_path)
self.full_ffmpeg_path = shutil.which(self.ffmpeg_path)
self.full_mp4box_path = shutil.which(self.mp4box_path)
self.full_amdecrypt_path = shutil.which(self.amdecrypt_path)
def _setup_cdm(self):
def _initialize_cdm(self):
if self.wvd_path:
self.cdm = Cdm.from_device(Device.load(self.wvd_path))
else:
@@ -112,22 +105,6 @@ class AppleMusicBaseDownloader:
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
async def get_cover_file_extension(self, cover_url_template: str) -> str | None:
if self.cover_format != CoverFormat.RAW:
return f".{self.cover_format.value}"
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
f".{image_format.lower()}",
)
def get_playlist_tags(
self,
playlist_metadata: dict,
@@ -160,112 +137,98 @@ class AppleMusicBaseDownloader:
/ (f"{media_id}_{file_tag}" + file_extension)
)
@alru_cache()
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
async with httpx.AsyncClient() as client:
response = await client.get(cover_url)
raise_for_status(response, {200, 404})
if response.status_code == 200:
return response.content
return None
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(
def sanitize_string(
self,
dirty_string: str,
file_ext: str = None,
) -> str:
sanitized_string = re.sub(
ILLEGAL_CHARS_RE,
ILLEGAL_CHAR_REPLACEMENT,
dirty_string,
)
if is_folder:
dirty_string = dirty_string[: self.truncate]
if dirty_string.endswith("."):
dirty_string = dirty_string[:-1] + ILLEGAL_CHAR_REPLACEMENT
if file_ext is None:
sanitized_string = sanitized_string[: self.truncate]
if sanitized_string.endswith("."):
sanitized_string = sanitized_string[:-1] + ILLEGAL_CHAR_REPLACEMENT
else:
if self.truncate is not None:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
sanitized_string = sanitized_string[: self.truncate - len(file_ext)]
sanitized_string += file_ext
return sanitized_string.strip()
def get_final_path(
self,
tags: MediaTags,
file_extension: str,
playlist_tags: PlaylistTags,
playlist_tags: PlaylistTags | None,
) -> str:
if tags.album is not None:
template_folder = (
if tags.album:
template_folder_parts = (
self.compilation_folder_template.split("/")
if tags.compilation
else self.album_folder_template.split("/")
)
template_file = (
else:
template_folder_parts = self.no_album_folder_template.split("/")
if tags.album:
template_file_parts = (
self.multi_disc_file_template.split("/")
if tags.disc_total > 1
if isinstance(tags.disc_total, int) and tags.disc_total > 1
else self.single_disc_file_template.split("/")
)
else:
template_folder = self.no_album_folder_template.split("/")
template_file = self.no_album_file_template.split("/")
template_file_parts = self.no_album_file_template.split("/")
template_final = template_folder + template_file
template_parts = template_folder_parts + template_file_parts
formatted_parts = []
tags_dict = tags.__dict__.copy()
if playlist_tags:
tags_dict.update(playlist_tags.__dict__)
return str(
Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_final[0:-1]
],
(
self.get_sanitized_string(
template_final[-1].format(**tags_dict), False
)
+ file_extension
for i, part in enumerate(template_parts):
is_folder = i < len(template_parts) - 1
formatted_part = CustomStringFormatter().format(
part,
album=(tags.album, "Unknown Album"),
album_artist=(tags.album_artist, "Unknown Artist"),
album_id=(tags.album_id, "Unknown Album ID"),
artist=(tags.artist, "Unknown Artist"),
artist_id=(tags.artist_id, "Unknown Artist ID"),
composer=(tags.composer, "Unknown Composer"),
composer_id=(tags.composer_id, "Unknown Composer ID"),
date=(tags.date, "Unknown Date"),
disc=(tags.disc, ""),
disc_total=(tags.disc_total, ""),
media_type=(tags.media_type, "Unknown Media Type"),
playlist_artist=(
(playlist_tags.playlist_artist if playlist_tags else None),
"Unknown Playlist Artist",
),
playlist_id=(
(playlist_tags.playlist_id if playlist_tags else None),
"Unknown Playlist ID",
),
playlist_title=(
(playlist_tags.playlist_title if playlist_tags else None),
"Unknown Playlist Title",
),
playlist_track=(
(playlist_tags.playlist_track if playlist_tags else None),
"",
),
title=(tags.title, "Unknown Title"),
title_id=(tags.title_id, "Unknown Title ID"),
track=(tags.track, ""),
track_total=(tags.track_total, ""),
)
)
sanitized_formatted_part = self.sanitize_string(
formatted_part,
file_extension if not is_folder else None,
)
formatted_parts.append(sanitized_formatted_part)
def get_cover_url_template(self, metadata: dict) -> str:
if self.cover_format == CoverFormat.RAW:
return self._get_raw_cover_url(metadata["attributes"]["artwork"]["url"])
return metadata["attributes"]["artwork"]["url"]
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
cover_url_template,
),
)
def get_cover_url(self, cover_url_template: str) -> str:
return self.format_cover_url(
cover_url_template,
self.cover_size,
self.cover_format.value,
)
def format_cover_url(
self,
cover_url_template: str,
cover_size: int,
cover_format: str,
) -> str:
return re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{cover_size}x{cover_size}bb.{cover_format}"
if self.cover_format != CoverFormat.RAW
else ""
),
cover_url_template,
)
return str(Path(self.output_path, *formatted_parts))
async def download_stream(self, stream_url: str, download_path: str):
if self.download_mode == DownloadMode.YTDLP:
@@ -322,7 +285,8 @@ class AppleMusicBaseDownloader:
self,
media_path: Path,
tags: MediaTags,
cover_url_template: str,
cover_bytes: bytes | None,
extra_tags: dict | None = None,
):
exclude_tags = self.exclude_tags or []
@@ -335,9 +299,6 @@ class AppleMusicBaseDownloader:
)
mp4_tags = filtered_tags.as_mp4_tags(self.date_tag_template)
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
skip_tagging = "all" in exclude_tags
await asyncio.to_thread(
@@ -346,6 +307,7 @@ class AppleMusicBaseDownloader:
mp4_tags,
cover_bytes,
skip_tagging,
extra_tags,
)
def apply_mp4_tags(
@@ -354,6 +316,7 @@ class AppleMusicBaseDownloader:
tags: dict,
cover_bytes: bytes | None,
skip_tagging: bool,
extra_tags: dict | None,
):
mp4 = MP4(media_path)
mp4.clear()
@@ -371,16 +334,16 @@ class AppleMusicBaseDownloader:
)
]
mp4.update(tags)
if extra_tags:
mp4.update(extra_tags)
mp4.save()
async def _apply_cover(
self,
mp4: MP4,
cover_url_template: str,
cover_bytes: bytes | None,
) -> None:
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return
@@ -411,24 +374,26 @@ class AppleMusicBaseDownloader:
self,
tags: PlaylistTags,
) -> str:
template_file = self.playlist_file_template.split("/")
tags_dict = tags.__dict__.copy()
template_file_parts = self.playlist_file_template.split("/")
formatted_parts = []
return str(
Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_file[0:-1]
],
*[
self.get_sanitized_string(
template_file[-1].format(**tags_dict), False
)
+ ".m3u8"
],
for i, part in enumerate(template_file_parts):
is_folder = i < len(template_file_parts) - 1
formatted_part = CustomStringFormatter().format(
part,
playlist_artist=(tags.playlist_artist, "Unknown Playlist Artist"),
playlist_id=(tags.playlist_id, "Unknown Playlist ID"),
playlist_title=(tags.playlist_title, "Unknown Playlist Title"),
playlist_track=(tags.playlist_track, ""),
)
)
file_ext = None if is_folder else ".m3u8"
sanitized_formatted_part = self.sanitize_string(
formatted_part,
file_ext,
)
formatted_parts.append(sanitized_formatted_part)
return str(Path(self.output_path, *formatted_parts))
def update_playlist_file(
self,
+12 -20
View File
@@ -142,23 +142,6 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
self,
music_video_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
try:
return await self._get_download_item(
music_video_metadata,
playlist_metadata,
)
except Exception as e:
return DownloadItem(
media_metadata=music_video_metadata,
playlist_metadata=playlist_metadata,
error=e,
)
async def _get_download_item(
self,
music_video_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = DownloadItem()
@@ -218,11 +201,19 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
playlist_metadata,
)
download_item.cover_url_template = self.get_cover_url_template(
download_item.cover_url_template = self.interface.get_cover_url_template(
music_video_metadata,
self.cover_format,
)
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -280,8 +271,9 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
download_item.decryption_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
)
+25 -22
View File
@@ -19,6 +19,8 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
no_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
use_album_date: bool = False,
fetch_extra_tags: bool = False,
):
self.__dict__.update(base_downloader.__dict__)
self.interface = interface
@@ -26,28 +28,13 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
self.synced_lyrics_format = synced_lyrics_format
self.no_synced_lyrics = no_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
self.use_album_date = use_album_date
self.fetch_extra_tags = fetch_extra_tags
async def get_download_item(
self,
song_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
try:
return await self._get_download_item(
song_metadata,
playlist_metadata,
)
except Exception as e:
return DownloadItem(
media_metadata=song_metadata,
playlist_metadata=playlist_metadata,
error=e,
)
async def _get_download_item(
self,
song_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = DownloadItem()
@@ -62,10 +49,15 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
)
webplayback = await self.interface.apple_music_api.get_webplayback(song_id)
download_item.media_tags = self.interface.get_tags(
download_item.media_tags = await self.interface.get_tags(
webplayback,
download_item.lyrics.unsynced if download_item.lyrics else None,
self.use_album_date,
)
if self.fetch_extra_tags:
download_item.extra_tags = await self.interface.get_extra_tags(
song_metadata,
)
if playlist_metadata:
download_item.playlist_tags = self.get_playlist_tags(
@@ -116,7 +108,15 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
else:
download_item.decryption_key = None
download_item.cover_url_template = self.get_cover_url_template(song_metadata)
download_item.cover_url_template = self.interface.get_cover_url_template(
song_metadata,
self.cover_format,
)
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
download_item.random_uuid = self.get_random_uuid()
if download_item.stream_info and download_item.stream_info.file_format:
@@ -129,8 +129,9 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
else:
download_item.staged_path = None
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url_template,
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -337,8 +338,10 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
download_item.stream_info.audio_track.fairplay_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
download_item.extra_tags,
)
+13 -3
View File
@@ -64,11 +64,19 @@ class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
None,
)
download_item.cover_url_template = self.get_cover_url_template(
download_item.cover_url_template = self.interface.get_cover_url_template(
uploaded_video_metadata,
self.cover_format,
)
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -86,8 +94,10 @@ class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
download_item.stream_info.video_track.stream_url,
download_item.staged_path,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
)
-6
View File
@@ -11,12 +11,6 @@ class RemuxMode(Enum):
MP4BOX = "mp4box"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
class RemuxFormatMusicVideo(Enum):
M4V = "m4v"
MP4 = "mp4"
+5
View File
@@ -25,3 +25,8 @@ class ExecutableNotFound(GamdlError):
class SyncedLyricsOnly(GamdlError):
def __init__(self):
super().__init__("Only downloading synced lyrics is supported")
class UnsupportedMediaType(GamdlError):
def __init__(self, media_type: str):
super().__init__(f"Unsupported media type: {media_type}")
+2
View File
@@ -17,10 +17,12 @@ class DownloadItem:
random_uuid: str = None
lyrics: Lyrics = None
media_tags: MediaTags = None
extra_tags: dict = None
playlist_tags: PlaylistTags = None
stream_info: StreamInfoAv = None
decryption_key: DecryptionKeyAv = None
cover_url_template: str = None
cover_url: str = None
staged_path: str = None
final_path: str = None
playlist_file_path: str = None
+5
View File
@@ -55,3 +55,8 @@ UPLOADED_VIDEO_QUALITY_RANK = [
"sd480pVideo",
"provisionalUploadVideo",
]
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
+6
View File
@@ -87,3 +87,9 @@ class MusicVideoResolution(Enum):
class UploadedVideoQuality(Enum):
BEST = "best"
ASK = "ask"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
+93
View File
@@ -2,11 +2,18 @@ import asyncio
import base64
import datetime
import logging
import re
from io import BytesIO
from async_lru import alru_cache
from PIL import Image
from pywidevine import PSSH, Cdm
from ..api.apple_music_api import AppleMusicApi
from ..api.itunes_api import ItunesApi
from ..utils import get_response
from .constants import IMAGE_FILE_EXTENSION_MAP
from .enums import CoverFormat
from .types import DecryptionKey
logger = logging.getLogger(__name__)
@@ -66,3 +73,89 @@ class AppleMusicInterface:
logger.debug(f"Decryption key: {decryption_key}")
return decryption_key
def get_cover_url_template(self, metadata: dict, cover_format: CoverFormat) -> str:
if cover_format == CoverFormat.RAW:
cover_url_template = self._get_raw_cover_url(
metadata["attributes"]["artwork"]["url"]
)
cover_url_template = metadata["attributes"]["artwork"]["url"]
logger.debug(f"Cover URL template: {cover_url_template}")
return cover_url_template
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
cover_url_template,
),
)
def get_cover_url(
self,
cover_url_template: str,
cover_size: int,
cover_format: CoverFormat,
) -> str:
cover_url = re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{cover_size}x{cover_size}bb.{cover_format.value}"
if cover_format != CoverFormat.RAW
else ""
),
cover_url_template,
)
logger.debug(f"Cover URL: {cover_url}")
return cover_url
@alru_cache()
async def get_cover_file_extension(
self,
cover_url: str,
cover_format: CoverFormat,
) -> str | None:
if cover_format != CoverFormat.RAW:
return f".{cover_format.value}"
cover_url = self.get_cover_url(cover_url)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
f".{image_format.lower()}",
)
@alru_cache()
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
response = await get_response(cover_url, {200, 404})
if response.status_code == 200:
return response.content
return None
@alru_cache()
async def get_media_date(
self,
media_id: str,
) -> datetime.datetime | None:
lookup_result = await self.itunes_api.get_lookup_result(media_id)
if not lookup_result["results"]:
return None
release_date = lookup_result["results"][0].get("releaseDate")
if not release_date:
return None
parsed_date = self.parse_date(release_date)
logger.debug(f"Parsed media date: {parsed_date}")
return parsed_date
+10 -4
View File
@@ -7,7 +7,7 @@ from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from pywidevine import Cdm
from ..utils import get_response_text
from ..utils import get_response
from .constants import MP4_FORMAT_CODECS
from .enums import MediaRating, MediaType, MusicVideoCodec, MusicVideoResolution
from .interface import AppleMusicInterface
@@ -139,7 +139,9 @@ class AppleMusicMusicVideoInterface(AppleMusicInterface):
webplayback_response["songList"][0],
)
playlist_master_m3u8_obj = m3u8.loads(await get_response_text(m3u8_master_url))
playlist_master_m3u8_obj = m3u8.loads(
(await get_response(m3u8_master_url)).text
)
playlist_master_m3u8_obj.base_uri = m3u8_master_url.rpartition("/")[0]
stream_info_video = await self.get_stream_info_video(
playlist_master_m3u8_obj,
@@ -318,7 +320,9 @@ class AppleMusicMusicVideoInterface(AppleMusicInterface):
stream_info.codec = playlist.stream_info.codecs
stream_info.width, stream_info.height = playlist.stream_info.resolution
playlist_m3u8_obj = m3u8.loads(await get_response_text(stream_info.stream_url))
playlist_m3u8_obj = m3u8.loads(
(await get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = self.get_widevine_pssh(playlist_m3u8_obj)
stream_info.fairplay_key = self.get_fairplay_key(playlist_m3u8_obj)
stream_info.playready_pssh = self.get_playready_pssh(playlist_m3u8_obj)
@@ -343,7 +347,9 @@ class AppleMusicMusicVideoInterface(AppleMusicInterface):
stream_info.stream_url = playlist["uri"]
stream_info.codec = playlist["group_id"]
playlist_m3u8_obj = m3u8.loads(await get_response_text(stream_info.stream_url))
playlist_m3u8_obj = m3u8.loads(
(await get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = self.get_widevine_pssh(playlist_m3u8_obj)
stream_info.fairplay_key = self.get_fairplay_key(playlist_m3u8_obj)
stream_info.playready_pssh = self.get_playready_pssh(playlist_m3u8_obj)
+31 -8
View File
@@ -1,6 +1,7 @@
import asyncio
import base64
import datetime
import io
import json
import logging
import re
@@ -10,10 +11,11 @@ from xml.etree import ElementTree
import m3u8
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from mutagen.mp4 import MP4
from pywidevine import PSSH, Cdm
from pywidevine.license_protocol_pb2 import WidevinePsshData
from ..utils import get_response_text
from ..utils import get_response
from .constants import DRM_DEFAULT_KEY_MAPPING, MP4_FORMAT_CODECS, SONG_CODEC_REGEX_MAP
from .enums import MediaRating, MediaType, SongCodec, SyncedLyricsFormat
from .interface import AppleMusicInterface
@@ -168,10 +170,11 @@ class AppleMusicSongInterface(AppleMusicInterface):
return f"[{timestamp.strftime('%M:%S.%f')[:-4]}]{text}"
def get_tags(
async def get_tags(
self,
webplayback: dict,
lyrics: str | None = None,
use_album_date: bool = False,
) -> MediaTags:
webplayback_metadata = webplayback["songList"][0]["assets"][0]["metadata"]
@@ -194,9 +197,13 @@ class AppleMusicSongInterface(AppleMusicInterface):
composer_sort=webplayback_metadata.get("sort-composer"),
copyright=webplayback_metadata.get("copyright"),
date=(
self.parse_date(webplayback_metadata["releaseDate"])
if webplayback_metadata.get("releaseDate")
else None
await self.get_media_date(webplayback_metadata["playlistId"])
if use_album_date
else (
self.parse_date(webplayback_metadata["releaseDate"])
if webplayback_metadata.get("releaseDate")
else None
)
),
disc=webplayback_metadata["discNumber"],
disc_total=webplayback_metadata["discCount"],
@@ -229,7 +236,7 @@ class AppleMusicSongInterface(AppleMusicInterface):
if not m3u8_master_url:
return None
m3u8_master_obj = m3u8.loads(await get_response_text(m3u8_master_url))
m3u8_master_obj = m3u8.loads((await get_response(m3u8_master_url)).text)
m3u8_master_data = m3u8_master_obj.data
if codec == SongCodec.ASK:
@@ -273,7 +280,7 @@ class AppleMusicSongInterface(AppleMusicInterface):
"com.apple.streamingkeydelivery",
)
else:
m3u8_obj = m3u8.loads(await get_response_text(stream_info.stream_url))
m3u8_obj = m3u8.loads((await get_response(stream_info.stream_url)).text)
stream_info.widevine_pssh = self._get_drm_uri_from_m3u8_keys(
m3u8_obj,
@@ -384,7 +391,7 @@ class AppleMusicSongInterface(AppleMusicInterface):
i for i in webplayback["songList"][0]["assets"] if i["flavor"] == flavor
)["URL"]
m3u8_obj = m3u8.loads(await get_response_text(stream_info.stream_url))
m3u8_obj = m3u8.loads((await get_response(stream_info.stream_url)).text)
stream_info.widevine_pssh = m3u8_obj.keys[0].uri
stream_info_av = StreamInfoAv(
@@ -457,3 +464,19 @@ class AppleMusicSongInterface(AppleMusicInterface):
cdm,
)
)
async def get_extra_tags(
self,
song_metadata: dict,
) -> dict:
previews = song_metadata["attributes"].get("previews", [])
if not previews:
return {}
preview_url = previews[0]["url"]
preview_response = await get_response(preview_url)
preview_bytes = preview_response.content
preview_tags = dict(MP4(io.BytesIO(preview_bytes)).tags)
logger.debug(f"Extra tags: {preview_tags.keys()}")
return preview_tags
+44 -18
View File
@@ -1,7 +1,8 @@
import json
import typing
import subprocess
import asyncio
import json
import string
import subprocess
import typing
import httpx
@@ -20,11 +21,14 @@ def safe_json(httpx_response: httpx.Response) -> dict:
return {}
async def get_response_text(url: str) -> str:
async with httpx.AsyncClient() as client:
async def get_response(
url: str,
valid_responses: set[int] = {200},
) -> httpx.Response:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.get(url)
raise_for_status(response)
return response.text
raise_for_status(response, valid_responses)
return response
async def async_subprocess(*args: str, silent: bool = False) -> None:
@@ -48,24 +52,46 @@ async def async_subprocess(*args: str, silent: bool = False) -> None:
async def safe_gather(
*tasks: typing.Awaitable[typing.Any],
limit: int = 3,
retries: int = 10,
limit: int = 10,
) -> list[typing.Any]:
semaphore = asyncio.Semaphore(limit)
async def bounded_task(task: typing.Awaitable[typing.Any]) -> typing.Any:
async with semaphore:
last_exception = None
for attempt in range(retries + 1):
try:
return await task
except Exception as e:
last_exception = e
if attempt < retries:
await asyncio.sleep(2**attempt)
return last_exception
return await task
return await asyncio.gather(
*(bounded_task(task) for task in tasks),
return_exceptions=True,
)
async def sequential_gather(
*tasks: typing.Awaitable[typing.Any],
interval: float = 0.5,
) -> list[typing.Any]:
results = []
for i, task in enumerate(tasks):
try:
result = await task
results.append(result)
except Exception as e:
results.append(e)
if interval > 0 and i < len(tasks) - 1:
await asyncio.sleep(interval)
return results
class CustomStringFormatter(string.Formatter):
def format_field(self, value: typing.Any, format_spec: str) -> str:
if isinstance(value, tuple) and len(value) == 2:
actual_value, fallback_value = value
if actual_value is None:
return fallback_value
try:
return super().format_field(actual_value, format_spec)
except Exception:
return fallback_value
return super().format_field(value, format_spec)
+7 -2
View File
@@ -1,13 +1,15 @@
[project]
name = "gamdl"
version = "2.8"
version = "2.8.3"
description = "A command-line app for downloading Apple Music songs, music videos and post videos."
readme = "README.md"
license = { text = "MIT" }
license = "MIT"
requires-python = ">=3.10"
dependencies = [
"async-lru>=2.0.5",
"click>=8.3.0",
"colorama>=0.4.6",
"dataclass-click>=1.0.4",
"httpx>=0.28.1",
"inquirerpy>=0.3.4",
"m3u8>=6.0.0",
@@ -22,3 +24,6 @@ Repository = "https://github.com/glomatico/gamdl"
[project.scripts]
gamdl = "gamdl.cli.cli:main"
[tool.setuptools]
packages = ["gamdl"]
Generated
+17 -1
View File
@@ -188,6 +188,18 @@ version = "2.8.8"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b6/2c/66bab4fef920ef8caa3e180ea601475b2cbbe196255b18f1c58215940607/construct-2.8.8.tar.gz", hash = "sha256:1b84b8147f6fd15bcf64b737c3e8ac5100811ad80c830cb4b2545140511c4157", size = 717694, upload-time = "2016-10-20T22:29:12.563Z" }
[[package]]
name = "dataclass-click"
version = "1.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
]
sdist = { url = "https://files.pythonhosted.org/packages/89/82/5b6035efd90621771fa039960eab3e1ec7ff2a8625033272856843e8bd27/dataclass_click-1.0.4.tar.gz", hash = "sha256:10e7de638dd9e68ae9abd5086f61d8ddee42b1873a70f5fd9fd2167856afac11", size = 7580, upload-time = "2025-10-10T21:11:31.956Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/86/dc/38a94a2eb5f756724a6dc87a7aea38f7b747fe7b2e9daabc34a65e6cd9ac/dataclass_click-1.0.4-py3-none-any.whl", hash = "sha256:a225d30c04e4abbdba411cc3d5ec0a2ea829e1dca6500afe5f87cc243e5ead72", size = 8553, upload-time = "2025-10-10T21:11:30.514Z" },
]
[[package]]
name = "exceptiongroup"
version = "1.3.0"
@@ -202,11 +214,13 @@ wheels = [
[[package]]
name = "gamdl"
version = "2.7"
version = "2.8.3"
source = { virtual = "." }
dependencies = [
{ name = "async-lru" },
{ name = "click" },
{ name = "colorama" },
{ name = "dataclass-click" },
{ name = "httpx" },
{ name = "inquirerpy" },
{ name = "m3u8" },
@@ -220,6 +234,8 @@ dependencies = [
requires-dist = [
{ name = "async-lru", specifier = ">=2.0.5" },
{ name = "click", specifier = ">=8.3.0" },
{ name = "colorama", specifier = ">=0.4.6" },
{ name = "dataclass-click", specifier = ">=1.0.4" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "m3u8", specifier = ">=6.0.0" },