Compare commits

..

14 Commits

Author SHA1 Message Date
Rafael Moraes 24de608bc8 bump version 2025-05-12 09:28:28 -03:00
Rafael Moraes d0e2e08748 use prompt_path function for wvd and cookies 2025-05-12 09:11:58 -03:00
Rafael Moraes 2223d36d5e added prompt_path function 2025-05-12 09:11:07 -03:00
Rafael Moraes 3077456ab7 update cover path retrieval to use downloader_song 2025-05-09 14:19:39 -03:00
Rafael Moraes bbd96cbe6b remove redundant cover path lines 2025-05-09 14:16:33 -03:00
Rafael Moraes ca16a208ba rename custom_formatter to custom_logger_formatter 2025-05-09 14:07:31 -03:00
Rafael Moraes c32c8622b7 add error handling for missing media-user-token in cookies 2025-05-09 14:04:24 -03:00
Rafael Moraes 132ae0ea56 improve storefront retrieval 2025-05-05 23:04:09 -03:00
Rafael Moraes 70238facac better handling for media that has no cover 2025-02-25 02:35:39 -03:00
Rafael Moraes 4fb1fb609b Update custom_formatter.py 2025-02-23 16:28:38 -03:00
Rafael Moraes f97b3dba14 bump version 2025-02-23 04:36:05 -03:00
Rafael Moraes 2da824ecbc add colorama to dependencies 2025-02-23 04:34:28 -03:00
Rafael Moraes 24810da4b6 replace inline response exception handling with utility function 2025-02-23 04:32:50 -03:00
Rafael Moraes f16a30549c refactor logging color handling to use colorama and add utility function for colored text 2025-02-23 04:30:11 -03:00
10 changed files with 144 additions and 79 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "2.4"
__version__ = "2.4.2"
+29 -11
View File
@@ -9,6 +9,8 @@ from pathlib import Path
import requests
from .utils import raise_response_exception
class AppleMusicApi:
APPLE_MUSIC_HOMEPAGE_URL = "https://beta.music.apple.com"
@@ -36,8 +38,13 @@ class AppleMusicApi:
cookies = MozillaCookieJar(self.cookies_path)
cookies.load(ignore_discard=True, ignore_expires=True)
self.session.cookies.update(cookies)
self.storefront = self.session.cookies.get_dict()["itua"]
media_user_token = self.session.cookies.get_dict()["media-user-token"]
media_user_token = self.session.cookies.get_dict().get("media-user-token")
if not media_user_token:
raise ValueError(
"media-user-token not found in cookies. "
"Make sure you're logged in to Apple Music, have an active subscription, and "
"exported the cookies from the Apple Music homepage."
)
else:
media_user_token = ""
self.session.headers.update(
@@ -68,12 +75,7 @@ class AppleMusicApi:
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
self.session.headers.update({"authorization": f"Bearer {token}"})
self.session.params = {"l": self.language}
@staticmethod
def _raise_response_exception(response: requests.Response):
raise Exception(
f"Request failed with status code {response.status_code}: {response.text}"
)
self._set_storefront()
def _check_amp_api_response(self, response: requests.Response):
try:
@@ -85,7 +87,23 @@ class AppleMusicApi:
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
raise_response_exception(response)
def _set_storefront(self):
if self.cookies_path:
self.storefront = (
self.session.cookies.get_dict().get("itua")
or self.get_user_storefront()["id"]
)
else:
self.storefront = self.storefront or "us"
def get_user_storefront(
self,
) -> dict:
response = self.session.get(f"{self.AMP_API_URL}/v1/me/storefront")
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_artist(
self,
@@ -254,7 +272,7 @@ class AppleMusicApi:
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
raise_response_exception(response)
return webplayback[0]
def get_widevine_license(
@@ -284,5 +302,5 @@ class AppleMusicApi:
requests.exceptions.JSONDecodeError,
AssertionError,
):
self._raise_response_exception(response)
raise_response_exception(response)
return widevine_license
+33 -30
View File
@@ -7,12 +7,12 @@ from enum import Enum
from pathlib import Path
import click
from termcolor import colored
import colorama
from . import __version__
from .apple_music_api import AppleMusicApi
from .constants import *
from .custom_formatter import CustomFormatter
from .custom_logger_formatter import CustomLoggerFormatter
from .downloader import Downloader
from .downloader_music_video import DownloaderMusicVideo
from .downloader_post import DownloaderPost
@@ -20,6 +20,7 @@ from .downloader_song import DownloaderSong
from .downloader_song_legacy import DownloaderSongLegacy
from .enums import CoverFormat, DownloadMode, MusicVideoCodec, PostQuality, RemuxMode
from .itunes_api import ItunesApi
from .utils import color_text, prompt_path
apple_music_api_sig = inspect.signature(AppleMusicApi.__init__)
downloader_sig = inspect.signature(Downloader.__init__)
@@ -350,20 +351,14 @@ def main(
quality_post: PostQuality,
no_config_file: bool,
):
colorama.just_fix_windows_console()
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomFormatter())
stream_handler.setFormatter(CustomLoggerFormatter())
logger.addHandler(stream_handler)
logger.info("Starting Gamdl")
while not cookies_path.exists():
cookies_path_str = click.prompt(
X_NOT_FOUND_STRING.format("Cookies file", cookies_path.absolute())
+ ". Move it to that location or drag and drop it here. Then, press enter to continue",
default=str(cookies_path),
show_default=False,
)
cookies_path = Path(cookies_path_str.strip('"'))
prompt_path("Cookies file", cookies_path)
apple_music_api = AppleMusicApi(
cookies_path,
language=language,
@@ -415,9 +410,8 @@ def main(
quality_post,
)
if not synced_lyrics_only:
if wvd_path and not wvd_path.exists():
logger.critical(X_NOT_FOUND_STRING.format(".wvd file", wvd_path))
return
if wvd_path:
prompt_path(".wvd file", wvd_path)
logger.debug("Setting up CDM")
downloader.set_cdm()
if not downloader.ffmpeg_path_full and (
@@ -466,7 +460,7 @@ def main(
_urls.extend(Path(url).read_text(encoding="utf-8").splitlines())
urls = _urls
for url_index, url in enumerate(urls, start=1):
url_progress = colored(f"URL {url_index}/{len(urls)}", "grey")
url_progress = color_text(f"URL {url_index}/{len(urls)}", colorama.Style.DIM)
try:
logger.info(f'({url_progress}) Checking "{url}"')
url_info = downloader.get_url_info(url)
@@ -482,9 +476,9 @@ def main(
for download_index, track_metadata in enumerate(
download_queue_tracks_metadata, start=1
):
queue_progress = colored(
queue_progress = color_text(
f"Track {download_index}/{len(download_queue_tracks_metadata)} from URL {url_index}/{len(urls)}",
"grey",
colorama.Style.DIM,
)
try:
remuxed_path = None
@@ -533,10 +527,13 @@ def main(
)
cover_url = downloader.get_cover_url(track_metadata)
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
cover_path = downloader_song.get_cover_path(
final_path,
cover_file_extesion,
)
if cover_file_extesion:
cover_path = downloader_song.get_cover_path(
final_path,
cover_file_extesion,
)
else:
cover_path = None
if synced_lyrics_only:
pass
elif final_path.exists() and not overwrite:
@@ -655,10 +652,13 @@ def main(
final_path = downloader.get_final_path(tags, ".m4v")
cover_url = downloader.get_cover_url(track_metadata)
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
cover_path = downloader_music_video.get_cover_path(
final_path,
cover_file_extesion,
)
if cover_file_extesion:
cover_path = downloader_music_video.get_cover_path(
final_path,
cover_file_extesion,
)
else:
cover_path = None
if final_path.exists() and not overwrite:
logger.warning(
f'({queue_progress}) Music video already exists at "{final_path}", skipping'
@@ -732,10 +732,13 @@ def main(
final_path = downloader.get_final_path(tags, ".m4v")
cover_url = downloader.get_cover_url(track_metadata)
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
cover_path = downloader_music_video.get_cover_path(
final_path,
cover_file_extesion,
)
if cover_file_extesion:
cover_path = downloader_music_video.get_cover_path(
final_path,
cover_file_extesion,
)
else:
cover_path = None
if final_path.exists() and not overwrite:
logger.warning(
f'({queue_progress}) Post video already exists at "{final_path}", skipping'
@@ -746,7 +749,7 @@ def main(
)
logger.debug(f'Downloading to "{remuxed_path}"')
downloader.download_ytdlp(remuxed_path, stream_url)
if synced_lyrics_only or not save_cover:
if synced_lyrics_only or not save_cover or cover_path is None:
pass
elif cover_path.exists() and not overwrite:
logger.debug(f'Cover already exists at "{cover_path}", skipping')
-20
View File
@@ -1,20 +0,0 @@
import logging
from termcolor import colored
class CustomFormatter(logging.Formatter):
basic_format = "[%(levelname)-8s %(asctime)s]"
formats = {
logging.DEBUG: colored(basic_format, "grey"),
logging.INFO: colored(basic_format, "green"),
logging.WARNING: colored(basic_format, "yellow"),
logging.ERROR: colored(basic_format, "red"),
logging.CRITICAL: colored(basic_format, "red"),
}
date_format = "%H:%M:%S"
def format(self, record: logging.LogRecord) -> str:
return logging.Formatter(
self.formats.get(record.levelno) + " %(message)s",
datefmt=self.date_format,
).format(record)
+24
View File
@@ -0,0 +1,24 @@
import logging
import colorama
from .utils import color_text
class CustomLoggerFormatter(logging.Formatter):
base_format = "[%(levelname)-8s %(asctime)s]"
format_colors = {
logging.DEBUG: colorama.Style.DIM,
logging.INFO: colorama.Fore.GREEN,
logging.WARNING: colorama.Fore.YELLOW,
logging.ERROR: colorama.Fore.RED,
logging.CRITICAL: colorama.Fore.RED,
}
date_format = "%H:%M:%S"
def format(self, record: logging.LogRecord) -> str:
return logging.Formatter(
color_text(self.base_format, self.format_colors.get(record.levelno))
+ " %(message)s",
datefmt=self.date_format,
).format(record)
+23 -12
View File
@@ -24,6 +24,7 @@ from .enums import CoverFormat, DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
from .itunes_api import ItunesApi
from .models import DownloadQueue, UrlInfo
from .utils import raise_response_exception
class Downloader:
@@ -419,7 +420,10 @@ class Downloader:
),
)
def get_cover_file_extension(self, cover_url: str) -> str:
def get_cover_file_extension(self, cover_url: str) -> str | None:
cover_bytes = self.get_url_response_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(io.BytesIO(self.get_url_response_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(image_format, f".{image_format}")
@@ -455,7 +459,12 @@ class Downloader:
@functools.lru_cache()
def get_url_response_bytes(url: str) -> bytes:
response = requests.get(url)
response.raise_for_status()
if response.status_code == 200:
return response.content
elif response.status_code == 404:
return None
else:
raise_response_exception(response)
return response.content
def apply_tags(
@@ -498,16 +507,18 @@ class Downloader:
"cover" not in self.exclude_tags_list
and self.cover_format != CoverFormat.RAW
):
mp4_tags["covr"] = [
MP4Cover(
self.get_url_response_bytes(cover_url),
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
cover_bytes = self.get_url_response_bytes(cover_url)
if cover_bytes is not None:
mp4_tags["covr"] = [
MP4Cover(
self.get_url_response_bytes(cover_url),
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
mp4 = MP4(path)
mp4.clear()
mp4.update(mp4_tags)
+3 -3
View File
@@ -4,8 +4,8 @@ import functools
import requests
from .apple_music_api import AppleMusicApi
from .constants import STOREFRONT_IDS
from .utils import raise_response_exception
class ItunesApi:
@@ -58,7 +58,7 @@ class ItunesApi:
requests.exceptions.JSONDecodeError,
AssertionError,
):
AppleMusicApi._raise_response_exception(response)
raise_response_exception(response)
return resource
def get_itunes_page(
@@ -81,5 +81,5 @@ class ItunesApi:
requests.exceptions.JSONDecodeError,
AssertionError,
):
AppleMusicApi._raise_response_exception(response)
raise_response_exception(response)
return itunes_page
+29
View File
@@ -0,0 +1,29 @@
from pathlib import Path
import click
import colorama
import requests
from .constants import X_NOT_FOUND_STRING
def color_text(text: str, color) -> str:
return color + text + colorama.Style.RESET_ALL
def raise_response_exception(response: requests.Response):
raise Exception(
f"Request failed with status code {response.status_code}: {response.text}"
)
def prompt_path(path_description: str, path_obj: Path) -> Path:
while not path_obj.exists():
path_obj_str = click.prompt(
X_NOT_FOUND_STRING.format(path_description, path_obj.absolute())
+ ". Move it to that location or drag and drop it here. Then, press enter to continue",
default=str(path_obj),
show_default=False,
)
path_obj = Path(path_obj_str.strip('"'))
return path_obj
+1 -2
View File
@@ -5,13 +5,12 @@ requires-python = ">=3.9"
authors = [{ name = "glomatico" }]
dependencies = [
"click",
"colorama",
"inquirerpy",
"m3u8",
"mutagen",
"pillow",
"pywidevine",
"pyyaml",
"termcolor",
"yt-dlp",
]
readme = "README.md"
+1
View File
@@ -1,4 +1,5 @@
click
colorama
inquirerpy
m3u8
mutagen