mirror of
https://github.com/glomatico/gamdl.git
synced 2026-06-13 12:15:18 +03:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 24de608bc8 | |||
| d0e2e08748 | |||
| 2223d36d5e | |||
| 3077456ab7 | |||
| bbd96cbe6b | |||
| ca16a208ba | |||
| c32c8622b7 | |||
| 132ae0ea56 | |||
| 70238facac | |||
| 4fb1fb609b | |||
| f97b3dba14 | |||
| 2da824ecbc | |||
| 24810da4b6 | |||
| f16a30549c |
+1
-1
@@ -1 +1 @@
|
||||
__version__ = "2.4"
|
||||
__version__ = "2.4.2"
|
||||
|
||||
+29
-11
@@ -9,6 +9,8 @@ from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
from .utils import raise_response_exception
|
||||
|
||||
|
||||
class AppleMusicApi:
|
||||
APPLE_MUSIC_HOMEPAGE_URL = "https://beta.music.apple.com"
|
||||
@@ -36,8 +38,13 @@ class AppleMusicApi:
|
||||
cookies = MozillaCookieJar(self.cookies_path)
|
||||
cookies.load(ignore_discard=True, ignore_expires=True)
|
||||
self.session.cookies.update(cookies)
|
||||
self.storefront = self.session.cookies.get_dict()["itua"]
|
||||
media_user_token = self.session.cookies.get_dict()["media-user-token"]
|
||||
media_user_token = self.session.cookies.get_dict().get("media-user-token")
|
||||
if not media_user_token:
|
||||
raise ValueError(
|
||||
"media-user-token not found in cookies. "
|
||||
"Make sure you're logged in to Apple Music, have an active subscription, and "
|
||||
"exported the cookies from the Apple Music homepage."
|
||||
)
|
||||
else:
|
||||
media_user_token = ""
|
||||
self.session.headers.update(
|
||||
@@ -68,12 +75,7 @@ class AppleMusicApi:
|
||||
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
|
||||
self.session.headers.update({"authorization": f"Bearer {token}"})
|
||||
self.session.params = {"l": self.language}
|
||||
|
||||
@staticmethod
|
||||
def _raise_response_exception(response: requests.Response):
|
||||
raise Exception(
|
||||
f"Request failed with status code {response.status_code}: {response.text}"
|
||||
)
|
||||
self._set_storefront()
|
||||
|
||||
def _check_amp_api_response(self, response: requests.Response):
|
||||
try:
|
||||
@@ -85,7 +87,23 @@ class AppleMusicApi:
|
||||
requests.exceptions.JSONDecodeError,
|
||||
AssertionError,
|
||||
):
|
||||
self._raise_response_exception(response)
|
||||
raise_response_exception(response)
|
||||
|
||||
def _set_storefront(self):
|
||||
if self.cookies_path:
|
||||
self.storefront = (
|
||||
self.session.cookies.get_dict().get("itua")
|
||||
or self.get_user_storefront()["id"]
|
||||
)
|
||||
else:
|
||||
self.storefront = self.storefront or "us"
|
||||
|
||||
def get_user_storefront(
|
||||
self,
|
||||
) -> dict:
|
||||
response = self.session.get(f"{self.AMP_API_URL}/v1/me/storefront")
|
||||
self._check_amp_api_response(response)
|
||||
return response.json()["data"][0]
|
||||
|
||||
def get_artist(
|
||||
self,
|
||||
@@ -254,7 +272,7 @@ class AppleMusicApi:
|
||||
requests.exceptions.JSONDecodeError,
|
||||
AssertionError,
|
||||
):
|
||||
self._raise_response_exception(response)
|
||||
raise_response_exception(response)
|
||||
return webplayback[0]
|
||||
|
||||
def get_widevine_license(
|
||||
@@ -284,5 +302,5 @@ class AppleMusicApi:
|
||||
requests.exceptions.JSONDecodeError,
|
||||
AssertionError,
|
||||
):
|
||||
self._raise_response_exception(response)
|
||||
raise_response_exception(response)
|
||||
return widevine_license
|
||||
|
||||
+33
-30
@@ -7,12 +7,12 @@ from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
from termcolor import colored
|
||||
import colorama
|
||||
|
||||
from . import __version__
|
||||
from .apple_music_api import AppleMusicApi
|
||||
from .constants import *
|
||||
from .custom_formatter import CustomFormatter
|
||||
from .custom_logger_formatter import CustomLoggerFormatter
|
||||
from .downloader import Downloader
|
||||
from .downloader_music_video import DownloaderMusicVideo
|
||||
from .downloader_post import DownloaderPost
|
||||
@@ -20,6 +20,7 @@ from .downloader_song import DownloaderSong
|
||||
from .downloader_song_legacy import DownloaderSongLegacy
|
||||
from .enums import CoverFormat, DownloadMode, MusicVideoCodec, PostQuality, RemuxMode
|
||||
from .itunes_api import ItunesApi
|
||||
from .utils import color_text, prompt_path
|
||||
|
||||
apple_music_api_sig = inspect.signature(AppleMusicApi.__init__)
|
||||
downloader_sig = inspect.signature(Downloader.__init__)
|
||||
@@ -350,20 +351,14 @@ def main(
|
||||
quality_post: PostQuality,
|
||||
no_config_file: bool,
|
||||
):
|
||||
colorama.just_fix_windows_console()
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(log_level)
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setFormatter(CustomFormatter())
|
||||
stream_handler.setFormatter(CustomLoggerFormatter())
|
||||
logger.addHandler(stream_handler)
|
||||
logger.info("Starting Gamdl")
|
||||
while not cookies_path.exists():
|
||||
cookies_path_str = click.prompt(
|
||||
X_NOT_FOUND_STRING.format("Cookies file", cookies_path.absolute())
|
||||
+ ". Move it to that location or drag and drop it here. Then, press enter to continue",
|
||||
default=str(cookies_path),
|
||||
show_default=False,
|
||||
)
|
||||
cookies_path = Path(cookies_path_str.strip('"'))
|
||||
prompt_path("Cookies file", cookies_path)
|
||||
apple_music_api = AppleMusicApi(
|
||||
cookies_path,
|
||||
language=language,
|
||||
@@ -415,9 +410,8 @@ def main(
|
||||
quality_post,
|
||||
)
|
||||
if not synced_lyrics_only:
|
||||
if wvd_path and not wvd_path.exists():
|
||||
logger.critical(X_NOT_FOUND_STRING.format(".wvd file", wvd_path))
|
||||
return
|
||||
if wvd_path:
|
||||
prompt_path(".wvd file", wvd_path)
|
||||
logger.debug("Setting up CDM")
|
||||
downloader.set_cdm()
|
||||
if not downloader.ffmpeg_path_full and (
|
||||
@@ -466,7 +460,7 @@ def main(
|
||||
_urls.extend(Path(url).read_text(encoding="utf-8").splitlines())
|
||||
urls = _urls
|
||||
for url_index, url in enumerate(urls, start=1):
|
||||
url_progress = colored(f"URL {url_index}/{len(urls)}", "grey")
|
||||
url_progress = color_text(f"URL {url_index}/{len(urls)}", colorama.Style.DIM)
|
||||
try:
|
||||
logger.info(f'({url_progress}) Checking "{url}"')
|
||||
url_info = downloader.get_url_info(url)
|
||||
@@ -482,9 +476,9 @@ def main(
|
||||
for download_index, track_metadata in enumerate(
|
||||
download_queue_tracks_metadata, start=1
|
||||
):
|
||||
queue_progress = colored(
|
||||
queue_progress = color_text(
|
||||
f"Track {download_index}/{len(download_queue_tracks_metadata)} from URL {url_index}/{len(urls)}",
|
||||
"grey",
|
||||
colorama.Style.DIM,
|
||||
)
|
||||
try:
|
||||
remuxed_path = None
|
||||
@@ -533,10 +527,13 @@ def main(
|
||||
)
|
||||
cover_url = downloader.get_cover_url(track_metadata)
|
||||
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
|
||||
cover_path = downloader_song.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
if cover_file_extesion:
|
||||
cover_path = downloader_song.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
else:
|
||||
cover_path = None
|
||||
if synced_lyrics_only:
|
||||
pass
|
||||
elif final_path.exists() and not overwrite:
|
||||
@@ -655,10 +652,13 @@ def main(
|
||||
final_path = downloader.get_final_path(tags, ".m4v")
|
||||
cover_url = downloader.get_cover_url(track_metadata)
|
||||
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
|
||||
cover_path = downloader_music_video.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
if cover_file_extesion:
|
||||
cover_path = downloader_music_video.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
else:
|
||||
cover_path = None
|
||||
if final_path.exists() and not overwrite:
|
||||
logger.warning(
|
||||
f'({queue_progress}) Music video already exists at "{final_path}", skipping'
|
||||
@@ -732,10 +732,13 @@ def main(
|
||||
final_path = downloader.get_final_path(tags, ".m4v")
|
||||
cover_url = downloader.get_cover_url(track_metadata)
|
||||
cover_file_extesion = downloader.get_cover_file_extension(cover_url)
|
||||
cover_path = downloader_music_video.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
if cover_file_extesion:
|
||||
cover_path = downloader_music_video.get_cover_path(
|
||||
final_path,
|
||||
cover_file_extesion,
|
||||
)
|
||||
else:
|
||||
cover_path = None
|
||||
if final_path.exists() and not overwrite:
|
||||
logger.warning(
|
||||
f'({queue_progress}) Post video already exists at "{final_path}", skipping'
|
||||
@@ -746,7 +749,7 @@ def main(
|
||||
)
|
||||
logger.debug(f'Downloading to "{remuxed_path}"')
|
||||
downloader.download_ytdlp(remuxed_path, stream_url)
|
||||
if synced_lyrics_only or not save_cover:
|
||||
if synced_lyrics_only or not save_cover or cover_path is None:
|
||||
pass
|
||||
elif cover_path.exists() and not overwrite:
|
||||
logger.debug(f'Cover already exists at "{cover_path}", skipping')
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
import logging
|
||||
from termcolor import colored
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
basic_format = "[%(levelname)-8s %(asctime)s]"
|
||||
formats = {
|
||||
logging.DEBUG: colored(basic_format, "grey"),
|
||||
logging.INFO: colored(basic_format, "green"),
|
||||
logging.WARNING: colored(basic_format, "yellow"),
|
||||
logging.ERROR: colored(basic_format, "red"),
|
||||
logging.CRITICAL: colored(basic_format, "red"),
|
||||
}
|
||||
date_format = "%H:%M:%S"
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
return logging.Formatter(
|
||||
self.formats.get(record.levelno) + " %(message)s",
|
||||
datefmt=self.date_format,
|
||||
).format(record)
|
||||
@@ -0,0 +1,24 @@
|
||||
import logging
|
||||
|
||||
import colorama
|
||||
|
||||
from .utils import color_text
|
||||
|
||||
|
||||
class CustomLoggerFormatter(logging.Formatter):
|
||||
base_format = "[%(levelname)-8s %(asctime)s]"
|
||||
format_colors = {
|
||||
logging.DEBUG: colorama.Style.DIM,
|
||||
logging.INFO: colorama.Fore.GREEN,
|
||||
logging.WARNING: colorama.Fore.YELLOW,
|
||||
logging.ERROR: colorama.Fore.RED,
|
||||
logging.CRITICAL: colorama.Fore.RED,
|
||||
}
|
||||
date_format = "%H:%M:%S"
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
return logging.Formatter(
|
||||
color_text(self.base_format, self.format_colors.get(record.levelno))
|
||||
+ " %(message)s",
|
||||
datefmt=self.date_format,
|
||||
).format(record)
|
||||
+23
-12
@@ -24,6 +24,7 @@ from .enums import CoverFormat, DownloadMode, RemuxMode
|
||||
from .hardcoded_wvd import HARDCODED_WVD
|
||||
from .itunes_api import ItunesApi
|
||||
from .models import DownloadQueue, UrlInfo
|
||||
from .utils import raise_response_exception
|
||||
|
||||
|
||||
class Downloader:
|
||||
@@ -419,7 +420,10 @@ class Downloader:
|
||||
),
|
||||
)
|
||||
|
||||
def get_cover_file_extension(self, cover_url: str) -> str:
|
||||
def get_cover_file_extension(self, cover_url: str) -> str | None:
|
||||
cover_bytes = self.get_url_response_bytes(cover_url)
|
||||
if cover_bytes is None:
|
||||
return None
|
||||
image_obj = Image.open(io.BytesIO(self.get_url_response_bytes(cover_url)))
|
||||
image_format = image_obj.format.lower()
|
||||
return IMAGE_FILE_EXTENSION_MAP.get(image_format, f".{image_format}")
|
||||
@@ -455,7 +459,12 @@ class Downloader:
|
||||
@functools.lru_cache()
|
||||
def get_url_response_bytes(url: str) -> bytes:
|
||||
response = requests.get(url)
|
||||
response.raise_for_status()
|
||||
if response.status_code == 200:
|
||||
return response.content
|
||||
elif response.status_code == 404:
|
||||
return None
|
||||
else:
|
||||
raise_response_exception(response)
|
||||
return response.content
|
||||
|
||||
def apply_tags(
|
||||
@@ -498,16 +507,18 @@ class Downloader:
|
||||
"cover" not in self.exclude_tags_list
|
||||
and self.cover_format != CoverFormat.RAW
|
||||
):
|
||||
mp4_tags["covr"] = [
|
||||
MP4Cover(
|
||||
self.get_url_response_bytes(cover_url),
|
||||
imageformat=(
|
||||
MP4Cover.FORMAT_JPEG
|
||||
if self.cover_format == CoverFormat.JPG
|
||||
else MP4Cover.FORMAT_PNG
|
||||
),
|
||||
)
|
||||
]
|
||||
cover_bytes = self.get_url_response_bytes(cover_url)
|
||||
if cover_bytes is not None:
|
||||
mp4_tags["covr"] = [
|
||||
MP4Cover(
|
||||
self.get_url_response_bytes(cover_url),
|
||||
imageformat=(
|
||||
MP4Cover.FORMAT_JPEG
|
||||
if self.cover_format == CoverFormat.JPG
|
||||
else MP4Cover.FORMAT_PNG
|
||||
),
|
||||
)
|
||||
]
|
||||
mp4 = MP4(path)
|
||||
mp4.clear()
|
||||
mp4.update(mp4_tags)
|
||||
|
||||
+3
-3
@@ -4,8 +4,8 @@ import functools
|
||||
|
||||
import requests
|
||||
|
||||
from .apple_music_api import AppleMusicApi
|
||||
from .constants import STOREFRONT_IDS
|
||||
from .utils import raise_response_exception
|
||||
|
||||
|
||||
class ItunesApi:
|
||||
@@ -58,7 +58,7 @@ class ItunesApi:
|
||||
requests.exceptions.JSONDecodeError,
|
||||
AssertionError,
|
||||
):
|
||||
AppleMusicApi._raise_response_exception(response)
|
||||
raise_response_exception(response)
|
||||
return resource
|
||||
|
||||
def get_itunes_page(
|
||||
@@ -81,5 +81,5 @@ class ItunesApi:
|
||||
requests.exceptions.JSONDecodeError,
|
||||
AssertionError,
|
||||
):
|
||||
AppleMusicApi._raise_response_exception(response)
|
||||
raise_response_exception(response)
|
||||
return itunes_page
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
import colorama
|
||||
import requests
|
||||
|
||||
from .constants import X_NOT_FOUND_STRING
|
||||
|
||||
|
||||
def color_text(text: str, color) -> str:
|
||||
return color + text + colorama.Style.RESET_ALL
|
||||
|
||||
|
||||
def raise_response_exception(response: requests.Response):
|
||||
raise Exception(
|
||||
f"Request failed with status code {response.status_code}: {response.text}"
|
||||
)
|
||||
|
||||
|
||||
def prompt_path(path_description: str, path_obj: Path) -> Path:
|
||||
while not path_obj.exists():
|
||||
path_obj_str = click.prompt(
|
||||
X_NOT_FOUND_STRING.format(path_description, path_obj.absolute())
|
||||
+ ". Move it to that location or drag and drop it here. Then, press enter to continue",
|
||||
default=str(path_obj),
|
||||
show_default=False,
|
||||
)
|
||||
path_obj = Path(path_obj_str.strip('"'))
|
||||
return path_obj
|
||||
+1
-2
@@ -5,13 +5,12 @@ requires-python = ">=3.9"
|
||||
authors = [{ name = "glomatico" }]
|
||||
dependencies = [
|
||||
"click",
|
||||
"colorama",
|
||||
"inquirerpy",
|
||||
"m3u8",
|
||||
"mutagen",
|
||||
"pillow",
|
||||
"pywidevine",
|
||||
"pyyaml",
|
||||
"termcolor",
|
||||
"yt-dlp",
|
||||
]
|
||||
readme = "README.md"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
click
|
||||
colorama
|
||||
inquirerpy
|
||||
m3u8
|
||||
mutagen
|
||||
|
||||
Reference in New Issue
Block a user