♻️ close #57

This commit is contained in:
oskvr37
2025-01-27 21:38:47 +01:00
parent 39b3d38db5
commit e5b38fb537
2 changed files with 140 additions and 109 deletions
+3 -3
View File
@@ -38,8 +38,8 @@ class TestApi(unittest.TestCase):
self.assertEqual(artist.name, "Kanye West")
def test_artist_albums(self):
self.api.getArtistAlbums(25022)
self.api.getArtistAlbums(25022, onlyNonAlbum=True)
self.api.getArtistAlbums(25022, filter="ALBUMS")
self.api.getArtistAlbums(25022, filter="EPSANDSINGLES")
def test_album(self):
album = self.api.getAlbum(103805723)
@@ -71,7 +71,7 @@ class TestApi(unittest.TestCase):
self.assertGreaterEqual(len(favorites.ARTIST), 0)
def test_search(self):
self.api.search("Kanye West")
self.api.getSearch("Kanye West")
if __name__ == "__main__":
+137 -106
View File
@@ -1,57 +1,82 @@
import logging
import json
import logging
from pathlib import Path
from typing import Any, Literal, Type, TypeVar
from pydantic import BaseModel
from requests import Session
from tiddl.exceptions import AuthError, ApiError
from tiddl.models.api import (
Album,
AlbumItems,
Artist,
ArtistAlbumsItems,
Favorites,
Playlist,
PlaylistItems,
SessionResponse,
Search,
SessionResponse,
Track,
TrackStream,
Video,
)
from tiddl.models.constants import TrackQuality
from tiddl.models.resource import Track, Album, Playlist, Artist
from tiddl.exceptions import ApiError
DEBUG = False
API_URL = "https://api.tidal.com/v1"
T = TypeVar("T", bound=BaseModel)
# Tidal default limits
ARTIST_ALBUMS_LIMIT = 50
ALBUM_ITEMS_LIMIT = 10
PLAYLIST_LIMIT = 50
logger = logging.getLogger(__name__)
def ensureLimit(limit: int, max_limit: int) -> int:
if limit > max_limit:
logger.warning(f"Max limit is {max_limit}")
return max_limit
return limit
class Limits:
ARTIST_ALBUMS = 50
ALBUM_ITEMS = 10
ALBUM_ITEMS_MAX = 100
PLAYLIST = 50
class TidalApi:
URL = "https://api.tidal.com/v1"
LIMITS = Limits
def __init__(self, token: str, user_id: str, country_code: str) -> None:
self.token = token
self.user_id = user_id
self.country_code = country_code
self._session = Session()
self._session.headers = {"authorization": f"Bearer {token}"}
self._logger = logging.getLogger("TidalApi")
self.session = Session()
self.session.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
def _request(self, endpoint: str, params={}):
self._logger.debug(f"{endpoint} {params}")
req = self._session.request(
method="GET", url=f"{API_URL}/{endpoint}", params=params
)
def fetch(
self, model: Type[T], endpoint: str, params: dict[str, Any] = {}
) -> T:
"""Fetch data from the API and parse it into the given Pydantic model."""
req = self.session.get(f"{self.URL}/{endpoint}", params=params)
logger.debug((endpoint, params, req.status_code))
data = req.json()
if req.status_code == 401:
raise AuthError(**data)
if req.status_code != 200:
raise ApiError(**data)
if DEBUG:
debug_data = {"endpoint": endpoint, "params": params, "data": data}
debug_data = {
"status_code": req.status_code,
"endpoint": endpoint,
"params": params,
"data": data,
}
path = Path(f"debug_data/{endpoint}.json")
path.parent.mkdir(parents=True, exist_ok=True)
@@ -59,98 +84,104 @@ class TidalApi:
with path.open("w", encoding="utf-8") as f:
json.dump(debug_data, f, indent=2)
return data
if req.status_code != 200:
raise ApiError(**data)
def getSession(self):
return SessionResponse(
**self._request(
"sessions",
)
return model.model_validate(data)
def getAlbum(self, album_id: str | int):
return self.fetch(
Album, f"albums/{album_id}", {"countryCode": self.country_code}
)
def getTrackStream(self, id: str | int, quality: TrackQuality):
return TrackStream(
**self._request(
f"tracks/{id}/playbackinfo",
{
"audioquality": quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
)
def getAlbumItems(
self, album_id: str | int, limit=LIMITS.ALBUM_ITEMS, offset=0
):
return self.fetch(
AlbumItems,
f"albums/{album_id}/items",
{
"countryCode": self.country_code,
"limit": ensureLimit(limit, self.LIMITS.ALBUM_ITEMS_MAX),
"offset": offset,
},
)
def getTrack(self, id: str | int):
return Track(
**self._request(f"tracks/{id}", {"countryCode": self.country_code})
)
def getArtist(self, id: str | int):
return Artist(
**self._request(f"artists/{id}", {"countryCode": self.country_code})
def getArtist(self, artist_id: str | int):
return self.fetch(
Artist, f"artists/{artist_id}", {"countryCode": self.country_code}
)
def getArtistAlbums(
self, id: str | int, limit=ARTIST_ALBUMS_LIMIT, offset=0, onlyNonAlbum=False
self,
artist_id: str | int,
limit=LIMITS.ARTIST_ALBUMS,
offset=0,
filter: Literal["ALBUMS", "EPSANDSINGLES"] = "ALBUMS",
):
params = {"countryCode": self.country_code, "limit": limit, "offset": offset}
if onlyNonAlbum:
params.update({"filter": "EPSANDSINGLES"})
return ArtistAlbumsItems(
**self._request(
f"artists/{id}/albums",
params,
)
)
def getAlbum(self, id: str | int):
return Album(
**self._request(f"albums/{id}", {"countryCode": self.country_code})
)
def getAlbumItems(self, id: str | int, limit=ALBUM_ITEMS_LIMIT, offset=0):
MAX_LIMIT = 100
if limit > MAX_LIMIT:
logging.warning(f"Too big page, max page size is {MAX_LIMIT}")
limit = MAX_LIMIT
return AlbumItems(
**self._request(
f"albums/{id}/items",
{"countryCode": self.country_code, "limit": limit, "offset": offset},
)
)
def getPlaylist(self, uuid: str):
return Playlist(
**self._request(
f"playlists/{uuid}",
{"countryCode": self.country_code},
)
)
def getPlaylistItems(self, uuid: str, limit=PLAYLIST_LIMIT, offset=0):
return PlaylistItems(
**self._request(
f"playlists/{uuid}/items",
{"countryCode": self.country_code, "limit": limit, "offset": offset},
)
return self.fetch(
ArtistAlbumsItems,
f"artists/{artist_id}/albums",
{
"countryCode": self.country_code,
"limit": limit, # tested limit 10,000
"offset": offset,
"filter": filter,
},
)
def getFavorites(self):
return Favorites(
**self._request(
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
)
return self.fetch(
Favorites,
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
)
def search(self, query: str):
return Search(
**self._request(
"search", {"countryCode": self.country_code, "query": query}
)
def getPlaylist(self, playlist_uuid: str):
return self.fetch(
Playlist,
f"playlists/{playlist_uuid}",
{"countryCode": self.country_code},
)
def getPlaylistItems(
self, playlist_uuid: str, limit=LIMITS.PLAYLIST, offset=0
):
return self.fetch(
PlaylistItems,
f"playlists/{playlist_uuid}/items",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
)
def getSearch(self, query: str):
return self.fetch(
Search, "search", {"countryCode": self.country_code, "query": query}
)
def getSession(self):
return self.fetch(SessionResponse, "sessions")
def getTrack(self, track_id: str | int):
return self.fetch(
Track, f"tracks/{track_id}", {"countryCode": self.country_code}
)
def getTrackStream(self, track_id: str | int, quality: TrackQuality):
return self.fetch(
TrackStream,
f"tracks/{track_id}/playbackinfo",
{
"audioquality": quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
)
def getVideo(self, video_id: str | int):
return self.fetch(
Video, f"videos/{video_id}", {"countryCode": self.country_code}
)