Refactor cover image handling to interface layer

This commit is contained in:
Rafael Moraes
2026-01-03 15:08:35 -03:00
parent 6ed596ca42
commit 83ca91e91c
12 changed files with 133 additions and 105 deletions
+1 -1
View File
@@ -15,7 +15,6 @@ from ..downloader import (
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
CoverFormat,
DownloadItem,
DownloadMode,
GamdlError,
@@ -32,6 +31,7 @@ from ..interface import (
SongCodec,
SyncedLyricsFormat,
UploadedVideoQuality,
CoverFormat,
)
from .config_file import ConfigFile
from .constants import X_NOT_IN_PATH
-4
View File
@@ -1,10 +1,6 @@
import re
DEFAULT_SONG_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
TEMP_PATH_TEMPLATE = "gamdl_temp_{}"
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
+1 -4
View File
@@ -486,10 +486,7 @@ class AppleMusicDownloader:
return
if download_item.cover_path and self.base_downloader.save_cover:
cover_url = self.base_downloader.get_cover_url(
download_item.cover_url_template,
)
cover_bytes = await self.base_downloader.get_cover_bytes(cover_url)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
if cover_bytes and (
self.base_downloader.overwrite
or not Path(download_item.cover_path).exists()
+6 -80
View File
@@ -2,24 +2,17 @@ import asyncio
import re
import shutil
import uuid
from io import BytesIO
from pathlib import Path
from async_lru import alru_cache
from mutagen.mp4 import MP4, MP4Cover
from PIL import Image
from pywidevine import Cdm, Device
from yt_dlp import YoutubeDL
from ..interface.enums import CoverFormat
from ..interface.types import MediaTags, PlaylistTags
from ..utils import async_subprocess, get_response
from .constants import (
ILLEGAL_CHAR_REPLACEMENT,
ILLEGAL_CHARS_RE,
IMAGE_FILE_EXTENSION_MAP,
TEMP_PATH_TEMPLATE,
)
from .enums import CoverFormat, DownloadMode, RemuxMode
from ..utils import async_subprocess
from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
@@ -112,22 +105,6 @@ class AppleMusicBaseDownloader:
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
async def get_cover_file_extension(self, cover_url_template: str) -> str | None:
if self.cover_format != CoverFormat.RAW:
return f".{self.cover_format.value}"
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
f".{image_format.lower()}",
)
def get_playlist_tags(
self,
playlist_metadata: dict,
@@ -160,13 +137,6 @@ class AppleMusicBaseDownloader:
/ (f"{media_id}_{file_tag}" + file_extension)
)
@alru_cache()
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
response = await get_response(cover_url, {200, 404})
if response.status_code == 200:
return response.content
return None
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(
ILLEGAL_CHARS_RE,
@@ -225,45 +195,6 @@ class AppleMusicBaseDownloader:
)
)
def get_cover_url_template(self, metadata: dict) -> str:
if self.cover_format == CoverFormat.RAW:
return self._get_raw_cover_url(metadata["attributes"]["artwork"]["url"])
return metadata["attributes"]["artwork"]["url"]
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
cover_url_template,
),
)
def get_cover_url(self, cover_url_template: str) -> str:
return self.format_cover_url(
cover_url_template,
self.cover_size,
self.cover_format.value,
)
def format_cover_url(
self,
cover_url_template: str,
cover_size: int,
cover_format: str,
) -> str:
return re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{cover_size}x{cover_size}bb.{cover_format}"
if self.cover_format != CoverFormat.RAW
else ""
),
cover_url_template,
)
async def download_stream(self, stream_url: str, download_path: str):
if self.download_mode == DownloadMode.YTDLP:
await self.download_ytdlp(stream_url, download_path)
@@ -319,7 +250,7 @@ class AppleMusicBaseDownloader:
self,
media_path: Path,
tags: MediaTags,
cover_url_template: str,
cover_bytes: bytes | None,
):
exclude_tags = self.exclude_tags or []
@@ -332,9 +263,6 @@ class AppleMusicBaseDownloader:
)
mp4_tags = filtered_tags.as_mp4_tags(self.date_tag_template)
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
skip_tagging = "all" in exclude_tags
await asyncio.to_thread(
@@ -374,10 +302,8 @@ class AppleMusicBaseDownloader:
async def _apply_cover(
self,
mp4: MP4,
cover_url_template: str,
cover_bytes: bytes | None,
) -> None:
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return
+12 -3
View File
@@ -201,11 +201,19 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
playlist_metadata,
)
download_item.cover_url_template = self.get_cover_url_template(
download_item.cover_url_template = self.interface.get_cover_url_template(
music_video_metadata,
self.cover_format,
)
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -263,8 +271,9 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
download_item.decryption_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
)
+14 -4
View File
@@ -102,7 +102,15 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
else:
download_item.decryption_key = None
download_item.cover_url_template = self.get_cover_url_template(song_metadata)
download_item.cover_url_template = self.interface.get_cover_url_template(
song_metadata,
self.cover_format,
)
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
download_item.random_uuid = self.get_random_uuid()
if download_item.stream_info and download_item.stream_info.file_format:
@@ -115,8 +123,9 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
else:
download_item.staged_path = None
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url_template,
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -323,8 +332,9 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
download_item.stream_info.audio_track.fairplay_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
)
+13 -3
View File
@@ -64,11 +64,19 @@ class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
None,
)
download_item.cover_url_template = self.get_cover_url_template(
download_item.cover_url_template = self.interface.get_cover_url_template(
uploaded_video_metadata,
self.cover_format,
)
cover_file_extension = await self.get_cover_file_extension(
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
@@ -86,8 +94,10 @@ class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
download_item.stream_info.video_track.stream_url,
download_item.staged_path,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
cover_bytes,
)
-6
View File
@@ -11,12 +11,6 @@ class RemuxMode(Enum):
MP4BOX = "mp4box"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
class RemuxFormatMusicVideo(Enum):
M4V = "m4v"
MP4 = "mp4"
+1
View File
@@ -21,6 +21,7 @@ class DownloadItem:
stream_info: StreamInfoAv = None
decryption_key: DecryptionKeyAv = None
cover_url_template: str = None
cover_url: str = None
staged_path: str = None
final_path: str = None
playlist_file_path: str = None
+5
View File
@@ -55,3 +55,8 @@ UPLOADED_VIDEO_QUALITY_RANK = [
"sd480pVideo",
"provisionalUploadVideo",
]
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
+6
View File
@@ -87,3 +87,9 @@ class MusicVideoResolution(Enum):
class UploadedVideoQuality(Enum):
BEST = "best"
ASK = "ask"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
+74
View File
@@ -2,12 +2,18 @@ import asyncio
import base64
import datetime
import logging
import re
from io import BytesIO
from async_lru import alru_cache
from PIL import Image
from pywidevine import PSSH, Cdm
from ..api.apple_music_api import AppleMusicApi
from ..api.itunes_api import ItunesApi
from ..utils import get_response
from .constants import IMAGE_FILE_EXTENSION_MAP
from .enums import CoverFormat
from .types import DecryptionKey
logger = logging.getLogger(__name__)
@@ -68,6 +74,74 @@ class AppleMusicInterface:
return decryption_key
def get_cover_url_template(self, metadata: dict, cover_format: CoverFormat) -> str:
if cover_format == CoverFormat.RAW:
cover_url_template = self._get_raw_cover_url(
metadata["attributes"]["artwork"]["url"]
)
cover_url_template = metadata["attributes"]["artwork"]["url"]
logger.debug(f"Cover URL template: {cover_url_template}")
return cover_url_template
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
cover_url_template,
),
)
def get_cover_url(
self,
cover_url_template: str,
cover_size: int,
cover_format: CoverFormat,
) -> str:
cover_url = re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{cover_size}x{cover_size}bb.{cover_format.value}"
if cover_format != CoverFormat.RAW
else ""
),
cover_url_template,
)
logger.debug(f"Cover URL: {cover_url}")
return cover_url
@alru_cache()
async def get_cover_file_extension(
self,
cover_url: str,
cover_format: CoverFormat,
) -> str | None:
if cover_format != CoverFormat.RAW:
return f".{cover_format.value}"
cover_url = self.get_cover_url(cover_url)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
f".{image_format.lower()}",
)
@alru_cache()
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
response = await get_response(cover_url, {200, 404})
if response.status_code == 200:
return response.content
return None
@alru_cache()
async def get_media_date(
self,