Refactor iTunes API client

This commit is contained in:
Rafael Moraes
2026-04-13 22:25:09 -03:00
parent eba97c8344
commit c5e001fda5
2 changed files with 138 additions and 86 deletions
+138
View File
@@ -0,0 +1,138 @@
import re
import httpx
import structlog
from .constants import (
APPLE_MUSIC_MUSIC_KIT_URL,
ITUNES_LOOKUP_API_URL,
ITUNES_PAGE_API_URL,
)
from .exceptions import GamdlApiResponseError
logger = structlog.get_logger(__name__)
class ItunesApi:
def __init__(
self,
client: httpx.AsyncClient,
) -> None:
self.client = client
@staticmethod
async def get_storefront_id(storefront: str) -> int:
log = logger.bind(action="get_storefront_id", storefront=storefront)
async with httpx.AsyncClient() as client:
try:
response = await client.get(APPLE_MUSIC_MUSIC_KIT_URL)
response.raise_for_status()
music_kit_content = response.text
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching MusicKit content",
status_code=response.status_code,
)
normalized_storefront = storefront.upper()
country_code_pattern = f'{normalized_storefront}:"([A-Z]{{3}})"'
country_code_match = re.search(country_code_pattern, music_kit_content)
if not country_code_match:
raise GamdlApiResponseError(
f"Country code {storefront} not found in MusicKit content"
)
three_letter_code = country_code_match.group(1)
storefront_pattern = f'{three_letter_code}:"(\\d+)"'
storefront_match = re.search(storefront_pattern, music_kit_content)
if not storefront_match:
raise GamdlApiResponseError(
f"Storefront ID not found for country code {storefront}"
)
storefront_id = int(storefront_match.group(1))
log.debug("Extracted storefront", storefront_id=storefront_id)
return storefront_id
@classmethod
async def create(
cls,
storefront: str = "us",
storefront_id: int | None = 143441,
language: str = "en-US",
) -> "ItunesApi":
storefront_id = storefront_id or await cls.get_storefront_id(storefront)
client = httpx.AsyncClient(
params={
"country": storefront,
"lang": language,
},
headers={
"X-Apple-Store-Front": f"{storefront_id}-1,32 t:music31",
},
timeout=60.0,
)
return cls(client=client)
async def get_lookup_result(
self,
media_id: str,
entity: str = "album",
) -> dict:
log = logger.bind(action="get_lookup_result", media_id=media_id, entity=entity)
try:
response = await self.client.get(
ITUNES_LOOKUP_API_URL,
params={
"id": media_id,
"entity": entity,
},
)
response.raise_for_status()
lookup_result = response.json()
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching iTunes lookup result",
content=response.text,
status_code=response.status_code,
)
log.debug("Received lookup result", lookup_result=lookup_result)
return lookup_result
async def get_itunes_page(
self,
media_type: str,
media_id: str,
) -> dict:
log = logger.bind(
action="get_itunes_page",
media_type=media_type,
media_id=media_id,
)
try:
response = await self.client.get(
ITUNES_PAGE_API_URL.format(media_type=media_type, media_id=media_id)
)
response.raise_for_status()
itunes_page = response.json()
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching iTunes page",
content=response.text,
status_code=response.status_code,
)
log.debug("Received iTunes page", itunes_page=itunes_page)
return itunes_page
-86
View File
@@ -1,86 +0,0 @@
import logging
import httpx
from ..utils import safe_json
from .constants import ITUNES_LOOKUP_API_URL, ITUNES_PAGE_API_URL, STOREFRONT_IDS
from .exceptions import ApiError
logger = logging.getLogger(__name__)
class ItunesApi:
def __init__(
self,
storefront: str = "us",
language: str = "en-US",
) -> None:
self.storefront = storefront
self.language = language
self.initialize()
def initialize(self) -> None:
self._initialize_storefront_id()
self._initialize_client()
def _initialize_storefront_id(self) -> None:
try:
self.storefront_id = STOREFRONT_IDS[self.storefront.upper()]
except KeyError:
raise Exception(f"No storefront id for {self.storefront}")
def _initialize_client(self) -> None:
self.client = httpx.AsyncClient(
params={
"country": self.storefront,
"lang": self.language,
},
headers={
"X-Apple-Store-Front": f"{self.storefront_id} t:music31",
},
timeout=60.0,
)
async def get_lookup_result(
self,
media_id: str,
entity: str = "album",
) -> dict:
response = await self.client.get(
ITUNES_LOOKUP_API_URL,
params={
"id": media_id,
"entity": entity,
},
)
lookup_result = safe_json(response)
if response.status_code != 200 or lookup_result is None:
raise ApiError(
message=response.text,
status_code=response.status_code,
)
logger.debug(f"Lookup result: {lookup_result}")
return lookup_result
async def get_itunes_page(
self,
media_type: str,
media_id: str,
) -> dict:
response = await self.client.get(
f"{ITUNES_PAGE_API_URL}/{media_type}/{media_id}"
)
itunes_page = safe_json(response)
if response.status_code != 200 or itunes_page is None:
raise ApiError(
message=response.text,
status_code=response.status_code,
)
logger.debug(f"iTunes page: {itunes_page}")
return itunes_page