Merge pull request #70 from oskvr37/2.0

2.0
This commit is contained in:
Oskar Dudziński
2025-01-28 00:01:34 +01:00
committed by GitHub
35 changed files with 1657 additions and 1695 deletions
+8 -2
View File
@@ -1,3 +1,9 @@
{
"python.analysis.typeCheckingMode": "basic"
}
"python.analysis.typeCheckingMode": "basic",
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports.ruff": "explicit",
}
},
"ruff.lineLength": 80,
}
+61 -69
View File
@@ -1,7 +1,6 @@
# Tidal Downloader
TIDDL is the Python CLI application that allows downloading Tidal tracks.
Fully typed, only 2 requirements.
TIDDL is Python CLI application that allows downloading Tidal tracks.
![GitHub top language](https://img.shields.io/github/languages/top/oskvr37/tiddl?style=for-the-badge)
![PyPI - Version](https://img.shields.io/pypi/v/tiddl?style=for-the-badge)
@@ -11,6 +10,9 @@ Fully typed, only 2 requirements.
It's inspired by [Tidal-Media-Downloader](https://github.com/yaronzz/Tidal-Media-Downloader) - currently not mantained project.
This repository will contain features requests from that project and will be the enhanced version.
> [!WARNING]
> This app is for personal use only and is not affiliated with Tidal. Users must ensure their use complies with Tidal's terms of service and local copyright laws. Downloaded tracks are for personal use and may not be shared or redistributed. The developer assumes no responsibility for misuse of this app.
# Installation
Install package using `pip`
@@ -19,84 +21,74 @@ Install package using `pip`
pip install tiddl
```
After installation you can use `tiddl` to set up auth token
Run the package cli with `tiddl`
```bash
$ tiddl
> go to https://link.tidal.com/xxxxx and add device!
authenticated!
token expires in 7 days
Usage: tiddl [OPTIONS] COMMAND [ARGS]...
TIDDL - Download Tidal tracks ✨
Options:
-v, --verbose Show debug logs
--help Show this message and exit.
Commands:
...
```
Use `tiddl -h` to show help message
# Basic usage
# CLI
Login with Tidal account
After authentication - when your token is ready - you can start downloading!
You can download `tracks` `albums` `playlists` `artists albums`
- `tiddl -s -q high` sets high quality as default quality
- `tiddl <input>` downloads with high quality
- `tiddl <input> -q master` downloads with best possible quality
- `tiddl 284165609 -p my_folder -o "{artist} - {title}"` downloads track to `my_folder/{artist} - {title}.flac`
- `tiddl track/284165609 -p my_folder -o "{artist} - {title}" -s` same as above, but saves `my_folder` as default download path and `{artist} - {title}` as default file format
### Valid input
- 284165609 (will treat this as track id)
- https://tidal.com/browse/track/284165609
- track/284165609
- https://listen.tidal.com/album/284165608/track/284165609
- https://listen.tidal.com/album/284165608
- album/284165608
- https://listen.tidal.com/artist/7695548
- artist/7695548
- https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7
- playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7
### File formatting
| Key | Example | Comment |
| --------------- | ------------------------- | ------------------------------------------------------------- |
| title | Money Trees | |
| artist | Kendrick Lamar | |
| artists | Kendrick Lamar, Jay Rock | |
| album | good kid, m.A.A.d city | |
| number | 5 | number on album |
| disc_number | 1 | number of album volume |
| released | 10/22/2012 | release date |
| year | 2012 | year of release date |
| playlist | Kendrick Lamar Essentials | title of playlist will only appear when you download playlist |
| playlist_number | 15 | index of track on the playlist |
| id | 20556797 | id on Tidal |
# Modules
You can also use TIDDL as module, it's fully typed so you will get type hints
```python
from tiddl import TidalApi, Config
config = Config()
api = TidalApi(
config["token"],
config["user"]["user_id"],
config["user"]["country_code"]
)
album_id = 284165608
album = api.getAlbum(album_id)
print(f"{album["title"]} has {album["numberOfTracks"]} tracks!")
```bash
tiddl auth login
```
# Testing
Download track / album / artist / playlist
```bash
tiddl url https://listen.tidal.com/track/103805726 download
tiddl url https://listen.tidal.com/album/103805723 download
tiddl url https://listen.tidal.com/artist/25022 download
tiddl url https://listen.tidal.com/playlist/84974059-76af-406a-aede-ece2b78fa372 download
```
python -m unittest tiddl/tests.py
> [!TIP]
> You don't have to paste full urls, track/103805726, album/103805723 etc. will also work
Set download quality and output format
```bash
tiddl ... download -q master -o "{artist}/{title} ({album})"
```
This command will:
- download with highest quality
- save track with title and album name in artist folder
> [!NOTE]
> More about file templating [on wiki](https://github.com/oskvr37/tiddl/wiki/Template-formatting).
# Development
Clone the repository
```bash
git clone https://github.com/oskvr37/tiddl
```
Install package with `--editable` flag
```bash
pip install -e .
```
Run tests
```bash
python -m unittest
```
# Resources
+29
View File
@@ -0,0 +1,29 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "tiddl"
version = "2.0.0"
description = "Download Tidal tracks with CLI downloader."
readme = "README.md"
requires-python = ">=3.11"
authors = [{ name = "oskvr37" }]
classifiers = [
"Environment :: Console",
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
]
dependencies = [
"pydantic>=2.9.2",
"requests>=2.20.0",
"click>=8.1.7",
"mutagen>=1.47.0",
"ffmpeg-python>=0.2.0",
]
[project.urls]
homepage = "https://github.com/oskvr37/tiddl"
[project.scripts]
tiddl = "tiddl.cli:cli"
-3
View File
@@ -1,3 +0,0 @@
requests>=2.20.0
mutagen>=1.47.0
ffmpeg-python>=0.2.0
-15
View File
@@ -1,15 +0,0 @@
from setuptools import setup, find_packages
setup(
name="tiddl",
version="1.9.4",
description="TIDDL (Tidal Downloader) is a Python CLI application that allows downloading Tidal tracks.",
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
readme="README.md",
author="oskvr37",
packages=find_packages(),
entry_points={
"console_scripts": ["tiddl=tiddl:main"],
},
)
View File
+78
View File
@@ -0,0 +1,78 @@
import unittest
from tiddl.config import Config
from tiddl.api import TidalApi
class TestApi(unittest.TestCase):
api: TidalApi
def setUp(self):
config = Config.fromFile()
auth = config.auth
token, user_id, country_code = (
auth.token,
auth.user_id,
auth.country_code
)
assert token, "No token found in config file"
assert user_id, "No user_id found in config file"
assert country_code, "No country_code found in config file"
self.api = TidalApi(token, user_id, country_code)
def test_ready(self):
session = self.api.getSession()
self.assertEqual(session.userId, int(self.api.user_id))
self.assertEqual(session.countryCode, self.api.country_code)
def test_track(self):
track = self.api.getTrack(103805726)
self.assertEqual(track.title, "Stronger")
def test_artist(self):
artist = self.api.getArtist(25022)
self.assertEqual(artist.name, "Kanye West")
def test_artist_albums(self):
self.api.getArtistAlbums(25022, filter="ALBUMS")
self.api.getArtistAlbums(25022, filter="EPSANDSINGLES")
def test_album(self):
album = self.api.getAlbum(103805723)
self.assertEqual(album.title, "Graduation")
def test_album_items(self):
album_items = self.api.getAlbumItems(103805723, limit=10)
self.assertEqual(len(album_items.items), 10)
album_items = self.api.getAlbumItems(103805723, limit=10, offset=10)
self.assertEqual(len(album_items.items), 4)
def test_playlist(self):
playlist = self.api.getPlaylist("84974059-76af-406a-aede-ece2b78fa372")
self.assertEqual(playlist.title, "Kanye West Essentials")
def test_playlist_items(self):
playlist_items = self.api.getPlaylistItems(
"84974059-76af-406a-aede-ece2b78fa372"
)
self.assertEqual(len(playlist_items.items), 25)
def test_favorites(self):
favorites = self.api.getFavorites()
self.assertGreaterEqual(len(favorites.PLAYLIST), 0)
self.assertGreaterEqual(len(favorites.ALBUM), 0)
self.assertGreaterEqual(len(favorites.VIDEO), 0)
self.assertGreaterEqual(len(favorites.TRACK), 0)
self.assertGreaterEqual(len(favorites.ARTIST), 0)
def test_search(self):
self.api.getSearch("Kanye West")
if __name__ == "__main__":
unittest.main()
+137
View File
@@ -0,0 +1,137 @@
import unittest
from tiddl.models.resource import Track
from tiddl.utils import TidalResource, formatTrack
class TestTidalResource(unittest.TestCase):
def test_resource_parsing(self):
positive_cases = [
("https://tidal.com/browse/track/12345678", "track", "12345678"),
("track/12345678", "track", "12345678"),
("https://tidal.com/browse/album/12345678", "album", "12345678"),
("album/12345678", "album", "12345678"),
("https://tidal.com/browse/playlist/12345678", "playlist", "12345678"),
("playlist/12345678", "playlist", "12345678"),
("https://tidal.com/browse/artist/12345678", "artist", "12345678"),
("artist/12345678", "artist", "12345678"),
]
for resource, expected_type, expected_id in positive_cases:
with self.subTest(resource=resource):
tidal_resource = TidalResource.fromString(resource)
self.assertEqual(tidal_resource.type, expected_type)
self.assertEqual(tidal_resource.id, expected_id)
def test_failing_cases(self):
failing_cases = [
"https://tidal.com/browse/invalid/12345678",
"invalid/12345678",
"https://tidal.com/browse/track/invalid",
"track/invalid",
"",
"invalid",
"https://tidal.com/browse/track/",
"track/",
"/12345678",
]
for resource in failing_cases:
with self.subTest(resource=resource):
with self.assertRaises(ValueError):
TidalResource.fromString(resource)
class TestFormatTrack(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.track = Track(
**{
"id": 66421438,
"title": "Shutdown",
"duration": 189,
"replayGain": -9.95,
"peak": 0.966051,
"allowStreaming": True,
"streamReady": True,
"adSupportedStreamReady": True,
"djReady": True,
"stemReady": False,
"streamStartDate": "2016-11-15T00:00:00.000+0000",
"premiumStreamingOnly": False,
"trackNumber": 9,
"volumeNumber": 1,
"version": None,
"popularity": 24,
"copyright": "(P) 2016 Boy Better Know",
"bpm": 69,
"url": "http://www.tidal.com/track/66421438",
"isrc": "GB7QY1500024",
"editable": False,
"explicit": True,
"audioQuality": "LOSSLESS",
"audioModes": ["STEREO"],
"mediaMetadata": {"tags": ["LOSSLESS", "HIRES_LOSSLESS"]},
"artist": {
"id": 3566984,
"name": "Skepta",
"type": "MAIN",
"picture": "747af850-fa9c-4178-a3e6-49259b67df86",
},
"artists": [
{
"id": 3566984,
"name": "Skepta",
"type": "MAIN",
"picture": "747af850-fa9c-4178-a3e6-49259b67df86",
}
],
"album": {
"id": 66421429,
"title": "Konnichiwa",
"cover": "e0c2f05e-e21f-47c5-9c37-2993437df27d",
"vibrantColor": "#ae3b31",
"videoCover": None,
},
"mixes": {"TRACK_MIX": "001aa4abeb471e8f55f5784772b478"},
"playlistNumber": None,
}
)
def test_templating(self):
test_cases = [
("{id}", "66421438"),
("{title}", "Shutdown"),
("{version}", ""),
("{artist}", "Skepta"),
("{artists}", "Skepta"),
("{album}", "Konnichiwa"),
("{number}", "9"),
("{disc}", "1"),
("{date:%m-%d-%y}", "11-15-16"),
("{date:%Y}", "2016"),
("{year}", "2016"),
("{playlist_number}", "0"),
("{playlist_number:02d}", "00"),
("{bpm}", "69"),
("{quality}", "high"),
("{artist}/{album}/{title}", "Skepta/Konnichiwa/Shutdown"),
("{number:02d}. {title}", "09. Shutdown"),
]
for template, expected_result in test_cases:
with self.subTest(template=template, expected_result=expected_result):
result = formatTrack(template, self.track)
self.assertEqual(result, expected_result)
def test_invalid_characters(self):
test_cases = ["\\", ":", '"', "?", "<", ">", "|", "{number}:{title}", "{date}"]
for template in test_cases:
with self.subTest(template=template):
with self.assertRaises(ValueError):
formatTrack(template, self.track)
if __name__ == "__main__":
unittest.main()
-357
View File
@@ -1,357 +0,0 @@
import os
import time
import logging
from random import randint
from .api import TidalApi, ApiError
from .auth import getDeviceAuth, getToken, refreshToken
from .config import Config
from .download import downloadTrackStream, Cover
from .parser import QUALITY_ARGS, parser
from .types import TRACK_QUALITY, TrackQuality, Track
from .types.api import _PlaylistItem
from .utils import (
RESOURCE,
parseURL,
formatFilename,
loadingSymbol,
setMetadata,
convertFileExtension,
initLogging,
parseFileInput,
)
SAVE_COVER = True
def main():
args = parser.parse_args()
initLogging(
silent=args.silent, verbose=args.verbose, colored_logging=not args.no_color
)
logger = logging.getLogger("TIDDL")
logger.debug(args)
config = Config()
include_singles = args.include_singles
download_path = args.download_path or config["settings"]["download_path"]
track_template = args.file_template or config["settings"]["track_template"]
track_quality = (
QUALITY_ARGS[args.quality]
if args.quality
else config["settings"]["track_quality"]
)
file_extension = args.file_extension or config["settings"]["file_extension"]
if args.save_options:
logger.info("saving new settings...")
settings = config.update(
{
"settings": {
"download_path": download_path,
"track_quality": track_quality,
"track_template": track_template,
"file_extension": file_extension,
}
}
).get("settings")
if settings:
print("Current Settings:")
for k, v in settings.items():
print(f'> {k.upper()} "{v}"')
logger.info(f"saved settings to {config.config_path}")
if not config["token"]:
auth = getDeviceAuth()
text = f"> go to https://{auth['verificationUriComplete']} and add device!"
expires_at = time.time() + auth["expiresIn"]
i = 0
while time.time() < expires_at:
for _ in range(50):
loadingSymbol(i, text)
i += 1
time.sleep(0.1)
token = getToken(auth["deviceCode"])
if token.get("error"):
continue
print()
config.update(
{
"token": token["access_token"],
"refresh_token": token["refresh_token"],
"token_expires_at": int(time.time()) + token["expires_in"],
"user": {
"user_id": str(token["user"]["userId"]),
"country_code": token["user"]["countryCode"],
},
}
)
logger.info(f"authenticated!")
break
else:
logger.info("time for authentication has expired")
return
t_now = int(time.time())
token_expired = t_now > config["token_expires_at"]
if token_expired:
token = refreshToken(config["refresh_token"])
config.update(
{
"token": token["access_token"],
"token_expires_at": int(time.time()) + token["expires_in"],
}
)
logger.info(f"refreshed token!")
time_to_expire = config["token_expires_at"] - t_now
days, hours = time_to_expire // (24 * 3600), time_to_expire % (24 * 3600) // 3600
days_text = f" {days} {'day' if days == 1 else 'days'}" if days else ""
hours_text = f" {hours} {'hour' if hours == 1 else 'hours'}" if hours else ""
logger.debug(f"token expires in{days_text}{hours_text}")
user_inputs: list[str] = args.input
if args.input_file:
file_inputs = parseFileInput(args.input_file)
user_inputs.extend(file_inputs)
if len(user_inputs) == 0:
logger.warning("no ID nor URL provided")
return
api = TidalApi(
config["token"], config["user"]["user_id"], config["user"]["country_code"]
)
def downloadTrack(
track: Track,
file_template: str,
skip_existing=True,
sleep=False,
playlist="",
cover_data=b"",
) -> tuple[str, str]:
if track.get("status", 200) != 200 or not track["allowStreaming"]:
raise ValueError(
f"The track is not streamable: {track["title"]} ({track["id"]})"
)
file_dir, file_name = formatFilename(file_template, track, playlist)
file_path = f"{download_path}/{file_dir}/{file_name}"
if skip_existing and (
os.path.isfile(file_path + ".m4a") or os.path.isfile(file_path + ".flac")
):
logger.info(f"already exists: {file_path}")
return file_dir, file_name
if sleep:
sleep_time = randint(5, 15) / 10 + 1
logger.info(f"sleeping for {sleep_time}s")
try:
time.sleep(sleep_time)
except KeyboardInterrupt:
logger.info("stopping...")
exit()
stream = api.getTrackStream(track["id"], track_quality)
logger.debug({"stream": stream})
quality = TRACK_QUALITY[stream["audioQuality"]]
MASTER_QUALITIES: list[TrackQuality] = ["HI_RES_LOSSLESS", "LOSSLESS"]
if stream["audioQuality"] in MASTER_QUALITIES:
bit_depth, sample_rate = stream.get("bitDepth"), stream.get("sampleRate")
if bit_depth is None or sample_rate is None:
raise ValueError(
"bitDepth and sampleRate must be provided for master qualities"
)
details = f"{bit_depth} bit {sample_rate/1000:.1f} kHz"
else:
details = quality["details"]
logger.info(f"{file_name} :: {quality['name']} Quality - {details}")
track_data, extension = downloadTrackStream(
stream["manifest"],
stream["manifestMimeType"],
)
os.makedirs(f"{download_path}/{file_dir}", exist_ok=True)
file_path = f"{download_path}/{file_dir}/{file_name}.{extension}"
with open(file_path, "wb+") as f:
f.write(track_data)
if not cover_data and track["album"]["cover"]:
cover = Cover(track["album"]["cover"])
cover_data = cover.content
setMetadata(file_path, extension, track, cover_data)
if file_extension:
file_path = convertFileExtension(
source_path=file_path, file_extension=file_extension
)
logger.info(f"track saved as {file_path}")
return file_dir, file_name
def downloadAlbum(album_id: str | int, skip_existing: bool):
album = api.getAlbum(album_id)
logger.info(f"album: {album['title']}")
# i dont know if limit 100 is suspicious
# but i will leave it here
album_items = api.getAlbumItems(album_id, limit=100)
album_cover = Cover(album["cover"])
for item in album_items["items"]:
track = item["item"]
if item["type"] != "track":
logger.warning(f"item is not a track: {track["title"]} ({track["id"]})")
continue
try:
file_dir, file_name = downloadTrack(
track,
file_template=args.file_template
or config["settings"]["album_template"],
skip_existing=skip_existing,
sleep=True,
cover_data=album_cover.content,
)
if SAVE_COVER:
album_cover.save(f"{download_path}/{file_dir}")
except ValueError as e:
logger.error(e)
skip_existing = not args.no_skip
failed_input = []
for user_input in user_inputs:
input_type: RESOURCE
input_id: str
if user_input.isdigit():
input_type = "track"
input_id = user_input
else:
try:
input_type, input_id = parseURL(user_input)
except ValueError as e:
logger.error(e)
failed_input.append(user_input)
continue
match input_type:
case "track":
try:
track = api.getTrack(input_id)
except ApiError as e:
logger.warning(f"{e.error['userMessage']} ({e.error['status']})")
continue
try:
downloadTrack(
track,
file_template=track_template,
skip_existing=skip_existing,
)
except ValueError as e:
logger.error(e)
continue
case "album":
downloadAlbum(input_id, skip_existing)
continue
case "artist":
all_albums = []
artist_albums = api.getArtistAlbums(input_id)
all_albums.extend(artist_albums["items"])
if include_singles:
artist_singles = api.getArtistAlbums(input_id, onlyNonAlbum=True)
all_albums.extend(artist_singles["items"])
for album in all_albums:
downloadAlbum(album["id"], skip_existing)
continue
case "playlist":
# TODO: add option to limit and set offset of playlist ✨
# or just make a feature in GUI that lets user choose
# which tracks from playlist to download
playlist = api.getPlaylist(input_id)
logger.info(f"playlist: {playlist['title']} ({playlist['url']})")
playlist_cover = Cover(
playlist["squareImage"], 1080
) # playlists have 1080x1080 size
items: list[_PlaylistItem] = []
offset = 0
while True:
playlist_items = api.getPlaylistItems(input_id, offset=offset)
items.extend(playlist_items["items"])
if (
playlist_items["limit"] + playlist_items["offset"]
> playlist_items["totalNumberOfItems"]
):
break
offset += playlist_items["limit"]
for index, item in enumerate(items, 1):
track = item["item"]
track["playlistNumber"] = index
try:
file_dir, file_name = downloadTrack(
track,
file_template=args.file_template
or config["settings"]["playlist_template"],
skip_existing=skip_existing,
sleep=True,
playlist=playlist["title"],
)
if SAVE_COVER:
playlist_cover.save(f"{download_path}/{file_dir}")
except ValueError as e:
logger.warning(f"track unavailable")
continue
case _:
logger.warning(f"invalid input: `{input_type}`")
failed_input.append(input_id)
if len(failed_input) > 0:
logger.info(f"failed: {failed_input}")
if __name__ == "__main__":
main()
+152 -81
View File
@@ -1,66 +1,179 @@
import json
import logging
from requests import Session
from typing import TypedDict
from pathlib import Path
from typing import Any, Literal, Type, TypeVar
from .types import (
ErrorResponse,
SessionResponse,
TrackQuality,
Track,
TrackStream,
AristAlbumsItems,
from pydantic import BaseModel
from requests import Session
from tiddl.models.api import (
Album,
AlbumItems,
Artist,
ArtistAlbumsItems,
Favorites,
Playlist,
PlaylistItems,
Favorites,
Search,
SessionResponse,
Track,
TrackStream,
Video,
)
API_URL = "https://api.tidal.com/v1"
from tiddl.models.constants import TrackQuality
from tiddl.exceptions import ApiError
# Tidal default limits
ARTIST_ALBUMS_LIMIT = 50
ALBUM_ITEMS_LIMIT = 10
PLAYLIST_LIMIT = 50
DEBUG = False
T = TypeVar("T", bound=BaseModel)
logger = logging.getLogger(__name__)
class ApiError(Exception):
def __init__(self, message: str, error: ErrorResponse):
super().__init__(message)
self.error = error
def ensureLimit(limit: int, max_limit: int) -> int:
if limit > max_limit:
logger.warning(f"Max limit is {max_limit}")
return max_limit
return limit
class Limits:
ARTIST_ALBUMS = 50
ALBUM_ITEMS = 10
ALBUM_ITEMS_MAX = 100
PLAYLIST = 50
class TidalApi:
URL = "https://api.tidal.com/v1"
LIMITS = Limits
def __init__(self, token: str, user_id: str, country_code: str) -> None:
self.token = token
self.user_id = user_id
self.country_code = country_code
self._session = Session()
self._session.headers = {"authorization": f"Bearer {token}"}
self._logger = logging.getLogger("TidalApi")
self.session = Session()
self.session.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
def _request(self, endpoint: str, params={}):
self._logger.debug(f"{endpoint} {params}")
req = self._session.request(
method="GET", url=f"{API_URL}/{endpoint}", params=params
)
def fetch(
self, model: Type[T], endpoint: str, params: dict[str, Any] = {}
) -> T:
"""Fetch data from the API and parse it into the given Pydantic model."""
req = self.session.get(f"{self.URL}/{endpoint}", params=params)
logger.debug((endpoint, params, req.status_code))
data = req.json()
if DEBUG:
debug_data = {
"status_code": req.status_code,
"endpoint": endpoint,
"params": params,
"data": data,
}
path = Path(f"debug_data/{endpoint}.json")
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(debug_data, f, indent=2)
if req.status_code != 200:
raise ApiError(req.text, data)
raise ApiError(**data)
return data
return model.model_validate(data)
def getSession(self) -> SessionResponse:
return self._request(
f"sessions",
def getAlbum(self, album_id: str | int):
return self.fetch(
Album, f"albums/{album_id}", {"countryCode": self.country_code}
)
def getTrackStream(self, id: str | int, quality: TrackQuality) -> TrackStream:
return self._request(
f"tracks/{id}/playbackinfo",
def getAlbumItems(
self, album_id: str | int, limit=LIMITS.ALBUM_ITEMS, offset=0
):
return self.fetch(
AlbumItems,
f"albums/{album_id}/items",
{
"countryCode": self.country_code,
"limit": ensureLimit(limit, self.LIMITS.ALBUM_ITEMS_MAX),
"offset": offset,
},
)
def getArtist(self, artist_id: str | int):
return self.fetch(
Artist, f"artists/{artist_id}", {"countryCode": self.country_code}
)
def getArtistAlbums(
self,
artist_id: str | int,
limit=LIMITS.ARTIST_ALBUMS,
offset=0,
filter: Literal["ALBUMS", "EPSANDSINGLES"] = "ALBUMS",
):
return self.fetch(
ArtistAlbumsItems,
f"artists/{artist_id}/albums",
{
"countryCode": self.country_code,
"limit": limit, # tested limit 10,000
"offset": offset,
"filter": filter,
},
)
def getFavorites(self):
return self.fetch(
Favorites,
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
)
def getPlaylist(self, playlist_uuid: str):
return self.fetch(
Playlist,
f"playlists/{playlist_uuid}",
{"countryCode": self.country_code},
)
def getPlaylistItems(
self, playlist_uuid: str, limit=LIMITS.PLAYLIST, offset=0
):
return self.fetch(
PlaylistItems,
f"playlists/{playlist_uuid}/items",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
)
def getSearch(self, query: str):
return self.fetch(
Search, "search", {"countryCode": self.country_code, "query": query}
)
def getSession(self):
return self.fetch(SessionResponse, "sessions")
def getTrack(self, track_id: str | int):
return self.fetch(
Track, f"tracks/{track_id}", {"countryCode": self.country_code}
)
def getTrackStream(self, track_id: str | int, quality: TrackQuality):
return self.fetch(
TrackStream,
f"tracks/{track_id}/playbackinfo",
{
"audioquality": quality,
"playbackmode": "STREAM",
@@ -68,49 +181,7 @@ class TidalApi:
},
)
def getTrack(self, id: str | int) -> Track:
return self._request(f"tracks/{id}", {"countryCode": self.country_code})
def getArtistAlbums(
self, id: str | int, limit=ARTIST_ALBUMS_LIMIT, offset=0, onlyNonAlbum=False
) -> AristAlbumsItems:
params = {"countryCode": self.country_code, "limit": limit, "offset": offset}
if onlyNonAlbum:
params.update({"filter": "EPSANDSINGLES"})
return self._request(
f"artists/{id}/albums",
params,
)
def getAlbum(self, id: str | int) -> Album:
return self._request(f"albums/{id}", {"countryCode": self.country_code})
def getAlbumItems(
self, id: str | int, limit=ALBUM_ITEMS_LIMIT, offset=0
) -> AlbumItems:
return self._request(
f"albums/{id}/items",
{"countryCode": self.country_code, "limit": limit, "offset": offset},
)
def getPlaylist(self, uuid: str) -> Playlist:
return self._request(
f"playlists/{uuid}",
{"countryCode": self.country_code},
)
def getPlaylistItems(
self, uuid: str, limit=PLAYLIST_LIMIT, offset=0
) -> PlaylistItems:
return self._request(
f"playlists/{uuid}/items",
{"countryCode": self.country_code, "limit": limit, "offset": offset},
)
def getFavorites(self) -> Favorites:
return self._request(
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
def getVideo(self, video_id: str | int):
return self.fetch(
Video, f"videos/{video_id}", {"countryCode": self.country_code}
)
+48 -10
View File
@@ -1,21 +1,35 @@
import logging
from requests import request
from .types.auth import AuthDeviceResponse, AuthResponse, AuthResponseWithRefresh
from .exceptions import AuthError
from .models import auth
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
CLIENT_ID = "zU4XHVVkc2tDPo4t"
CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4="
def getDeviceAuth() -> AuthDeviceResponse:
return request(
logger = logging.getLogger(__name__)
def getDeviceAuth():
req = request(
"POST",
f"{AUTH_URL}/device_authorization",
data={"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"},
).json()
)
data = req.json()
if req.status_code == 200:
return auth.AuthDeviceResponse(**data)
raise AuthError(**data)
def getToken(device_code: str) -> AuthResponseWithRefresh:
return request(
def getToken(device_code: str):
req = request(
"POST",
f"{AUTH_URL}/token",
data={
@@ -25,11 +39,18 @@ def getToken(device_code: str) -> AuthResponseWithRefresh:
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
).json()
)
data = req.json()
if req.status_code == 200:
return auth.AuthResponseWithRefresh(**data)
raise AuthError(**data)
def refreshToken(refresh_token: str) -> AuthResponse:
return request(
def refreshToken(refresh_token: str):
req = request(
"POST",
f"{AUTH_URL}/token",
data={
@@ -39,4 +60,21 @@ def refreshToken(refresh_token: str) -> AuthResponse:
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
).json()
)
data = req.json()
if req.status_code == 200:
return auth.AuthResponse(**data)
raise AuthError(**data)
def removeToken(access_token: str):
req = request(
"POST",
"https://api.tidal.com/v1/logout",
headers={"authorization": f"Bearer {access_token}"},
)
logger.debug((req.status_code, req.text))
+34
View File
@@ -0,0 +1,34 @@
import click
import logging
from .ctx import ContextObj, passContext, Context
from .auth import AuthGroup
from .download import UrlGroup, FavGroup, SearchGroup, FileGroup
from .config import ConfigCommand
@click.group()
@passContext
@click.option("--verbose", "-v", is_flag=True, help="Show debug logs")
def cli(ctx: Context, verbose: bool):
"""TIDDL - Download Tidal tracks \u266b"""
ctx.obj = ContextObj()
logging.basicConfig(
level=logging.DEBUG if verbose else logging.INFO,
handlers=[logging.StreamHandler()],
format="%(levelname)s [%(name)s.%(funcName)s] %(message)s",
)
logging.getLogger("urllib3").setLevel(logging.ERROR)
cli.add_command(ConfigCommand)
cli.add_command(AuthGroup)
cli.add_command(UrlGroup)
cli.add_command(FavGroup)
cli.add_command(SearchGroup)
cli.add_command(FileGroup)
if __name__ == "__main__":
cli()
+98
View File
@@ -0,0 +1,98 @@
import click
import logging
from click import style
from time import sleep, time
from tiddl.auth import getDeviceAuth, getToken, refreshToken, removeToken, AuthError
from tiddl.config import AuthConfig
from .ctx import passContext, Context
logger = logging.getLogger(__name__)
@click.group("auth")
def AuthGroup():
"""Manage Tidal token"""
@AuthGroup.command("login")
@passContext
def login(ctx: Context):
"""Add token to the config"""
auth = ctx.obj.config.auth
if auth.token:
if auth.refresh_token and time() > auth.expires:
click.echo(style("Refreshing token...", fg="yellow"))
token = refreshToken(auth.refresh_token)
ctx.obj.config.auth.expires = token.expires_in + int(time())
ctx.obj.config.auth.token = token.access_token
ctx.obj.config.save()
click.echo(style("Authenticated!", fg="green"))
return
auth = getDeviceAuth()
uri = f"https://{auth.verificationUriComplete}"
click.launch(uri)
click.echo(f"Go to {style(uri, fg='cyan')} and complete authentication!")
time_left = time() + auth.expiresIn
while True:
sleep(auth.interval)
try:
token = getToken(auth.deviceCode)
except AuthError as e:
if e.error == "authorization_pending":
# FIX: `Time left: 0 secondsss` 🐍
click.echo(f"\rTime left: {time_left - time():.0f} seconds", nl=False)
continue
if e.error == "expired_token":
click.echo(
f"\nTime for authentication {style('has expired', fg='red')}."
)
break
new_auth = AuthConfig(
token=token.access_token,
refresh_token=token.refresh_token,
expires=token.expires_in + int(time()),
user_id=str(token.user.userId),
country_code=token.user.countryCode,
)
ctx.obj.config.auth = new_auth
ctx.obj.config.save()
click.echo(style("\nAuthenticated!", fg="green"))
break
@AuthGroup.command("logout")
@passContext
def logout(ctx: Context):
"""Remove token from config"""
access_token = ctx.obj.config.auth.token
if not access_token:
click.echo(style("Not logged in", fg="yellow"))
return
removeToken(access_token)
ctx.obj.config.auth = AuthConfig()
ctx.obj.config.save()
click.echo(style("Logged out!", fg="green"))
+19
View File
@@ -0,0 +1,19 @@
import click
from tiddl.config import CONFIG_PATH
@click.command("config")
@click.option(
"--open",
"-o",
is_flag=True,
help="Open the configuration file with the default editor",
)
def ConfigCommand(open: bool):
"""Print path to the configuration file"""
click.echo(str(CONFIG_PATH))
if open:
click.launch(str(CONFIG_PATH))
+49
View File
@@ -0,0 +1,49 @@
import functools
import click
from typing import Callable, TypeVar, cast
from tiddl.api import TidalApi
from tiddl.config import Config
from tiddl.utils import TidalResource
class ContextObj:
api: TidalApi | None
config: Config
resources: list[TidalResource]
def __init__(self) -> None:
self.config = Config.fromFile()
self.resources = []
self.api = None
auth = self.config.auth
if auth.token and auth.user_id and auth.country_code:
self.api = TidalApi(auth.token, auth.user_id, auth.country_code)
def getApi(self) -> TidalApi:
if self.api is None:
raise click.UsageError("You must login first")
return self.api
class Context(click.Context):
obj: ContextObj
F = TypeVar("F", bound=Callable[..., None])
def passContext(func: F) -> F:
"""Wrapper for @click.pass_context to use custom Context"""
@click.pass_context
@functools.wraps(func)
def wrapper(ctx: click.Context, *args, **kwargs):
custom_ctx = cast(Context, ctx)
return func(custom_ctx, *args, **kwargs)
return cast(F, wrapper)
+162
View File
@@ -0,0 +1,162 @@
import click
from .fav import FavGroup
from .file import FileGroup
from .search import SearchGroup
from .url import UrlGroup
from ..ctx import Context, passContext
from tiddl.download import downloadTrackStream
from tiddl.utils import formatTrack, trackExists, TidalResource
from tiddl.metadata import addMetadata, Cover
from tiddl.exceptions import ApiError, AuthError
from tiddl.models.constants import TrackArg, ARG_TO_QUALITY
from tiddl.models.resource import Track, Album
from tiddl.models.api import PlaylistItems
@click.command("download")
@click.option("--quality", "-q", "quality", type=click.Choice(TrackArg.__args__))
@click.option(
"--output", "-o", "template", type=str, help="Format track file template."
)
@click.option(
"--noskip",
"-ns",
"noskip",
is_flag=True,
default=False,
help="Dont skip downloaded tracks.",
)
@passContext
def DownloadCommand(
ctx: Context, quality: TrackArg | None, template: str | None, noskip: bool
):
"""Download the tracks"""
api = ctx.obj.getApi()
def downloadTrack(track: Track, file_name: str, cover_data=b""):
if not track.allowStreaming:
click.echo(
f"{click.style('', 'yellow')} Track {click.style(file_name, 'yellow')} does not allow streaming"
)
return
download_quality = ARG_TO_QUALITY[quality or ctx.obj.config.download.quality]
# .suffix is needed because the Path.with_suffix method will replace any content after dot
# for example: 'album/01. title' becomes 'album/01.m4a'
path = ctx.obj.config.download.path / f"{file_name}.suffix"
if not noskip and trackExists(track.audioQuality, download_quality, path):
click.echo(
f"{click.style('', 'cyan')} Skipping track {click.style(file_name, 'cyan')}"
)
return
click.echo(
f"{click.style('', 'green')} Downloading track {click.style(file_name, 'green')}"
)
track_stream = api.getTrackStream(track.id, download_quality)
stream_data, file_extension = downloadTrackStream(track_stream)
full_path = path.with_suffix(file_extension)
full_path.parent.mkdir(parents=True, exist_ok=True)
with full_path.open("wb") as f:
f.write(stream_data)
# TODO: add track credits fetching to fill more metadata
if not cover_data and track.album.cover:
cover_data = Cover(track.album.cover).content
addMetadata(full_path, track, cover_data)
def downloadAlbum(album: Album):
click.echo(f"★ Album {album.title}")
# TODO: fetch all items
album_items = api.getAlbumItems(album.id, limit=100)
cover_data = Cover(album.cover).content if album.cover else b""
for item in album_items.items:
if isinstance(item.item, Track):
track = item.item
file_name = formatTrack(
template=template or ctx.obj.config.template.album,
track=track,
album_artist=album.artist.name,
)
downloadTrack(track=track, file_name=file_name, cover_data=cover_data)
def handleResource(resource: TidalResource):
match resource.type:
case "track":
track = api.getTrack(resource.id)
file_name = formatTrack(
template=template or ctx.obj.config.template.track, track=track
)
downloadTrack(
track=track,
file_name=file_name,
)
case "album":
album = api.getAlbum(resource.id)
downloadAlbum(album)
case "artist":
# TODO: add `include_singles`
# TODO: fetch all items
artist_albums = api.getArtistAlbums(resource.id)
for album in artist_albums.items:
downloadAlbum(album)
case "playlist":
playlist = api.getPlaylist(resource.id)
click.echo(f"★ Playlist {playlist.title}")
# TODO: fetch all items
playlist_items = api.getPlaylistItems(resource.id)
for item in playlist_items.items:
if isinstance(
item.item, PlaylistItems.PlaylistTrackItem.PlaylistTrack
):
track = item.item
file_name = formatTrack(
template=template or ctx.obj.config.template.playlist,
track=track,
playlist_title=playlist.title,
playlist_index=track.index // 100000,
)
downloadTrack(track=item.item, file_name=file_name)
for resource in ctx.obj.resources:
try:
handleResource(resource)
except ApiError as e:
click.echo(click.style(f"{e}", "red"))
except AuthError as e:
click.echo(click.style(f"{e}", "red"))
UrlGroup.add_command(DownloadCommand)
SearchGroup.add_command(DownloadCommand)
FavGroup.add_command(DownloadCommand)
FileGroup.add_command(DownloadCommand)
+46
View File
@@ -0,0 +1,46 @@
import click
from tiddl.utils import TidalResource, ResourceTypeLiteral
from ..ctx import Context, passContext
ResourceTypeList: list[ResourceTypeLiteral] = ["track", "album", "artist", "playlist"]
@click.group("fav")
@click.option(
"--resource",
"-r",
"resource_types",
multiple=True,
type=click.Choice(ResourceTypeList),
)
@passContext
def FavGroup(ctx: Context, resource_types: list[ResourceTypeLiteral]):
"""Get your Tidal favorites"""
api = ctx.obj.getApi()
favorites = api.getFavorites()
favorites_dict = favorites.model_dump()
click.echo(type(resource_types))
if not resource_types:
resource_types = ResourceTypeList
stats: dict[ResourceTypeLiteral, int] = dict()
for resource_type in resource_types:
resources = favorites_dict[resource_type.upper()]
stats[resource_type] = len(resources)
for resource_id in resources:
ctx.obj.resources.append(TidalResource(id=resource_id, type=resource_type))
# TODO: show pretty message
click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
for resource_type, count in stats.items():
click.echo(f"{resource_type} - {count}")
+40
View File
@@ -0,0 +1,40 @@
import click
import json
from io import TextIOWrapper
from os.path import splitext
from ..ctx import Context, passContext
from tiddl.utils import TidalResource
@click.group("file")
@click.argument("filename", type=click.File(mode="r"))
@passContext
def FileGroup(ctx: Context, filename: TextIOWrapper):
"""Parse txt or JSON file with urls"""
_, extension = splitext(filename.name)
resource_strings: list[str]
match extension:
case ".json":
try:
resource_strings = json.load(filename)
except json.JSONDecodeError as e:
raise click.UsageError(f"Cant decode JSON file - {e.msg}")
case ".txt":
resource_strings = [line.strip() for line in filename.readlines()]
case _:
raise click.UsageError(f"Unsupported file extension - {extension}")
for string in resource_strings:
try:
ctx.obj.resources.append(TidalResource.fromString(string))
except ValueError as e:
click.echo(click.style(e, "red"))
click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
+44
View File
@@ -0,0 +1,44 @@
import click
from tiddl.utils import TidalResource
from tiddl.models.resource import Artist, Album, Playlist, Track, Video
from ..ctx import Context, passContext
@click.group("search")
@click.argument("query")
@passContext
def SearchGroup(ctx: Context, query: str):
"""Search on Tidal"""
# TODO: give user interactive choice what to select
api = ctx.obj.getApi()
search = api.getSearch(query)
# issue is that we get resource data in search api call,
# in download we refetch that data.
# it's not that big deal as we refetch one resource at most,
# but it should be redesigned
value = search.topHit.value
icon = click.style("\u2bcc", "magenta")
if isinstance(value, Album):
resource = TidalResource(type="album", id=str(value.id))
click.echo(f"{icon} Album {value.title}")
elif isinstance(value, Artist):
resource = TidalResource(type="artist", id=str(value.id))
click.echo(f"{icon} Artist {value.name}")
elif isinstance(value, Track):
resource = TidalResource(type="track", id=str(value.id))
click.echo(f"{icon} Track {value.title}")
elif isinstance(value, Playlist):
resource = TidalResource(type="playlist", id=str(value.uuid))
click.echo(f"{icon} Playlist {value.title}")
elif isinstance(value, Video):
click.echo(f"{icon} Video {value.title} (currently not supported)")
ctx.obj.resources.append(resource)
+27
View File
@@ -0,0 +1,27 @@
import click
from ..ctx import Context, passContext
from tiddl.utils import TidalResource
class TidalURL(click.ParamType):
def convert(self, value: str, param, ctx) -> TidalResource:
try:
return TidalResource.fromString(value)
except ValueError as e:
self.fail(message=str(e), param=param, ctx=ctx)
@click.group("url")
@click.argument("url", type=TidalURL())
@passContext
def UrlGroup(ctx: Context, url: TidalResource):
"""
Get Tidal URL.
It can be Tidal link or `resource_type/resource_id` format.
The resource can be a track, album, playlist or artist.
"""
ctx.obj.resources.append(url)
+32 -89
View File
@@ -1,105 +1,48 @@
import json
import logging
from pydantic import BaseModel
from pathlib import Path
from typing import TypedDict, Any
from .types import TrackQuality
from tiddl.models.constants import TrackArg
class Settings(TypedDict, total=False):
download_path: str
track_quality: TrackQuality
track_template: str
album_template: str
playlist_template: str
file_extension: str
CONFIG_PATH = Path.home() / "tiddl.json"
CONFIG_INDENT = 2
class User(TypedDict, total=False):
user_id: str
country_code: str
class TemplateConfig(BaseModel):
track: str = "{artist} - {title}"
album: str = "{album_artist}/{album}/{number:02d}. {title}"
playlist: str = "{playlist}/{playlist_number:02d}. {artist} - {title}"
class ConfigData(TypedDict, total=False):
token: str
refresh_token: str
token_expires_at: int
settings: Settings
user: User
class DownloadConfig(BaseModel):
quality: TrackArg = "high"
path: Path = Path.home() / "Music" / "Tiddl"
HOME_DIRECTORY = str(Path.home())
CONFIG_FILENAME = ".tiddl_config.json"
DEFAULT_CONFIG: ConfigData = {
"token": "",
"refresh_token": "",
"token_expires_at": 0,
"settings": {
"download_path": f"{HOME_DIRECTORY}/tidal_download",
"track_quality": "HIGH",
"track_template": "{artist}/{title}",
"album_template": "{artist}/{album}/{title}",
"playlist_template": "{playlist}/{title}",
"file_extension": ""
},
"user": {"user_id": "", "country_code": ""},
}
class AuthConfig(BaseModel):
token: str = ""
refresh_token: str = ""
expires: int = 0
user_id: str = ""
country_code: str = ""
class Config:
def __init__(self, config_path="") -> None:
if config_path == "":
self.config_directory = HOME_DIRECTORY
else:
self.config_directory = config_path
class Config(BaseModel):
template: TemplateConfig = TemplateConfig()
download: DownloadConfig = DownloadConfig()
auth: AuthConfig = AuthConfig()
self.config_path = f"{self.config_directory}/{CONFIG_FILENAME}"
self._config: ConfigData = DEFAULT_CONFIG
self._logger = logging.getLogger("Config")
def save(self):
with open(CONFIG_PATH, "w") as f:
f.write(self.model_dump_json(indent=CONFIG_INDENT))
@classmethod
def fromFile(cls):
try:
with open(self.config_path, "r") as f:
loaded_config: ConfigData = json.load(f)
loaded_settings = loaded_config.get("settings")
self._logger.debug(f"loaded {loaded_settings}")
self.update(loaded_config)
with CONFIG_PATH.open() as f:
config = cls.model_validate_json(f.read())
except FileNotFoundError:
self._logger.debug("creating new file")
self._save() # save default config if file does not exist
self._logger.debug("created new file")
config = cls()
def _save(self) -> None:
with open(self.config_path, "w") as f:
self._logger.debug(self._config.get("settings"))
json.dump(self._config, f, indent=2)
def __getitem__(self, key: str) -> Any:
return self._config[key]
def __iter__(self):
return iter(self._config)
def __str__(self) -> str:
return json.dumps(self._config, indent=2)
def update(self, data: ConfigData) -> ConfigData:
self._logger.debug("updating")
merged_config: ConfigData = merge(data, self._config)
self._config.update(merged_config)
self._save()
self._logger.debug("updated")
return self._config.copy()
def merge(source, destination):
# https://stackoverflow.com/a/20666342
for key, value in source.items():
if isinstance(value, dict):
# get node or create one
node = destination.setdefault(key, {})
merge(value, node)
else:
destination[key] = value
return destination
config.save()
return config
+32 -242
View File
@@ -1,92 +1,14 @@
import logging
import requests
import json
import os
import ffmpeg
from queue import Queue
from threading import Thread
from time import time
from xml.etree.ElementTree import fromstring
from requests import Session
from pydantic import BaseModel
from base64 import b64decode
from typing import TypedDict, List
from xml.etree.ElementTree import fromstring
from .types.track import ManifestMimeType
THREADS_COUNT = 4
logger = logging.getLogger("download")
from tiddl.models.api import TrackStream
class Worker(Thread):
def __init__(self, queue: Queue, function):
Thread.__init__(self)
self.queue = queue
self.function = function
self.daemon = True
self.start()
def run(self):
while True:
arg = self.queue.get()
self.function(arg)
self.queue.task_done()
class Threader:
def __init__(self, workers_num: int, target, args: list) -> None:
self.queue = Queue()
for arg in args:
self.queue.put(arg)
self.workers: list[Worker] = [
Worker(self.queue, target) for _ in range(workers_num)
]
def run(self):
ts = time()
self.queue.join()
return round(time() - ts, 2)
class Downloader:
def __init__(self) -> None:
self.indexed_content: list[tuple[int, bytes]] = []
self.session = requests.Session()
self.total = 0
def download(self, urls: list[str]) -> bytes:
self.total = len(urls)
indexed_urls = [(i, url) for (i, url) in enumerate(urls)]
threader = Threader(THREADS_COUNT, self._downloadFragment, indexed_urls)
threader.run()
sorted_content = sorted(self.indexed_content, key=lambda x: x[0])
data = b"".join(content for _, content in sorted_content)
return data
def _downloadFragment(self, arg: tuple[int, str]):
index, url = arg
req = self.session.get(url)
self.indexed_content.append((index, req.content))
showProgressBar(
len(self.indexed_content), self.total, "threaded download", show_size=False
)
def decodeManifest(manifest: str):
return b64decode(manifest).decode()
def parseManifest(manifest: str):
class AudioFileInfo(TypedDict):
mimeType: str
codecs: str
encryptionType: str
urls: List[str]
data: AudioFileInfo = json.loads(manifest)
return data
logger = logging.getLogger(__name__)
def parseManifestXML(xml_content: str):
@@ -104,7 +26,7 @@ def parseManifestXML(xml_content: str):
if representationElement is None:
raise ValueError("Representation element not found")
codecs = representationElement.get("codecs")
codecs = representationElement.get("codecs", "")
segmentElement = representationElement.find(f"{NS}SegmentTemplate")
if segmentElement is None:
@@ -130,172 +52,40 @@ def parseManifestXML(xml_content: str):
return urls, codecs
def showProgressBar(iteration: int, total: int, text: str, length=30, show_size=True):
SQUARE, SQUARE_FILL = "", ""
iteration_mb = iteration / 1024 / 1024
total_mb = total / 1024 / 1024
percent = 100 * (iteration / total)
progress = int(length * iteration // total)
bar = f"{SQUARE_FILL * progress}{SQUARE * (length - progress)}"
size = f" {iteration_mb:.2f} / {total_mb:.2f} MB" if show_size else ""
print(
f"\r{text} {bar} {percent:.0f}%{size}",
end="\r",
)
if iteration >= total:
print()
class TrackManifest(BaseModel):
mimeType: str
codecs: str
encryptionType: str
urls: list[str]
def download(url: str) -> bytes:
logger.debug(url)
# use session for performance
session = requests.Session()
req = session.get(url, stream=True)
total_size = int(req.headers.get("content-length", 0))
block_size = 1024 * 1024
data = b""
def downloadTrackStream(stream: TrackStream) -> tuple[bytes, str]:
"""Download data from track stream and return it with file extension."""
for block in req.iter_content(block_size):
data += block
showProgressBar(len(data), total_size, "Single URL")
decoded_manifest = b64decode(stream.manifest).decode()
return data
def threadDownload(urls: list[str]) -> bytes:
dl = Downloader()
data = dl.download(urls)
return data
def toFlac(track_data: bytes) -> bytes:
process = (
ffmpeg.input("pipe:0")
.output("pipe:1", format="flac", codec="copy")
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
flac_data, stderr = process.communicate(input=track_data)
if process.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {stderr.decode()}")
return flac_data
def downloadTrackStream(
encoded_manifest: str,
mime_type: ManifestMimeType,
) -> tuple[bytes, str]:
logger.debug(f"mime_type: {mime_type}")
manifest = decodeManifest(encoded_manifest)
match mime_type:
case "application/dash+xml":
track_urls, codecs = parseManifestXML(manifest)
match stream.manifestMimeType:
case "application/vnd.tidal.bts":
data = parseManifest(manifest)
track_urls, codecs = data["urls"], data["codecs"]
case _:
raise ValueError(f"Unknown `mime_type`: {mime_type}")
track_manifest = TrackManifest.model_validate_json(decoded_manifest)
urls, codecs = track_manifest.urls, track_manifest.codecs
logger.debug(f"codecs: {codecs}")
case "application/dash+xml":
urls, codecs = parseManifestXML(decoded_manifest)
if len(track_urls) == 1:
track_data = download(track_urls[0])
logger.debug((stream.trackId, stream.audioQuality, codecs, len(urls)))
if codecs == "flac":
file_extension = ".flac"
elif codecs.startswith("mp4"):
file_extension = ".m4a"
else:
track_data = threadDownload(track_urls)
track_data = toFlac(track_data)
raise ValueError(f"Unknown codecs: {codecs}")
"""
known codecs
flac (master)
mp4a.40.2 (high)
mp4a.40.5 (low)
"""
with Session() as s:
stream_data = b""
if codecs is None:
raise Exception("Missing codecs")
for url in urls:
req = s.get(url)
stream_data += req.content
extension = "flac"
if codecs.startswith("mp4a"):
extension = "m4a"
elif codecs != "flac":
logger.warning(
f'unknown file codecs: "{codecs}", please submit this as issue on GitHub'
)
return track_data, extension
def downloadCover(uid: str, path: str, size=1280):
file = f"{path}/cover.jpg"
if os.path.isfile(file):
logger.debug(f"cover already exists ({file})")
return
formatted_uid = uid.replace("-", "/")
url = f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
req = requests.get(url)
if req.status_code != 200:
logger.error(f"could not download cover. ({req.status_code}) {url}")
return
try:
with open(file, "wb") as f:
f.write(req.content)
except FileNotFoundError as e:
logger.error(f"could not save cover. {file} -> {e}")
class Cover:
def __init__(self, uid: str, size=1280) -> None:
if size > 1280:
logger.warning(
f"can not set cover size higher than 1280 (user set: {size})"
)
size = 1280
self.uid = uid
formatted_uid = uid.replace("-", "/")
self.url = (
f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
)
logger.debug((self.uid, self.url))
self.content = self.get()
def get(self) -> bytes:
req = requests.get(self.url)
if req.status_code != 200:
logger.error(f"could not download cover. ({req.status_code}) {self.url}")
return b""
logger.debug("got cover")
return req.content
def save(self, path: str):
if not self.content:
logger.error("cover file content is empty")
return
file = f"{path}/cover.jpg"
if os.path.isfile(file):
logger.debug(f"cover already exists ({file})")
return
try:
with open(file, "wb") as f:
logger.debug(file)
f.write(self.content)
except FileNotFoundError as e:
logger.error(f"could not save cover. {file} -> {e}")
return stream_data, file_extension
+21
View File
@@ -0,0 +1,21 @@
class AuthError(Exception):
def __init__(
self, status: int, error: str, sub_status: str, error_description: str
):
self.status = status
self.error = error
self.sub_status = sub_status
self.error_description = error_description
def __str__(self):
return f"{self.status}: {self.error} - {self.error_description}"
class ApiError(Exception):
def __init__(self, status: int, subStatus: str, userMessage: str):
self.status = status
self.sub_status = subStatus
self.user_message = userMessage
def __str__(self):
return f"{self.user_message} ({self.status} - {self.sub_status})"
+100
View File
@@ -0,0 +1,100 @@
import logging
import requests
from pathlib import Path
from mutagen.flac import FLAC as MutagenFLAC, Picture
from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
from mutagen.mp4 import MP4Cover, MP4 as MutagenMP4
from tiddl.models.resource import Track
logger = logging.getLogger(__name__)
def addMetadata(track_path: Path, track: Track, cover_data=b""):
extension = track_path.suffix
if extension == ".flac":
metadata = MutagenFLAC(track_path)
if cover_data:
picture = Picture()
picture.data = cover_data
picture.mime = "image/jpeg"
metadata.add_picture(picture)
elif extension == ".m4a":
if cover_data:
metadata = MutagenMP4(track_path)
metadata["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
metadata.save(track_path)
metadata = MutagenEasyMP4(track_path)
else:
raise ValueError(f"Unknown file extension: {extension}")
new_metadata: dict[str, str] = {
"title": track.title,
"trackNumber": str(track.trackNumber),
"discnumber": str(track.volumeNumber),
"copyright": track.copyright,
"albumartist": track.artist.name if track.artist else "",
"artist": ";".join([artist.name.strip() for artist in track.artists]),
"album": track.album.title,
"date": str(track.streamStartDate) if track.streamStartDate else "",
}
metadata.update(new_metadata)
try:
metadata.save(track_path)
except Exception as e:
logger.error(f"Failed to set metadata for {extension}: {e}")
class Cover:
def __init__(self, uid: str, size=1280) -> None:
if size > 1280:
logger.warning(
f"can not set cover size higher than 1280 (user set: {size})"
)
size = 1280
self.uid = uid
formatted_uid = uid.replace("-", "/")
self.url = (
f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
)
logger.debug((self.uid, self.url))
self.content = self._get()
def _get(self) -> bytes:
req = requests.get(self.url)
if req.status_code != 200:
logger.error(f"could not download cover. ({req.status_code}) {self.url}")
return b""
logger.debug(f"got cover: {self.uid}")
return req.content
def save(self, directory_path: Path):
if not self.content:
logger.error("cover file content is empty")
return
file = directory_path / "cover.jpg"
if file.exists():
logger.debug(f"cover already exists ({file})")
return
try:
with file.open("wb") as f:
f.write(self.content)
except FileNotFoundError as e:
logger.error(f"could not save cover. {file} -> {e}")
View File
+137
View File
@@ -0,0 +1,137 @@
from pydantic import BaseModel
from typing import Optional, List, Literal, Union
from .resource import Album, Artist, Playlist, Track, TrackQuality, Video
__all__ = [
"SessionResponse",
"ArtistAlbumsItems",
"AlbumItems",
"PlaylistItems",
"Favorites",
"TrackStream",
"Search",
]
class SessionResponse(BaseModel):
class Client(BaseModel):
id: int
name: str
authorizedForOffline: bool
authorizedForOfflineDate: Optional[str]
sessionId: str
userId: int
countryCode: str
channelId: int
partnerId: int
client: Client
class Items(BaseModel):
limit: int
offset: int
totalNumberOfItems: int
class ArtistAlbumsItems(Items):
items: List[Album]
ItemType = Literal["track", "video"]
class AlbumItems(Items):
class VideoItem(BaseModel):
item: Video
type: ItemType = "video"
class TrackItem(BaseModel):
item: Track
type: ItemType = "track"
items: List[Union[TrackItem, VideoItem]]
class PlaylistItems(Items):
class PlaylistVideoItem(BaseModel):
class PlaylistVideo(Video):
dateAdded: str
index: int
itemUuid: str
item: PlaylistVideo
type: ItemType = "video"
cut: None
class PlaylistTrackItem(BaseModel):
class PlaylistTrack(Track):
dateAdded: str
index: int
itemUuid: str
item: PlaylistTrack
type: ItemType = "track"
cut: None
items: List[Union[PlaylistTrackItem, PlaylistVideoItem]]
class Favorites(BaseModel):
PLAYLIST: List[str]
ALBUM: List[str]
VIDEO: List[str]
TRACK: List[str]
ARTIST: List[str]
class TrackStream(BaseModel):
trackId: int
assetPresentation: Literal["FULL"]
audioMode: Literal["STEREO"]
audioQuality: TrackQuality
manifestMimeType: Literal[
"application/dash+xml", "application/vnd.tidal.bts"
]
manifestHash: str
manifest: str
albumReplayGain: float
albumPeakAmplitude: float
trackReplayGain: float
trackPeakAmplitude: float
bitDepth: Optional[int] = None
sampleRate: Optional[int] = None
class SearchAlbum(Album):
# TODO: remove the artist field instead of making it None
artist: None = None
class Search(BaseModel):
class Artists(Items):
items: List[Artist]
class Albums(Items):
items: List[SearchAlbum]
class Playlists(Items):
items: List[Playlist]
class Tracks(Items):
items: List[Track]
class Videos(Items):
items: List[Video]
class TopHit(BaseModel):
value: Union[Artist, Track, Playlist, SearchAlbum]
type: Literal["ARTISTS", "TRACKS", "PLAYLISTS", "ALBUMS"]
artists: Artists
albums: Albums
playlists: Playlists
tracks: Tracks
videos: Videos
topHit: TopHit
+6 -5
View File
@@ -1,7 +1,8 @@
from typing import TypedDict, Optional
from typing import Optional
from pydantic import BaseModel
class _User(TypedDict):
class AuthUser(BaseModel):
userId: int
email: str
countryCode: str
@@ -29,8 +30,8 @@ class _User(TypedDict):
newUser: bool
class AuthResponse(TypedDict):
user: _User
class AuthResponse(BaseModel):
user: AuthUser
scope: str
clientName: str
token_type: str
@@ -43,7 +44,7 @@ class AuthResponseWithRefresh(AuthResponse):
refresh_token: str
class AuthDeviceResponse(TypedDict):
class AuthDeviceResponse(BaseModel):
deviceCode: str
userCode: str
verificationUri: str
+13
View File
@@ -0,0 +1,13 @@
from typing import Literal
TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
TrackArg = Literal["low", "normal", "high", "master"]
ARG_TO_QUALITY: dict[TrackArg, TrackQuality] = {
"low": "LOW",
"normal": "HIGH",
"high": "LOSSLESS",
"master": "HI_RES_LOSSLESS",
}
QUALITY_TO_ARG = {v: k for k, v in ARG_TO_QUALITY.items()}
+191
View File
@@ -0,0 +1,191 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List, Literal, Dict
from .constants import TrackQuality
__all__ = ["Track", "Video", "Album", "Playlist", "Artist"]
class Track(BaseModel):
class Artist(BaseModel):
id: int
name: str
type: str
picture: Optional[str] = None
class Album(BaseModel):
id: int
title: str
cover: Optional[str] = None
vibrantColor: Optional[str] = None
videoCover: Optional[str] = None
id: int
title: str
duration: int
replayGain: float
peak: float
allowStreaming: bool
streamReady: bool
adSupportedStreamReady: bool
djReady: bool
stemReady: bool
streamStartDate: Optional[datetime] = None
premiumStreamingOnly: bool
trackNumber: int
volumeNumber: int
version: Optional[str] = None
popularity: int
copyright: str
bpm: Optional[int] = None
url: str
isrc: str
editable: bool
explicit: bool
audioQuality: TrackQuality
audioModes: List[str]
mediaMetadata: Dict[str, List[str]]
# for real, artist can be None?
artist: Optional[Artist] = None
artists: List[Artist]
album: Album
mixes: Dict[str, str]
class Video(BaseModel):
class Arist(BaseModel):
id: int
name: str
type: str
picture: Optional[str] = None
class Album(BaseModel):
id: int
title: str
cover: str
vibrantColor: str
videoCover: Optional[str] = None
id: int
title: str
volumeNumber: int
trackNumber: int
releaseDate: str
imagePath: Optional[str] = None
imageId: str
vibrantColor: str
duration: int
quality: str
streamReady: bool
adSupportedStreamReady: bool
djReady: bool
stemReady: bool
streamStartDate: str
allowStreaming: bool
explicit: bool
popularity: int
type: str
adsUrl: Optional[str] = None
adsPrePaywallOnly: bool
artist: Optional[Arist] = None
artists: List[Arist]
album: Optional[Album] = None
class Album(BaseModel):
class Artist(BaseModel):
id: int
name: str
type: Literal["MAIN", "FEATURED"]
picture: Optional[str] = None
class MediaMetadata(BaseModel):
tags: List[Literal["LOSSLESS", "HIRES_LOSSLESS", "DOLBY_ATMOS"]]
id: int
title: str
duration: int
streamReady: bool
adSupportedStreamReady: bool
djReady: bool
stemReady: bool
streamStartDate: Optional[datetime] = None
allowStreaming: bool
premiumStreamingOnly: bool
numberOfTracks: int
numberOfVideos: int
numberOfVolumes: int
releaseDate: str
copyright: str
type: str
version: Optional[str] = None
url: str
cover: Optional[str] = None
vibrantColor: Optional[str] = None
videoCover: Optional[str] = None
explicit: bool
upc: str
popularity: int
audioQuality: str
audioModes: List[str]
mediaMetadata: MediaMetadata
artist: Artist
artists: List[Artist]
class Playlist(BaseModel):
class Creator(BaseModel):
id: int
uuid: str
title: str
numberOfTracks: int
numberOfVideos: int
creator: Creator | Dict
description: Optional[str] = None
duration: int
lastUpdated: str
created: str
type: str
publicPlaylist: bool
url: str
image: Optional[str] = None
popularity: int
squareImage: str
promotedArtists: List[Album.Artist]
lastItemAddedAt: Optional[str] = None
class Artist(BaseModel):
class Role(BaseModel):
categoryId: int
category: Literal[
"Artist",
"Songwriter",
"Performer",
"Producer",
"Engineer",
"Production team",
"Misc",
]
class Mix(BaseModel):
ARTIST_MIX: str
MASTER_ARTIST_MIX: Optional[str] = None
id: int
name: str
artistTypes: Optional[List[Literal["ARTIST", "CONTRIBUTOR"]]] = None
url: Optional[str] = None
picture: Optional[str] = None
# only in search i guess
selectedAlbumCoverFallback: Optional[str] = None
popularity: Optional[int] = None
artistRoles: Optional[List[Role]] = None
mixes: Optional[Mix | Dict] = None
-112
View File
@@ -1,112 +0,0 @@
import os
import argparse
from .types import TRACK_QUALITY
from .types.track import TrackQuality
def shouldNotColor() -> bool:
# TODO: add more checks ✨
checks = ["NO_COLOR" in os.environ]
return any(checks)
parser = argparse.ArgumentParser(
description="\033[4mTIDDL\033[0m - Tidal Downloader",
epilog="options defaults will be fetched from your config file.",
)
parser.add_argument(
"input",
type=str,
nargs="*",
help="track, album, playlist or artist - must be url, single id will be treated as track",
)
parser.add_argument(
"-o",
type=str,
nargs="?",
const=True,
help="output file template, more info https://github.com/oskvr37/tiddl?tab=readme-ov-file#file-formatting",
dest="file_template",
)
parser.add_argument(
"-p",
type=str,
nargs="?",
const=True,
help="download destination path",
dest="download_path",
)
parser.add_argument(
"-e",
type=str,
nargs="?",
const=True,
help="choose file extension",
dest="file_extension",
)
QUALITY_ARGS: dict[str, TrackQuality] = {
details["arg"]: quality for quality, details in TRACK_QUALITY.items()
}
parser.add_argument(
"-q",
nargs="?",
help="track quality",
dest="quality",
choices=QUALITY_ARGS.keys(),
)
parser.add_argument(
"-is",
help="include artist EPs and singles when downloading artist",
dest="include_singles",
action="store_true",
)
parser.add_argument(
"-s",
help="save options to config // show config file",
dest="save_options",
action="store_true",
)
parser.add_argument(
"-i",
type=str,
nargs="?",
const=True,
help="choose a file with urls (.txt file separated with newlines or .json list)",
dest="input_file",
default="",
)
parser.add_argument(
"--no-skip",
help="dont skip already downloaded tracks",
action="store_true",
)
parser.add_argument(
"--silent",
help="silent mode",
action="store_true",
)
parser.add_argument(
"--verbose",
help="show debug logs",
action="store_true",
)
parser.add_argument(
"--no-color",
help="suppress output colors",
action="store_true",
default=shouldNotColor(),
)
-200
View File
@@ -1,200 +0,0 @@
import unittest
import subprocess
import shutil
from .utils import parseURL, formatFilename
from .types.track import Track
class TestUtils(unittest.TestCase):
def test_parseURL(self):
self.assertEqual(
parseURL("https://tidal.com/browse/track/284165609"), ("track", "284165609")
)
self.assertEqual(
parseURL("https://tidal.com/browse/track/284165609/"),
("track", "284165609"),
)
self.assertEqual(
parseURL("https://tidal.com/browse/track/284165609?u"),
("track", "284165609"),
)
self.assertEqual(
parseURL(
"https://listen.tidal.com/album/284165608/track/284165609",
),
("track", "284165609"),
)
self.assertEqual(
parseURL("https://listen.tidal.com/album/284165608"), ("album", "284165608")
)
self.assertEqual(
parseURL("https://tidal.com/browse/album/284165608"), ("album", "284165608")
)
self.assertEqual(
parseURL("https://tidal.com/browse/album/284165608?u"),
("album", "284165608"),
)
self.assertEqual(
parseURL("https://listen.tidal.com/artist/7695548"), ("artist", "7695548")
)
self.assertEqual(
parseURL("https://tidal.com/browse/artist/7695548"), ("artist", "7695548")
)
self.assertEqual(
parseURL(
"https://tidal.com/browse/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
),
("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
)
self.assertEqual(
parseURL(
"https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
),
("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
)
self.assertEqual(
parseURL(
"https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
),
("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
)
self.assertEqual(parseURL("track/284165609"), ("track", "284165609"))
self.assertEqual(
parseURL("playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
)
# we can also omit domain
self.assertEqual(
parseURL("playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
)
self.assertRaises(ValueError, parseURL, "")
def test_formatFilename(self):
track: Track = {
"id": 133017101,
"title": "HAUTE COUTURE",
"duration": 243,
"replayGain": -7.7,
"peak": 0.944031,
"allowStreaming": True,
"streamReady": True,
"adSupportedStreamReady": True,
"djReady": True,
"stemReady": False,
"streamStartDate": "2020-03-05T00:00:00.000+0000",
"premiumStreamingOnly": False,
"trackNumber": 1,
"volumeNumber": 1,
"version": None,
"popularity": 29,
"copyright": "2020 TUZZA Globale",
"bpm": None,
"url": "http://www.tidal.com/track/133017101",
"isrc": "PL70D1900060",
"editable": False,
"explicit": False,
"audioQuality": "LOSSLESS",
"audioModes": ["STEREO"],
"mediaMetadata": {"tags": ["LOSSLESS"]},
"artist": {
"id": 9550100,
"name": "Tuzza Globale",
"type": "MAIN",
"picture": "125c9343-3257-407a-8285-5e9f1d283a2e",
},
"artists": [
{
"id": 9550100,
"name": "Tuzza Globale",
"type": "MAIN",
"picture": "125c9343-3257-407a-8285-5e9f1d283a2e",
},
{
"id": 6847736,
"name": "Taco Hemingway",
"type": "FEATURED",
"picture": "7a1f5193-5d96-452c-b8dd-5ff0f81d5335",
},
],
"album": {
"id": 133017100,
"title": "HAUTE COUTURE",
"cover": "efd381c2-a982-4d09-bb15-da872006cadf",
"vibrantColor": "#f6a285",
"videoCover": None,
},
"mixes": {"TRACK_MIX": "001ec78dae0d4a470999adefffd570"},
"playlistNumber": None
}
self.assertEqual(formatFilename("{title}", track), ("", "HAUTE COUTURE"))
self.assertEqual(
formatFilename("{artist} - {title}", track),
("", "Tuzza Globale - HAUTE COUTURE"),
)
self.assertEqual(
formatFilename("{album} - {title}", track),
("", "HAUTE COUTURE - HAUTE COUTURE"),
)
self.assertEqual(
formatFilename("{number}. {title}", track), ("", "1. HAUTE COUTURE")
)
self.assertEqual(
formatFilename("{artists} - {title}", track),
("", "Tuzza Globale, Taco Hemingway - HAUTE COUTURE"),
)
self.assertEqual(
formatFilename("{id}", track),
("", "133017101"),
)
self.assertEqual(
formatFilename("{album}/{title}", track),
("HAUTE COUTURE", "HAUTE COUTURE"),
)
TRACK_ID = "284165609"
DOWNLOAD_DIR = "download_test"
class TestTiddl(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
shutil.rmtree(DOWNLOAD_DIR)
except FileNotFoundError:
pass
@classmethod
def tearDownClass(cls):
try:
shutil.rmtree(DOWNLOAD_DIR)
except FileNotFoundError:
pass
def test_noInput(self):
result = subprocess.run(["tiddl"])
self.assertEqual(result.returncode, 0)
def test_downloadTrack(self):
result = subprocess.run(["tiddl", TRACK_ID, "-p", DOWNLOAD_DIR])
self.assertEqual(result.returncode, 0)
def test_downloadTrackExists(self):
result = subprocess.run(["tiddl", TRACK_ID, "-p", DOWNLOAD_DIR])
self.assertEqual(result.returncode, 0)
if __name__ == "__main__":
unittest.main()
-24
View File
@@ -1,24 +0,0 @@
from typing import TypedDict, Literal
from .api import *
from .track import *
TrackArg = Literal["low", "normal", "high", "master"]
class QualityDetails(TypedDict):
name: str
details: str
arg: TrackArg
TRACK_QUALITY: dict[TrackQuality, QualityDetails] = {
"LOW": {"name": "Low", "details": "96 kbps", "arg": "low"},
"HIGH": {"name": "Low", "details": "320 kbps", "arg": "normal"},
"LOSSLESS": {"name": "High", "details": "16-bit, 44.1 kHz", "arg": "high"},
"HI_RES_LOSSLESS": {
"name": "Max",
"details": "Up to 24-bit, 192 kHz",
"arg": "master",
},
}
-119
View File
@@ -1,119 +0,0 @@
from typing import TypedDict, Optional, List, Literal
from .track import Track
class ErrorResponse(TypedDict):
status: int
subStatus: int
userMessage: str
class Client(TypedDict):
id: int
name: str
authorizedForOffline: bool
authorizedForOfflineDate: Optional[str]
class SessionResponse(TypedDict):
sessionId: str
userId: int
countryCode: str
channelId: int
partnerId: int
client: Client
class Items(TypedDict):
limit: int
offset: int
totalNumberOfItems: int
class ArtistAlbum(TypedDict):
id: int
name: str
type: Literal["MAIN"]
class Album(TypedDict):
id: int
title: str
duration: int
streamReady: bool
streamStartDate: str
allowStreaming: bool
premiumStreamingOnly: bool
numberOfTracks: int
numberOfVideos: int
numberOfVolumes: int
releaseDate: str
copyright: str
type: str
version: Optional[str]
url: str
cover: str
videoCover: Optional[str]
explicit: bool
upc: str
popularity: int
audioQuality: str
audioModes: List[str]
artist: ArtistAlbum
artists: List[ArtistAlbum]
class AristAlbumsItems(Items):
items: List[Album]
class _AlbumTrack(TypedDict):
item: Track
type: Literal["track"]
class AlbumItems(Items):
items: List[_AlbumTrack]
class _Creator(TypedDict):
id: int
class Playlist(TypedDict):
uuid: str
title: str
numberOfTracks: int
numberOfVideos: int
creator: _Creator
description: str
duration: int
lastUpdated: str
created: str
type: str
publicPlaylist: bool
url: str
image: str
popularity: int
squareImage: str
promotedArtists: List[ArtistAlbum]
lastItemAddedAt: str
class _PlaylistItem(TypedDict):
item: Track
type: Literal["track"]
cut: Literal[None]
class PlaylistItems(Items):
items: List[_PlaylistItem]
class Favorites(TypedDict):
PLAYLIST: List[str]
ALBUM: List[str]
VIDEO: List[str]
TRACK: List[str]
ARTIST: List[str]
-71
View File
@@ -1,71 +0,0 @@
from typing import TypedDict, Optional, List, Dict, Literal, Optional
TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
ManifestMimeType = Literal["application/dash+xml", "application/vnd.tidal.bts"]
class TrackStream(TypedDict):
trackId: int
assetPresentation: Literal["FULL"]
audioMode: Literal["STEREO"]
audioQuality: TrackQuality
manifestMimeType: ManifestMimeType
manifestHash: str
manifest: str
albumReplayGain: float
albumPeakAmplitude: float
trackReplayGain: float
trackPeakAmplitude: float
bitDepth: Optional[int]
sampleRate: Optional[int]
class _Artist(TypedDict):
id: int
name: str
type: str
picture: Optional[str]
class _Album(TypedDict):
id: int
title: str
cover: str
vibrantColor: str
videoCover: Optional[str]
class Track(TypedDict):
id: int
title: str
duration: int
replayGain: float
peak: float
allowStreaming: bool
streamReady: bool
adSupportedStreamReady: bool
djReady: bool
stemReady: bool
streamStartDate: str
premiumStreamingOnly: bool
trackNumber: int
volumeNumber: int
version: Optional[str]
popularity: int
copyright: str
bpm: Optional[int]
url: str
isrc: str
editable: bool
explicit: bool
audioQuality: str
audioModes: List[str]
mediaMetadata: Dict[str, List[str]]
artist: _Artist
artists: List[_Artist]
album: _Album
mixes: Dict[str, str]
# this is used only when downloading playlist
playlistNumber: Optional[int]
+93 -296
View File
@@ -1,315 +1,112 @@
import re
import os
import json
import logging
import subprocess
from datetime import datetime
from typing import TypedDict, Literal, List, get_args
from mutagen.flac import FLAC as MutagenFLAC, Picture
from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
from mutagen.mp4 import MP4Cover, MP4 as MutagenMP4
from pydantic import BaseModel
from urllib.parse import urlparse
from pathlib import Path
from .types.track import Track
from .config import HOME_DIRECTORY
from typing import Literal, get_args
RESOURCE = Literal["track", "album", "artist", "playlist"]
RESOURCE_LIST: List[RESOURCE] = list(get_args(RESOURCE))
from tiddl.models.constants import TrackQuality, QUALITY_TO_ARG
from tiddl.models.resource import Track
ResourceTypeLiteral = Literal["track", "album", "playlist", "artist"]
logger = logging.getLogger("utils")
def parseFileInput(file: str) -> list[str]:
_, file_extension = os.path.splitext(file)
logger.debug(file, file_extension)
urls_set: set[str] = set()
if file_extension == ".txt":
with open(file) as f:
data = f.read()
urls_set.update(data.splitlines())
elif file_extension == ".json":
with open(file) as f:
data = json.load(f)
urls_set.update(data)
else:
logger.warning(f"a file with '{file_extension}' extension is not supported!")
filtered_urls = [url for url in urls_set if type(url) == str]
return filtered_urls
def parseURL(url: str) -> tuple[RESOURCE, str]:
# remove trailing slash
url = url.rstrip("/")
# remove params
url = url.split("?")[0]
fragments = url.split("/")
if len(fragments) < 2:
raise ValueError(f"Invalid input: {url}")
parsed_type, parsed_id = fragments[-2], fragments[-1]
if parsed_type not in RESOURCE_LIST:
raise ValueError(f"Invalid resource type: {parsed_type} ({url})")
return parsed_type, parsed_id
class FormattedTrack(TypedDict):
class TidalResource(BaseModel):
type: ResourceTypeLiteral
id: str
title: str
number: str
disc_number: str
artist: str
album: str
artists: str
playlist: str
released: str
year: str
playlist_number: str
version: str
@property
def url(self) -> str:
return f"https://listen.tidal.com/{self.type}/{self.id}"
@classmethod
def fromString(cls, string: str):
"""
Extracts the resource type (e.g., "track", "album")
and resource ID from a given input string.
The input string can either be a full URL or a shorthand string
in the format `resource_type/resource_id` (e.g., `track/12345678`).
"""
path = urlparse(string).path
resource_type, resource_id = path.split("/")[-2:]
if resource_type not in get_args(ResourceTypeLiteral):
raise ValueError(f"Invalid resource type: {resource_type}")
if not resource_id.isdigit() and resource_type != "playlist":
raise ValueError(f"Invalid resource id: {resource_id}")
return cls(type=resource_type, id=resource_id) # type: ignore
def __str__(self) -> str:
return f"{self.type}/{self.id}"
def formatFilename(template: str, track: Track, playlist=""):
artists = [artist["name"].strip() for artist in track["artists"]]
def sanitizeString(string: str) -> str:
pattern = r'[\\/:"*?<>|]+'
return re.sub(pattern, "", string)
release_date = datetime.strptime(
track["streamStartDate"] or "1970-01-01T00:00:00.000+0000",
"%Y-%m-%dT%H:%M:%S.000+0000",
)
version = track.get("version", "")
def formatTrack(
template: str, track: Track, album_artist="", playlist_title="", playlist_index=0
) -> str:
artist = sanitizeString(track.artist.name) if track.artist else ""
features = [
sanitizeString(track_artist.name)
for track_artist in track.artists
if track_artist.name != artist
]
formatted_track: FormattedTrack = {
"album": re.sub(r'[<>:"|?*/\\]', "_", track["album"]["title"].strip()),
"artist": track["artist"]["name"].strip(),
"artists": ", ".join(artists),
"id": str(track["id"]),
"title": track["title"].strip(),
"number": str(track["trackNumber"]),
"disc_number": str(track["volumeNumber"]),
"playlist": playlist.strip(),
"released": release_date.strftime("%m-%d-%Y"),
"year": release_date.strftime("%Y"),
"playlist_number": str(track.get("playlistNumber", "")),
"version": f"({version})" if version else "",
track_dict = {
"id": str(track.id),
"title": sanitizeString(track.title),
"version": sanitizeString(track.version or ""),
"artist": artist,
"artists": ", ".join(features + [artist]),
"features": ", ".join(features),
"album": sanitizeString(track.album.title),
"number": track.trackNumber,
"disc": track.volumeNumber,
"date": (track.streamStartDate if track.streamStartDate else ""),
# i think we can remove year as we are able to format date
"year": track.streamStartDate.strftime("%Y") if track.streamStartDate else "",
"playlist": sanitizeString(playlist_title),
"bpm": track.bpm or "",
"quality": QUALITY_TO_ARG[track.audioQuality],
"album_artist": sanitizeString(album_artist),
"playlist_number": playlist_index or 0,
}
dirs = template.split("/")
filename = dirs.pop()
formatted_track = template.format(**track_dict)
formatted_filename = filename.format(**formatted_track)
formatted_dir = "/".join(dirs).format(**formatted_track)
disallowed_chars = r'[\\:"*?<>|]+'
invalid_chars = re.findall(disallowed_chars, formatted_track)
return sanitizeDirName(formatted_dir), sanitizeFileName(formatted_filename)
def sanitizeDirName(dir_name: str):
# replace invalid characters with an underscore
sanitized = re.sub(r'[<>:"|?*]', "_", dir_name)
# strip whitespace
sanitized = sanitized.strip()
return sanitized
def sanitizeFileName(file_name: str):
# replace invalid characters with an underscore
sanitized = re.sub(r'[<>:"|?*/\\]', "_", file_name)
# strip whitespace
sanitized = sanitized.strip()
return sanitized
def loadingSymbol(i: int, text: str):
symbols = ["", "", "", "", "", "", "", "", "", ""]
symbol = symbols[i % len(symbols)]
print(f"\r{text} {symbol}", end="\r")
def setMetadata(file: str, extension: str, track: Track, cover_data=b""):
if extension == "flac":
metadata = MutagenFLAC(file)
if cover_data:
picture = Picture()
picture.data = cover_data
picture.mime = "image/jpeg"
metadata.add_picture(picture)
elif extension == "m4a":
if cover_data:
metadata = MutagenMP4(file)
metadata["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
metadata.save(file)
metadata = MutagenEasyMP4(file)
else:
raise ValueError(f"Unknown file extension: {extension}")
new_metadata: dict[str, str] = {
"title": track["title"],
"trackNumber": str(track["trackNumber"]),
"discnumber": str(track["volumeNumber"]),
"copyright": track["copyright"],
"albumartist": track["artist"]["name"],
"artist": ";".join([artist["name"].strip() for artist in track["artists"]]),
"album": track["album"]["title"],
"date": track["streamStartDate"][:10],
}
metadata.update(new_metadata)
try:
metadata.save(file)
except Exception as e:
logger.error(f"Failed to set metadata for {extension}: {e}")
def convertFileExtension(source_path: str, file_extension: str, remove_source=True):
source_dir, source_extension = os.path.splitext(source_path)
dest_path = f"{source_dir}.{file_extension}"
logger.debug((source_path, source_dir, source_extension, dest_path))
if source_extension == f".{file_extension}":
return source_path
logger.debug(f"converting `{source_path}` to `{file_extension}`")
command = ["ffmpeg", "-i", source_path, dest_path]
result = subprocess.run(
command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
if result.returncode != 0:
logger.error(result.stderr)
return source_path
if remove_source:
os.remove(source_path)
return dest_path
class Colors:
def __init__(self, colored=True) -> None:
if colored:
self.BLACK = "\033[0;30m"
self.GRAY = "\033[1;30m"
self.RED = "\033[0;31m"
self.LIGHT_RED = "\033[1;31m"
self.GREEN = "\033[0;32m"
self.LIGHT_GREEN = "\033[1;32m"
self.YELLOW = "\033[0;33m"
self.LIGHT_YELLOW = "\033[1;33m"
self.BLUE = "\033[0;34m"
self.LIGHT_BLUE = "\033[1;34m"
self.PURPLE = "\033[0;35m"
self.LIGHT_PURPLE = "\033[1;35m"
self.CYAN = "\033[0;36m"
self.LIGHT_CYAN = "\033[1;36m"
self.LIGHT_GRAY = "\033[0;37m"
self.LIGHT_WHITE = "\033[1;37m"
self.RESET = "\033[0m"
self.BOLD = "\033[1m"
self.FAINT = "\033[2m"
self.ITALIC = "\033[3m"
self.UNDERLINE = "\033[4m"
self.BLINK = "\033[5m"
self.NEGATIVE = "\033[7m"
self.CROSSED = "\033[9m"
else:
self.BLACK = ""
self.GRAY = ""
self.RED = ""
self.LIGHT_RED = ""
self.GREEN = ""
self.LIGHT_GREEN = ""
self.YELLOW = ""
self.LIGHT_YELLOW = ""
self.BLUE = ""
self.LIGHT_BLUE = ""
self.PURPLE = ""
self.LIGHT_PURPLE = ""
self.CYAN = ""
self.LIGHT_CYAN = ""
self.LIGHT_GRAY = ""
self.LIGHT_WHITE = ""
self.RESET = ""
self.BOLD = ""
self.FAINT = ""
self.ITALIC = ""
self.UNDERLINE = ""
self.BLINK = ""
self.NEGATIVE = ""
self.CROSSED = ""
def initLogging(
silent: bool, verbose: bool, directory=HOME_DIRECTORY, colored_logging=True
):
c = Colors(colored_logging)
class StreamFormatter(logging.Formatter):
FORMATS = {
logging.DEBUG: f"{c.BLUE}[ %(name)s ] {c.CYAN}%(funcName)s {c.RESET}%(message)s",
logging.INFO: f"{c.GREEN}[ %(name)s ] {c.RESET}%(message)s",
logging.WARNING: f"{c.YELLOW}[ %(name)s ] {c.RESET}%(message)s",
logging.ERROR: f"{c.RED}[ %(name)s ] %(message)s",
logging.CRITICAL: f"{c.RED}[ %(name)s ] %(message)s",
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record) + c.RESET
stream_handler = logging.StreamHandler()
file_handler = logging.FileHandler(f"{directory}/tiddl.log", "a", "utf-8")
if silent:
log_level = logging.WARNING
elif verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
stream_handler.setLevel(log_level)
stream_handler.setFormatter(StreamFormatter())
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)s\t%(name)s.%(funcName)s %(message)s",
datefmt="%x %X",
if invalid_chars:
raise ValueError(
f"Template '{template}' and formatted track '{formatted_track}' contains disallowed characters: {' '.join(sorted(set(invalid_chars)))}"
)
)
# suppress logs from third-party libraries
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
return formatted_track
logging.basicConfig(
level=logging.DEBUG,
handlers=[file_handler, stream_handler],
)
def trackExists(
track_quality: TrackQuality, download_quality: TrackQuality, file_name: Path
):
"""
Predict track extension and check if track file exists.
"""
FLAC_QUALITIES: list[TrackQuality] = ["LOSSLESS", "HI_RES_LOSSLESS"]
if download_quality in FLAC_QUALITIES and track_quality in FLAC_QUALITIES:
extension = ".flac"
else:
extension = ".m4a"
full_file_name = file_name.with_suffix(extension)
return full_file_name.exists()