Refactor API modules and migrate to async httpx

This commit is contained in:
Rafael Moraes
2025-10-18 17:10:10 -03:00
parent 97703f6512
commit 5c8e47fc76
6 changed files with 695 additions and 497 deletions
+3
View File
@@ -0,0 +1,3 @@
from .apple_music_api import AppleMusicApi
from .itunes_api import ItunesApi
from .constants import *
+446
View File
@@ -0,0 +1,446 @@
import logging
import re
import typing
from http.cookiejar import MozillaCookieJar
from urllib.parse import parse_qs, urlparse
import httpx
from ..utils import raise_for_status, safe_json
from .constants import (
AMP_API_URL,
APPLE_MUSIC_COOKIE_DOMAIN,
APPLE_MUSIC_HOMEPAGE_URL,
LICENSE_API_URL,
WEBPLAYBACK_API_URL,
)
logger = logging.getLogger(__name__)
class AppleMusicApi:
def __init__(
self,
storefront: str = "us",
media_user_token: str | None = None,
language: str = "en-US",
) -> None:
self.storefront = storefront
self.media_user_token = media_user_token
self.language = language
@classmethod
def from_netscape_cookies(
cls,
cookies_path: str = "./cookies.txt",
language: str = "en-US",
) -> "AppleMusicApi":
cookies = MozillaCookieJar(cookies_path)
cookies.load(ignore_discard=True, ignore_expires=True)
parse_cookie = lambda name: next(
(
cookie.value
for cookie in cookies
if cookie.name == name and cookie.domain == APPLE_MUSIC_COOKIE_DOMAIN
),
None,
)
media_user_token = parse_cookie("media-user-token")
if not media_user_token:
raise ValueError(
'"media-user-token" cookie not found in cookies. '
"Make sure you have exported the cookies from the Apple Music webpage "
"and are logged in with an active subscription."
)
return cls(
storefront=None,
media_user_token=media_user_token,
language=language,
)
async def setup(self) -> None:
await self._setup_client()
await self._setup_token()
await self._setup_account_info()
async def _setup_client(self) -> None:
self.client = httpx.AsyncClient(
headers={
"accept": "*/*",
"accept-language": "en-US",
"origin": APPLE_MUSIC_HOMEPAGE_URL,
"priority": "u=1, i",
"referer": APPLE_MUSIC_HOMEPAGE_URL,
"sec-ch-ua": '"Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
},
params={
"l": self.language,
},
follow_redirects=True,
)
async def _setup_token(self) -> None:
response = await self.client.get(APPLE_MUSIC_HOMEPAGE_URL)
raise_for_status(response)
home_page = response.text
index_js_uri_match = re.search(
r"/(assets/index-legacy[~-][^/\"]+\.js)",
home_page,
)
if not index_js_uri_match:
raise Exception("index.js URI not found in Apple Music homepage")
index_js_uri = index_js_uri_match.group(1)
response = await self.client.get(f"{APPLE_MUSIC_HOMEPAGE_URL}/{index_js_uri}")
raise_for_status(response)
index_js_page = response.text
token_match = re.search('(?=eyJh)(.*?)(?=")', index_js_page)
if not token_match:
raise Exception("Token not found in index.js page")
token = token_match.group(1)
logger.debug(f"Token: {token}")
self.client.headers.update({"authorization": f"Bearer {token}"})
async def _setup_account_info(self) -> None:
if not self.media_user_token:
return
self.client.cookies.update(
{
"media-user-token": self.media_user_token,
}
)
self.account_info = await self.get_account_info()
self.storefront = self.account_info["meta"]["subscription"]["storefront"]
async def get_account_info(self, meta: str | None = "subscription") -> dict:
response = await self.client.get(
f"{AMP_API_URL}/v1/me/account",
params={
**({"meta": meta} if meta else {}),
},
)
raise_for_status(response)
account_info = safe_json(response)
if not "data" in account_info or (meta and "meta" not in account_info):
raise Exception("Error getting account info:", response.text)
logger.debug(f"Account info: {account_info}")
return account_info
async def get_song(
self,
song_id: str,
extend: str = "extendedAssetUrls",
include: str = "lyrics,albums",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/songs/{song_id}",
params={
"extend": extend,
"include": include,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
song = safe_json(response)
if not "data" in song:
raise Exception("Error getting song:", response.text)
logger.debug(f"Song: {song}")
return song
async def get_music_video(
self,
music_video_id: str,
include: str = "albums",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/music-videos/{music_video_id}",
params={
"include": include,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
music_video = safe_json(response)
if not "data" in music_video:
raise Exception("Error getting music video:", response.text)
logger.debug(f"Music video: {music_video}")
return music_video
async def get_uploaded_video(
self,
post_id: str,
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/uploaded-videos/{post_id}"
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
uploaded_video = safe_json(response)
if not "data" in uploaded_video:
raise Exception("Error getting uploaded video:", response.text)
logger.debug(f"Uploaded video: {uploaded_video}")
return uploaded_video
async def get_album(
self,
album_id: str,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/albums/{album_id}",
params={
"extend": extend,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
album = safe_json(response)
if not "data" in album:
raise Exception("Error getting album:", response.text)
logger.debug(f"Album: {album}")
return album
async def get_playlist(
self,
playlist_id: str,
limit_tracks: int = 300,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/playlists/{playlist_id}",
params={
"limit[tracks]": limit_tracks,
"extend": extend,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
playlist = safe_json(response)
if not "data" in playlist:
raise Exception("Error getting playlist:", response.text)
logger.debug(f"Playlist: {playlist}")
return playlist
async def get_artist(
self,
artist_id: str,
include: str = "albums,music-videos",
limit: int = 100,
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/artists/{artist_id}",
params={
"include": include,
**{f"limit[{_include}]": limit for _include in include.split(",")},
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
artist = safe_json(response)
if not "data" in artist:
raise Exception("Error getting artist:", response.text)
logger.debug(f"Artist: {artist}")
return artist
async def get_library_album(
self,
album_id: str,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/me/library/albums/{album_id}",
params={
"extend": extend,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
album = safe_json(response)
if not "data" in album:
raise Exception("Error getting library album:", response.text)
logger.debug(f"Library album: {album}")
return album
async def get_library_playlist(
self,
playlist_id: str,
include: str = "tracks",
limit: int = 100,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = await self.client.get(
f"{AMP_API_URL}/v1/me/library/playlists/{playlist_id}",
params={
"include": include,
**{f"limit[{_include}]": limit for _include in include.split(",")},
"extend": extend,
},
)
raise_for_status(response, {200, 404})
if response.status_code == 404:
return None
playlist = safe_json(response)
if not "data" in playlist:
raise Exception("Error getting library playlist:", response.text)
return playlist
async def get_search_results(
self,
term: str,
types: str = "songs,music-videos,albums,playlists,artists",
limit: int = 50,
offset: int = 0,
) -> dict:
response = await self.client.get(
f"{AMP_API_URL}/v1/catalog/{self.storefront}/search",
params={
"term": term,
"types": types,
"limit": limit,
"offset": offset,
},
)
raise_for_status(response)
search_results = safe_json(response)
if not "results" in search_results:
raise Exception("Error searching:", response.text)
logger.debug(f"Search results: {search_results}")
return search_results
async def extend_api_data(
self,
api_response: dict,
extend: str,
) -> typing.AsyncGenerator[dict, None]:
next_uri = api_response.get("next")
if not next_uri:
return
next_uri_params = parse_qs(urlparse(next_uri).query)
limit = int(next_uri_params["offset"][0])
while next_uri:
extended_api_data = await self._get_extended_api_data(
next_uri,
limit,
extend,
)
yield extended_api_data
next_uri = extended_api_data.get("next")
async def _get_extended_api_data(
self,
next_uri: str,
limit: int,
extend: str,
) -> dict:
response = await self.client.get(
AMP_API_URL + next_uri,
params={
"limit": limit,
"extend": extend,
**parse_qs(urlparse(next_uri).query),
},
)
raise_for_status(response)
extended_api_data = safe_json(response)
if not "data" in extended_api_data:
raise Exception("Error getting extended API data:", response.text)
logger.debug(f"Extended API data: {extended_api_data}")
return extended_api_data
async def get_webplayback(
self,
track_id: str,
) -> dict:
response = await self.client.post(
WEBPLAYBACK_API_URL,
json={
"salableAdamId": track_id,
"language": self.language,
},
)
raise_for_status(response)
webplayback = safe_json(response)
if not "songList" in webplayback:
raise Exception("Error getting webplayback:", response.text)
logger.debug(f"Webplayback: {webplayback}")
return webplayback
async def get_license_exchange(
self,
track_id: str,
track_uri: str,
challenge: str,
key_system: str = "com.widevine.alpha",
) -> dict:
response = await self.client.post(
LICENSE_API_URL,
json={
"challenge": challenge,
"key-system": key_system,
"uri": track_uri,
"adamId": track_id,
"isLibrary": False,
"user-initiated": True,
},
)
raise_for_status(response)
license_exchange = safe_json(response)
if not "license" in license_exchange:
raise Exception("Error getting license exchange:", response.text)
logger.debug(f"License exchange: {license_exchange}")
return license_exchange
+169
View File
@@ -0,0 +1,169 @@
APPLE_MUSIC_HOMEPAGE_URL = "https://music.apple.com"
APPLE_MUSIC_COOKIE_DOMAIN = ".music.apple.com"
AMP_API_URL = "https://amp-api.music.apple.com"
WEBPLAYBACK_API_URL = (
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
)
LICENSE_API_URL = (
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense"
)
ITUNES_LOOKUP_API_URL = "https://itunes.apple.com/lookup"
ITUNES_PAGE_API_URL = "https://music.apple.com"
STOREFRONT_IDS = {
"AE": "143481-2,32",
"AG": "143540-2,32",
"AI": "143538-2,32",
"AL": "143575-2,32",
"AM": "143524-2,32",
"AO": "143564-2,32",
"AR": "143505-28,32",
"AT": "143445-4,32",
"AU": "143460-27,32",
"AZ": "143568-2,32",
"BB": "143541-2,32",
"BE": "143446-2,32",
"BF": "143578-2,32",
"BG": "143526-2,32",
"BH": "143559-2,32",
"BJ": "143576-2,32",
"BM": "143542-2,32",
"BN": "143560-2,32",
"BO": "143556-28,32",
"BR": "143503-15,32",
"BS": "143539-2,32",
"BT": "143577-2,32",
"BW": "143525-2,32",
"BY": "143565-2,32",
"BZ": "143555-2,32",
"CA": "143455-6,32",
"CG": "143582-2,32",
"CH": "143459-57,32",
"CL": "143483-28,32",
"CN": "143465-19,32",
"CO": "143501-28,32",
"CR": "143495-28,32",
"CV": "143580-2,32",
"CY": "143557-2,32",
"CZ": "143489-2,32",
"DE": "143443-4,32",
"DK": "143458-2,32",
"DM": "143545-2,32",
"DO": "143508-28,32",
"DZ": "143563-2,32",
"EC": "143509-28,32",
"EE": "143518-2,32",
"EG": "143516-2,32",
"ES": "143454-8,32",
"FI": "143447-2,32",
"FJ": "143583-2,32",
"FM": "143591-2,32",
"FR": "143442-3,32",
"GB": "143444-2,32",
"GD": "143546-2,32",
"GH": "143573-2,32",
"GM": "143584-2,32",
"GR": "143448-2,32",
"GT": "143504-28,32",
"GW": "143585-2,32",
"GY": "143553-2,32",
"HK": "143463-45,32",
"HN": "143510-28,32",
"HR": "143494-2,32",
"HU": "143482-2,32",
"ID": "143476-2,32",
"IE": "143449-2,32",
"IL": "143491-2,32",
"IN": "143467-2,32",
"IS": "143558-2,32",
"IT": "143450-7,32",
"JM": "143511-2,32",
"JO": "143528-2,32",
"JP": "143462-9,32",
"KE": "143529-2,32",
"KG": "143586-2,32",
"KH": "143579-2,32",
"KN": "143548-2,32",
"KR": "143466-13,32",
"KW": "143493-2,32",
"KY": "143544-2,32",
"KZ": "143517-2,32",
"LA": "143587-2,32",
"LB": "143497-2,32",
"LC": "143549-2,32",
"LK": "143486-2,32",
"LR": "143588-2,32",
"LT": "143520-2,32",
"LU": "143451-2,32",
"LV": "143519-2,32",
"MD": "143523-2,32",
"MG": "143531-2,32",
"MK": "143530-2,32",
"ML": "143532-2,32",
"MN": "143592-2,32",
"MO": "143515-45,32",
"MR": "143590-2,32",
"MS": "143547-2,32",
"MT": "143521-2,32",
"MU": "143533-2,32",
"MW": "143589-2,32",
"MX": "143468-28,32",
"MY": "143473-2,32",
"MZ": "143593-2,32",
"NA": "143594-2,32",
"NE": "143534-2,32",
"NG": "143561-2,32",
"NI": "143512-28,32",
"NL": "143452-10,32",
"NO": "143457-2,32",
"NP": "143484-2,32",
"NZ": "143461-27,32",
"OM": "143562-2,32",
"PA": "143485-28,32",
"PE": "143507-28,32",
"PG": "143597-2,32",
"PH": "143474-2,32",
"PK": "143477-2,32",
"PL": "143478-2,32",
"PT": "143453-24,32",
"PW": "143595-2,32",
"PY": "143513-28,32",
"QA": "143498-2,32",
"RO": "143487-2,32",
"RU": "143469-16,32",
"SA": "143479-2,32",
"SB": "143601-2,32",
"SC": "143599-2,32",
"SE": "143456-17,32",
"SG": "143464-19,32",
"SI": "143499-2,32",
"SK": "143496-2,32",
"SL": "143600-2,32",
"SN": "143535-2,32",
"SR": "143554-2,32",
"ST": "143598-2,32",
"SV": "143506-28,32",
"SZ": "143602-2,32",
"TC": "143552-2,32",
"TD": "143581-2,32",
"TH": "143475-2,32",
"TJ": "143603-2,32",
"TM": "143604-2,32",
"TN": "143536-2,32",
"TR": "143480-2,32",
"TT": "143551-2,32",
"TW": "143470-18,32",
"TZ": "143572-2,32",
"UA": "143492-2,32",
"UG": "143537-2,32",
"US": "143441-1,32",
"UY": "143514-2,32",
"UZ": "143566-2,32",
"VC": "143550-2,32",
"VE": "143502-28,32",
"VG": "143543-2,32",
"VN": "143471-2,32",
"YE": "143571-2,32",
"ZA": "143472-2,32",
"ZW": "143605-2,32",
}
+77
View File
@@ -0,0 +1,77 @@
import logging
import httpx
from ..utils import raise_for_status, safe_json
from .constants import ITUNES_LOOKUP_API_URL, ITUNES_PAGE_API_URL, STOREFRONT_IDS
logger = logging.getLogger(__name__)
class ItunesApi:
def __init__(
self,
storefront: str = "us",
language: str = "en-US",
) -> None:
self.storefront = storefront
self.language = language
async def setup(self) -> None:
self._setup_storefront_id()
await self._setup_session()
def _setup_storefront_id(self) -> None:
try:
self.storefront_id = STOREFRONT_IDS[self.storefront.upper()]
except KeyError:
raise Exception(f"No storefront id for {self.storefront}")
async def _setup_session(self) -> None:
self.client = httpx.AsyncClient(
params={
"country": self.storefront,
"lang": self.language,
},
headers={
"X-Apple-Store-Front": f"{self.storefront_id} t:music31",
},
)
async def get_lookup_result(
self,
media_id: str,
entity: str = "album",
) -> dict:
response = await self.client.get(
ITUNES_LOOKUP_API_URL,
params={
"id": media_id,
"entity": entity,
},
)
raise_for_status(response)
lookup_result = safe_json(response)
if "results" not in lookup_result:
raise Exception("Error getting lookup result:", response.text)
logger.debug(f"Lookup result: {lookup_result}")
return lookup_result
async def get_itunes_page(
self,
media_type: str,
media_id: str,
) -> dict:
response = await self.client.get(
f"{ITUNES_PAGE_API_URL}/{media_type}/{media_id}"
)
raise_for_status(response)
itunes_page = safe_json(response)
if "storePlatformData" not in itunes_page:
raise Exception("Error getting iTunes page:", response.text)
logger.debug(f"iTunes page: {itunes_page}")
return itunes_page
-415
View File
@@ -1,415 +0,0 @@
from __future__ import annotations
import functools
import re
import time
import typing
from http.cookiejar import MozillaCookieJar
from pathlib import Path
from urllib.parse import urlparse
import requests
from .utils import raise_response_exception
class AppleMusicApi:
APPLE_MUSIC_HOMEPAGE_URL = "https://music.apple.com"
AMP_API_URL = "https://amp-api.music.apple.com"
WEBPLAYBACK_API_URL = (
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
)
LICENSE_API_URL = "https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense"
WAIT_TIME = 2
def __init__(
self,
storefront: str,
media_user_token: str | None = None,
language: str = "en-US",
):
self.media_user_token = media_user_token
self.storefront = storefront
self.language = language
self._set_session()
@classmethod
def from_netscape_cookies(
cls,
cookies_path: Path = Path("./cookies.txt"),
language: str = "en-US",
) -> AppleMusicApi:
parse_cookie = lambda name: next(
(
cookie.value
for cookie in cookies
if cookie.name == name
and cookie.domain.endswith(
urlparse(cls.APPLE_MUSIC_HOMEPAGE_URL).netloc
)
),
None,
)
cookies = MozillaCookieJar(cookies_path)
cookies.load(ignore_discard=True, ignore_expires=True)
media_user_token = parse_cookie("media-user-token")
if not media_user_token:
raise ValueError(
'"media-user-token" cookie not found in cookies. '
"Make sure you have exported the cookies from Apple Music webpage and are logged in "
"with an active subscription."
)
return cls(
storefront=None,
media_user_token=media_user_token,
language=language,
)
def _set_session(self):
self.session = requests.Session()
self.session.headers.update(
{
"accept": "*/*",
"accept-language": "en-US",
"origin": self.APPLE_MUSIC_HOMEPAGE_URL,
"priority": "u=1, i",
"referer": self.APPLE_MUSIC_HOMEPAGE_URL,
"sec-ch-ua": '"Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
}
)
home_page = self.session.get(self.APPLE_MUSIC_HOMEPAGE_URL).text
index_js_uri = re.search(
r"/(assets/index-legacy[~-][^/\"]+\.js)",
home_page,
).group(1)
index_js_page = self.session.get(
f"{self.APPLE_MUSIC_HOMEPAGE_URL}/{index_js_uri}"
).text
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
self.session.headers.update({"authorization": f"Bearer {token}"})
self.session.params = {"l": self.language}
if self.media_user_token:
self.session.cookies.update(
{
"media-user-token": self.media_user_token,
}
)
self._set_account_info()
def _set_account_info(self):
self.account_info = self.get_account_info()
self.storefront = self.account_info["meta"]["subscription"]["storefront"]
def _check_amp_api_response(self, response: requests.Response) -> None:
try:
response.raise_for_status()
response_dict = response.json()
assert response_dict.get("data") or response_dict.get("results") is not None
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
raise_response_exception(response)
def get_account_info(self, meta: str = "subscription") -> dict:
response = self.session.get(
f"{self.AMP_API_URL}/v1/me/account",
params={"meta": meta},
)
self._check_amp_api_response(response)
return response.json()
def get_artist(
self,
artist_id: str,
include: str = "albums,music-videos",
limit: int = 100,
fetch_all: bool = True,
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/artists/{artist_id}",
params={
"include": include,
**{f"limit[{_include}]": limit for _include in include.split(",")},
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
artist = response.json()["data"][0]
if fetch_all:
for _include in include.split(","):
for additional_data in self._extend_api_data(
artist["relationships"][_include],
limit,
"",
):
artist["relationships"][_include]["data"].extend(additional_data)
return artist
def get_song(
self,
song_id: str,
extend: str = "extendedAssetUrls",
include: str = "lyrics,albums",
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/songs/{song_id}",
params={
"include": include,
"extend": extend,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_music_video(
self,
music_video_id: str,
include: str = "albums",
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/music-videos/{music_video_id}",
params={
"include": include,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_post(
self,
post_id: str,
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/uploaded-videos/{post_id}"
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["data"][0]
@functools.lru_cache()
def get_album(
self,
album_id: str,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/albums/{album_id}",
params={
"extend": extend,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_playlist(
self,
playlist_id: str,
limit_tracks: int = 300,
extend: str = "extendedAssetUrls",
fetch_all: bool = True,
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/playlists/{playlist_id}",
params={
"extend": extend,
"limit[tracks]": limit_tracks,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
playlist = response.json()["data"][0]
if fetch_all:
for additional_data in self._extend_api_data(
playlist["relationships"]["tracks"],
limit_tracks,
extend,
):
playlist["relationships"]["tracks"]["data"].extend(additional_data)
return playlist
def search(
self,
term: str,
types: str = "songs,albums,artists,playlists",
limit: int = 25,
offset: int = 0,
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/catalog/{self.storefront}/search",
params={
"term": term,
"types": types,
"limit": limit,
"offset": offset,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["results"]
def get_library_album(
self,
album_id: str,
extend: str = "extendedAssetUrls",
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/me/library/albums/{album_id}",
params={
"extend": extend,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
return response.json()["data"][0]
def get_library_playlist(
self,
playlist_id: str,
include: str = "tracks",
limit: int = 100,
extend: str = "extendedAssetUrls",
fetch_all: bool = True,
) -> dict | None:
response = self.session.get(
f"{self.AMP_API_URL}/v1/me/library/playlists/{playlist_id}",
params={
"include": include,
**{f"limit[{_include}]": limit for _include in include.split(",")},
"extend": extend,
},
)
if response.status_code == 404:
return None
self._check_amp_api_response(response)
playlist = response.json()["data"][0]
if fetch_all:
for additional_data in self._extend_api_data(
playlist["relationships"]["tracks"],
limit,
extend,
):
playlist["relationships"]["tracks"]["data"].extend(additional_data)
return playlist
def _extend_api_data(
self,
api_response: dict,
limit: int,
extend: str,
) -> typing.Generator[list[dict], None, None]:
next_uri = api_response.get("next")
while next_uri:
playlist_next = self._get_next_uri_response(next_uri, limit, extend)
yield playlist_next["data"]
next_uri = playlist_next.get("next")
time.sleep(self.WAIT_TIME)
def _get_next_uri_response(
self,
next_uri: str,
limit: int,
extend: str,
) -> dict:
response = self.session.get(
self.AMP_API_URL + next_uri,
params={
"limit": limit,
"extend": extend,
},
)
self._check_amp_api_response(response)
return response.json()
def get_webplayback(
self,
track_id: str,
) -> dict:
response = self.session.post(
self.WEBPLAYBACK_API_URL,
json={
"salableAdamId": track_id,
"language": self.language,
},
)
try:
response.raise_for_status()
response_dict = response.json()
webplayback = response_dict.get("songList")
assert webplayback
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
raise_response_exception(response)
return webplayback[0]
def get_widevine_license(
self,
track_id: str,
track_uri: str,
challenge: str,
) -> str:
response = self.session.post(
self.LICENSE_API_URL,
json={
"challenge": challenge,
"key-system": "com.widevine.alpha",
"uri": track_uri,
"adamId": track_id,
"isLibrary": False,
"user-initiated": True,
},
)
try:
response.raise_for_status()
response_dict = response.json()
widevine_license = response_dict.get("license")
assert widevine_license
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
AssertionError,
):
raise_response_exception(response)
return widevine_license
-82
View File
@@ -1,82 +0,0 @@
from __future__ import annotations
import functools
import requests
from .constants import STOREFRONT_IDS
from .utils import raise_response_exception
class ItunesApi:
ITUNES_LOOKUP_API_URL = "https://itunes.apple.com/lookup"
ITUNES_PAGE_API_URL = "https://music.apple.com"
def __init__(
self,
storefront: str = "us",
language: str = "en-US",
):
self.storefront = storefront
self.language = language
self._setup_session()
def _setup_session(self):
try:
self.storefront_id = STOREFRONT_IDS[self.storefront.upper()]
except KeyError:
raise Exception(f"No storefront id for {self.storefront}")
self.session = requests.Session()
self.session.params = {
"country": self.storefront,
"lang": self.language,
}
self.session.headers = {
"X-Apple-Store-Front": f"{self.storefront_id} t:music31",
}
@functools.lru_cache()
def get_resource(
self,
resource_id: str,
entity: str = "album",
) -> dict | None:
response = self.session.get(
self.ITUNES_LOOKUP_API_URL,
params={
"id": resource_id,
"entity": entity,
},
)
try:
response.raise_for_status()
response_dict = response.json()
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
):
raise_response_exception(response)
if response_dict.get("results"):
return response_dict["results"]
return None
def get_itunes_page(
self,
resource_type: str,
resource_id: str,
) -> dict | None:
response = self.session.get(
f"{self.ITUNES_PAGE_API_URL}/{resource_type}/{resource_id}"
)
try:
response.raise_for_status()
response_dict = response.json()
itunes_page = response_dict["storePlatformData"]["product-dv"][
"results"
].get(resource_id)
except (
requests.HTTPError,
requests.exceptions.JSONDecodeError,
):
raise_response_exception(response)
return itunes_page