Compare commits

..

15 Commits

Author SHA1 Message Date
Rafael Moraes e1f027dcb1 Bump version to 2.6.2 2025-08-31 14:09:15 -03:00
Rafael Moraes ba4e9576bc Improve error handling for missing media in downloader 2025-08-31 14:08:04 -03:00
Rafael Moraes 8c7ad61811 Fix URL parsing for encoded characters in downloader 2025-08-31 14:03:37 -03:00
Rafael Moraes e3d2cfa357 Raise exception if media file already exists 2025-08-31 13:56:53 -03:00
Rafael Moraes 3680afa017 Pass file path to MediaFileAlreadyExistsException 2025-08-31 13:56:06 -03:00
Rafael Moraes 9f93b0e791 Refactor exception classes for clarity and consistency 2025-08-31 13:55:59 -03:00
Rafael Moraes ce2bdc8d61 Refactor temp path handling in Downloader class 2025-08-31 13:49:05 -03:00
Rafael Moraes 30e498aeeb Fix type for MP4 date tag in MediaTags 2025-08-31 13:47:57 -03:00
Rafael Moraes 4d150c35a8 Bump version to 2.6.1 2025-08-31 12:12:28 -03:00
Rafael Moraes be8eeb80c9 Improve error message for failed URL processing 2025-08-31 12:12:14 -03:00
Rafael Moraes b17c31d416 Add override to cleanup_temp_path skip check 2025-08-31 12:10:32 -03:00
Rafael Moraes 42d10d555a Refactor error handling with custom media exceptions 2025-08-31 10:48:55 -03:00
Rafael Moraes 38d131a699 Remove redundant stremeable 2025-08-31 10:35:41 -03:00
Rafael Moraes 322cb7714e Fix playlist saving condition in downloader 2025-08-31 10:15:46 -03:00
Rafael Moraes 6383dd78c4 Handle non-str, non-datetime dates in MediaTags 2025-08-31 10:11:58 -03:00
8 changed files with 117 additions and 71 deletions
+1 -1
View File
@@ -5,4 +5,4 @@ from .downloader_post import DownloaderPost
from .downloader_song import DownloaderSong
from .itunes_api import ItunesApi
__version__ = "2.6"
__version__ = "2.6.2"
+17 -4
View File
@@ -28,6 +28,7 @@ from .enums import (
SongCodec,
SyncedLyricsFormat,
)
from .exceptions import *
from .itunes_api import ItunesApi
from .utils import color_text, prompt_path
@@ -550,22 +551,26 @@ def main(
for url_index, url in enumerate(urls, start=1):
url_progress = color_text(f"URL {url_index}/{len(urls)}", colorama.Style.DIM)
try:
logger.info(f'({url_progress}) Checking "{url}"')
logger.info(f'({url_progress}) Processing "{url}"')
url_info = downloader.parse_url_info(url)
if not url_info:
error_count += 1
logger.error(f"({url_progress}) Invalid URL, skipping")
continue
download_queue = downloader.get_download_queue(url_info)
download_queue_medias_metadata = download_queue.medias_metadata
if not download_queue_medias_metadata[0]:
if not download_queue:
error_count += 1
logger.error(f"({url_progress}) Media not found, skipping")
continue
download_queue_medias_metadata = download_queue.medias_metadata
except Exception as e:
error_count += 1
logger.error(
f'({url_progress}) Failed to check "{url}"',
f'({url_progress}) Failed to process URL "{url}", skipping',
exc_info=not no_exceptions,
)
continue
@@ -619,6 +624,14 @@ def main(
)
except KeyboardInterrupt:
exit(0)
except (
MediaNotStreamableException,
MediaFileAlreadyExistsException,
MediaFormatNotAvailableException,
) as e:
logger.warning(
f"({queue_progress}) {e}, skipping",
)
except Exception as e:
error_count += 1
logger.error(
+59 -26
View File
@@ -9,6 +9,7 @@ import re
import shutil
import subprocess
import typing
import urllib.parse
import uuid
from pathlib import Path
@@ -45,7 +46,7 @@ class Downloader:
r"("
r"/(?P<storefront>[a-z]{2})"
r"/(?P<type>artist|album|playlist|song|music-video|post)"
r"(?:/(?P<slug>[a-z0-9-]+))?"
r"(?:/(?P<slug>[^\s/]+))?"
r"/(?P<id>[0-9]+|pl\.[0-9a-z]{32}|pl\.u-[a-zA-Z0-9]{15})"
r"(?:\?i=(?P<sub_id>[0-9]+))?"
r")|("
@@ -130,7 +131,7 @@ class Downloader:
def _set_temp_path(self):
random_suffix = uuid.uuid4().hex[:8]
self.temp_path = self.temp_path / f"gamdl_temp_{random_suffix}"
self.temp_path_generated = self.temp_path / f"gamdl_temp_{random_suffix}"
def _set_exclude_tags(self):
self.exclude_tags = self.exclude_tags if self.exclude_tags is not None else []
@@ -161,6 +162,8 @@ class Downloader:
self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD))
def parse_url_info(self, url: str) -> UrlInfo | None:
url = urllib.parse.unquote(url)
url_regex_result = re.search(
self.VALID_URL_RE,
url,
@@ -184,39 +187,70 @@ class Downloader:
url_type: str,
id: str,
is_library: bool,
) -> DownloadQueue:
) -> DownloadQueue | None:
download_queue = DownloadQueue()
if url_type == "artist":
artist = self.apple_music_api.get_artist(id)
if artist is None:
return None
download_queue.medias_metadata = list(
self.get_download_queue_from_artist(artist)
)
elif url_type == "song":
download_queue.medias_metadata = [self.apple_music_api.get_song(id)]
elif url_type in {"album", "albums"}:
if url_type == "song":
song = self.apple_music_api.get_song(id)
if song is None:
return None
download_queue.medias_metadata = [song]
if url_type in {"album", "albums"}:
if is_library:
album = self.apple_music_api.get_library_album(id)
else:
album = self.apple_music_api.get_album(id)
if album is None:
return None
download_queue.medias_metadata = [
track for track in album["relationships"]["tracks"]["data"]
]
elif url_type == "playlist":
if url_type == "playlist":
if is_library:
playlist = self.apple_music_api.get_library_playlist(id)
download_queue.medias_metadata = [
track for track in playlist["relationships"]["tracks"]["data"]
]
else:
playlist = self.apple_music_api.get_playlist(id)
download_queue.medias_metadata = [
track for track in playlist["relationships"]["tracks"]["data"]
]
if playlist is None:
return None
download_queue.medias_metadata = [
track for track in playlist["relationships"]["tracks"]["data"]
]
download_queue.playlist_attributes = playlist["attributes"]
elif url_type == "music-video":
download_queue.medias_metadata = [self.apple_music_api.get_music_video(id)]
elif url_type == "post":
download_queue.medias_metadata = [self.apple_music_api.get_post(id)]
if url_type == "music-video":
music_video = self.apple_music_api.get_music_video(id)
if music_video is None:
return None
download_queue.medias_metadata = [music_video]
if url_type == "post":
post = self.apple_music_api.get_post(id)
if post is None:
return None
download_queue.medias_metadata = [post]
return download_queue
def get_download_queue_from_artist(
@@ -474,7 +508,7 @@ class Downloader:
tag: str,
file_extension: str,
):
temp_path = self.temp_path / (f"{media_id}_{tag}" + file_extension)
temp_path = self.temp_path_generated / (f"{media_id}_{tag}" + file_extension)
return temp_path
def get_final_path(
@@ -639,9 +673,12 @@ class Downloader:
encoding="utf8",
)
def cleanup_temp_path(self):
if self.temp_path.exists() and not self.skip_processing:
shutil.rmtree(self.temp_path)
def cleanup_temp_path(self, override_skip_processing_check: bool = False) -> None:
if self.skip_processing and not override_skip_processing_check:
return
if self.temp_path_generated.exists():
shutil.rmtree(self.temp_path_generated)
def _final_processing(
self,
@@ -705,11 +742,7 @@ class Downloader:
download_info.synced_lyrics_path,
download_info.lyrics.synced,
)
if (
download_info.playlist_tags
and self.save_playlist
and download_info.staged_path
):
if download_info.playlist_tags and self.save_playlist:
playlist_file_path = self.get_playlist_file_path(
download_info.playlist_tags
)
+4 -13
View File
@@ -18,6 +18,7 @@ from .enums import (
RemuxFormatMusicVideo,
RemuxMode,
)
from .exceptions import *
from .models import (
DecryptionKeyAv,
DownloadInfo,
@@ -487,11 +488,7 @@ class DownloaderMusicVideo:
colored_media_id = color_text(media_id, colorama.Style.DIM)
if not self.downloader.is_media_streamable(media_metadata):
logger.warning(
f"[{colored_media_id}] "
"Music Video is not streamable or downloadable, skipping"
)
return download_info
raise MediaNotStreamableException()
alt_media_id = self.get_music_video_id_alt(media_metadata) or media_id
download_info.alt_media_id = alt_media_id
@@ -519,10 +516,7 @@ class DownloaderMusicVideo:
logger.debug(f"[{colored_media_id}] Getting stream info")
stream_info = self.get_stream_info_from_webplayback(webplayback)
if not stream_info:
logger.warning(
f"[{colored_media_id}] Video/Audio stream with the selected codec(s) not found, skipping"
)
return download_info
raise MediaFormatNotAvailableException()
download_info.stream_info = stream_info
final_path = self.downloader.get_final_path(
@@ -543,10 +537,7 @@ class DownloaderMusicVideo:
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
logger.warning(
f'[{colored_media_id}] Music Video already exists at "{final_path}", skipping'
)
return download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key(
+5 -5
View File
@@ -9,6 +9,7 @@ from InquirerPy.base.control import Choice
from .downloader import Downloader
from .enums import PostQuality
from .exceptions import MediaFileAlreadyExistsException, MediaNotStreamableException
from .models import DownloadInfo, MediaTags
from .utils import color_text
@@ -121,11 +122,7 @@ class DownloaderPost:
colored_media_id = color_text(media_id, colorama.Style.DIM)
if not self.downloader.is_media_streamable(media_metadata):
logger.warning(
f"[{colored_media_id}] "
"Post Video is not streamable or downloadable, skipping"
)
return download_info
raise MediaNotStreamableException()
tags = self.get_tags(media_metadata)
final_path = self.downloader.get_final_path(
@@ -136,6 +133,9 @@ class DownloaderPost:
download_info.tags = tags
download_info.final_path = final_path
if final_path.exists() and not self.downloader.overwrite:
raise MediaFileAlreadyExistsException(final_path)
cover_url = self.downloader.get_cover_url(media_metadata)
cover_format = self.downloader.get_cover_format(cover_url)
if cover_format and self.downloader.save_cover:
+4 -21
View File
@@ -19,6 +19,7 @@ from pywidevine.license_protocol_pb2 import WidevinePsshData
from .downloader import Downloader
from .enums import MediaFileFormat, RemuxMode, SongCodec, SyncedLyricsFormat
from .exceptions import *
from .models import (
DecryptionKey,
DecryptionKeyAv,
@@ -643,18 +644,7 @@ class DownloaderSong:
colored_media_id = color_text(media_id, colorama.Style.DIM)
if not self.downloader.is_media_streamable(media_metadata):
logger.warning(
f"[{colored_media_id}] "
"Song is not streamable or downloadable, skipping"
)
return download_info
if not self.downloader.is_media_streamable(media_metadata):
logger.warning(
f"[{colored_media_id}] "
"Track is not streamable or downloadable, skipping"
)
return download_info
raise MediaNotStreamableException()
logger.debug(f"[{colored_media_id}] Getting lyrics")
lyrics = self.get_lyrics(media_metadata)
@@ -695,10 +685,7 @@ class DownloaderSong:
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
logger.warning(
f'[{colored_media_id}] Song already exists at "{final_path}", skipping'
)
return download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting stream info")
if self.codec.is_legacy():
@@ -713,11 +700,7 @@ class DownloaderSong:
else:
stream_info = self.get_stream_info(media_metadata)
if not stream_info or not stream_info.audio_track.widevine_pssh:
logger.warning(
f"[{colored_media_id}] Song is not downloadable or is not "
"available in the selected codec, skipping",
)
return download_info
raise MediaFormatNotAvailableException()
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key(
stream_info,
+24
View File
@@ -0,0 +1,24 @@
from __future__ import annotations
from pathlib import Path
class MediaNotStreamableException(Exception):
DEFAULT_MESSAGE = "Media is not streamable"
def __init__(self):
super().__init__(self.DEFAULT_MESSAGE)
class MediaFileAlreadyExistsException(Exception):
DEFAULT_MESSAGE = "Media file already exists at '{media_path}'"
def __init__(self, media_path: Path):
super().__init__(self.DEFAULT_MESSAGE.format(media_path=media_path))
class MediaFormatNotAvailableException(Exception):
DEFAULT_MESSAGE = "Requested media format or codec not available"
def __init__(self):
super().__init__(self.DEFAULT_MESSAGE)
+3 -1
View File
@@ -118,6 +118,8 @@ class MediaTags:
date_mp4 = self.date.strftime(date_format)
elif isinstance(self.date, str):
date_mp4 = self.date
else:
date_mp4 = None
mp4_tags = {
"\xa9alb": [self.album],
@@ -133,7 +135,7 @@ class MediaTags:
"cmID": [self.composer_id],
"soco": [self.composer_sort],
"cprt": [self.copyright],
"\xa9day": date_mp4,
"\xa9day": [date_mp4],
"disk": disc_mp4,
"pgap": [bool(self.gapless) if self.gapless is not None else None],
"\xa9gen": [self.genre],