Compare commits

...

12 Commits

Author SHA1 Message Date
Oskar Dudziński 98ece44efe deps have versions 2025-11-06 19:42:26 +01:00
Oskar Dudziński a4e5685566 ignore uv.lock 2025-11-06 19:41:53 +01:00
Oskar Dudziński 90194e3337 ignore ruff cache 2025-11-06 19:24:09 +01:00
Oskar Dudziński 02a5837fc0 python 3.13 is now required 2025-11-06 19:07:25 +01:00
Oskar Dudziński 06d3e8f334 migrate to uv 2025-11-06 19:06:48 +01:00
Oskar Dudziński fd533be0cd 3.0 core and cli 2025-11-06 17:58:18 +01:00
Oskar Dudziński 1e96837b67 add example config to docs 2025-11-06 17:57:56 +01:00
Oskar Dudziński c956890f4e update pyproject 2025-11-06 17:57:31 +01:00
Oskar Dudziński 22f759302e remove outdated entries 2025-11-06 17:57:00 +01:00
Oskar Dudziński cb839afcf1 add tests 2025-11-06 17:56:20 +01:00
Oskar Dudziński f1cd54a461 add new examples 2025-11-06 17:56:09 +01:00
Oskar Dudziński 219e9102c2 remove old demo.gif 2025-11-06 17:56:04 +01:00
78 changed files with 3813 additions and 2114 deletions
+17 -31
View File
@@ -1,39 +1,25 @@
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: Upload Python Package
name: "Publish"
on:
release:
types: [published]
permissions:
contents: read
jobs:
deploy:
run:
runs-on: ubuntu-latest
environment:
name: pypi
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install build
- name: Build package
run: python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
- name: Checkout
uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v6
- name: Install Python 3.13
run: uv python install 3.13
- name: Build
run: uv build
- name: Publish
run: uv publish
+7 -5
View File
@@ -1,7 +1,3 @@
# TIDDL
tidal_download/
.tiddl_config.json
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -163,4 +159,10 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
#.idea/
# Ruff
.ruff_cache
# UV
uv.lock
+141
View File
@@ -0,0 +1,141 @@
# this is `config.toml` file, it is used to configure your tiddl app.
# if you don't create one on your machine, then app will use default settings.
# this file must be saved as `config.toml` at APP_PATH which by default is in your home directory.
# APP_PATH will be created when you install and run `tiddl` for the first time.
# Windows: C:/users/<your_username>/.tiddl
# Linux: ~/.tiddl
# you can set custom APP_PATH by setting environment variable: `TIDDL_PATH`.
# cache API requests, used for improving speed of Tidal endpoints calls, recommended to leave it true.
# most of endpoints are cached for 1 hour, then they are called again.
# database for cached data is located at APP_PATH with filename `api_cache.sqlite`.
# sometimes you can delete the database to purge the cache, when the database file size is too large
# or something just broke.
enable_cache = true
# debug option is used to save the calls of Tidal API endpoints
# to the `api_debug` directory at your APP_PATH.
# they are saved as directories to these endpoints with json data.
debug = false
[templates]
# read more about templates at: TODO add templating docs
# if you don't specify the template for a resource
# then default template will be used.
default = "{album.artist}/{album.title}/{item.title}"
# track = "tracks/{item.id}"
# video = "videos/{item.title}"
# album = "artists/{album.artist}/{album.title}/{item.title}"
# playlist = "{playlist.title}/{playlist.index}. {item.artist} - {item.title}"
# mix = "mixes/{mix_id}/{item.artist} - {item.title}"
[download]
# low - 96 kbps, m4a
# normal - 320 kbps, m4a
# high - 16 bit, 44.1 kHz, flac
# max - up to 24 bit, 192 kHz, flac
track_quality = "high"
# sd - 360p
# hd - 720p
# fhd - 1080p
video_quality = "fhd"
# will skip already downloaded files
skip_existing = true
# how many items will be downloaded at once, recommended to keep it low
threads_count = 4
# base download directory, by default it is set to your home directory / Music / tiddl
# download_path = ""
# if you moved the downloaded files to other directory,
# then you should specify the destination directory there.
# otherwise `tiddl` will not detect them and `skip_existing` will not skip
# already downloaded files. by default scan path is set to your download path.
# scan_path = ""
# this option is used to determine if you want to include downloading singles from an artist.
# "none" download only full albums
# "only" download only singles
# "include" download both singles and full albums
singles_filter = "none"
# "none" to disallow downloading videos (mostly from playlists)
# "only" to download only videos - will get all vids from playlists and from artists.
# "allow" to download tracks and videos
videos_filter = "none"
# update the modification time of an existing file when `skip_existing` is on.
# this option is useful for user to automatically detect old local files
# that have been removed from a Tidal collection.
update_mtime = false
# when enabled, it will write metadata to files that are already downloaded.
# could be useful when data on Tidal has changed.
rewrite_metadata = false
[metadata]
# embed metadata in files
enable = true
# embed lyrics in metadata
embed_lyrics = false
# embed track cover in the track file
cover = false
[cover]
# please don't confuse the cover from metadata with cover as a distinct file.
# save cover to distinct file, default false
save = false
# size of cover, default and max is 1280x1280
size = 1280
# you can allow saving covers for tracks, albums and playlists.
# note that playlists max size is 1080x1080
# (it will be set to proper size automatically)
# by default allowed is set to empty []
allowed = [
# "track",
# "album",
# "playlist"
]
[cover.templates]
# you must set path templates if you want to save cover files.
# you can access: {item}, {album}
# track = "tracks/{item.id}"
# you can access: {album}
# album = "albums/{album.artist} - {album.title}"
# you can access: {playlist}
# playlist = "playlists/{title}"
[m3u]
# m3u is a text file that holds data about playlists.
save = false
# "album", "mix", "playlist"
allowed = ["album", "mix", "playlist"]
[m3u.templates]
# additional template values:
# {type} - album/playlist/mix
album = "m3u/{type}/{album.artist} - {album.title}"
playlist = "m3u/{type}/{playlist.title}"
mix = "m3u/{type}/{now:%x}"
BIN
View File
Binary file not shown.

Before

Width:  |  Height:  |  Size: 120 KiB

+37
View File
@@ -0,0 +1,37 @@
from pathlib import Path
from tiddl.core.utils import get_track_stream_data
from tiddl.core.metadata import add_track_metadata
from tiddl.core.api.models import TrackQuality
# we reuse Tidal API from another example
from .fetch_api import api
# Congratulations by Post Malone
TRACK_ID = 77662595
QUALITY: TrackQuality = "LOSSLESS"
if __name__ == "__main__":
# fetch track_stream
track_stream = api.get_track_stream(TRACK_ID, QUALITY)
# download bytes to stream_data and get the file extension
stream_data, file_extension = get_track_stream_data(track_stream)
filename = f"{TRACK_ID}_{track_stream.audioQuality}"
# get file path that is located at our current directory
# with filename: TRACK_ID_QUALITY.EXTENSION
track_path = Path(filename).with_suffix(file_extension)
# write data from the track_stream to our file
track_path.write_bytes(stream_data)
# fetch some informations about our track like title etc.
track = api.get_track(TRACK_ID)
# add the metadata to our saved file.
# note that not every data is added such as cover or lyrics.
add_track_metadata(track_path, track)
# Congratulations if it works on your machine
+43
View File
@@ -0,0 +1,43 @@
from pathlib import Path
from tiddl.core.metadata import add_video_metadata
from tiddl.core.api.models.base import VideoQuality
from tiddl.core.utils import get_video_stream_data
from tiddl.core.utils.ffmpeg import convert_to_mp4, is_ffmpeg_installed
# we reuse Tidal API from another example
from .fetch_api import api
# Old Town Road by Lil Nas X
VIDEO_ID = 113483426
QUALITY: VideoQuality = "HIGH"
if __name__ == "__main__":
print("fetching video_stream")
video_stream = api.get_video_stream(video_id=VIDEO_ID, quality=QUALITY)
# download bytes to stream_data and get the file extension
print("downloading video_stream data")
stream_data = get_video_stream_data(video_stream)
filename = f"{VIDEO_ID}_{QUALITY}"
# get file path that is located at our current directory
video_path = Path(filename).with_suffix(".ts")
# write data from the video_stream to our file
print(f"saving to {video_path}")
video_path.write_bytes(stream_data)
if is_ffmpeg_installed():
# convert the file from .ts to .mp4
print("converting to mp4")
video_path = convert_to_mp4(video_path)
# fetch some informations about our video like title etc.
print("getting video metadata")
video = api.get_video(VIDEO_ID)
# add the metadata to our saved file.
print("saving metadata")
add_video_metadata(video_path, video)
+47
View File
@@ -0,0 +1,47 @@
from tiddl.core.api import TidalAPI, TidalClient
# we will utilize some functions from tiddl cli
# and use `APP_PATH` that is located at our /home_directory/.tiddl
from tiddl.cli.utils.auth import load_auth_data
from tiddl.cli.const import APP_PATH
# !! remember to be logged in, use `tiddl auth login`
# it will save auth token in /home_directory/.tiddl/auth.json
# in case your token expired, then use `tiddl auth refresh`
# load our token, country code and user id from file
auth_data = load_auth_data()
# we make sure auth_data is not empty = we are logged in
assert auth_data.token
assert auth_data.country_code
assert auth_data.user_id
# we create Client for our API.
# this is custom client that can cache requests
# to make the API more efficient
client = TidalClient(
token=auth_data.token,
cache_name=APP_PATH / "api_cache", # path to cache api requests
debug_path=APP_PATH / "api_debug", # optional, used for debugging api
)
# this is our Tidal API that will call the endpoints
api = TidalAPI(
client,
country_code=auth_data.country_code,
user_id=auth_data.user_id,
)
if __name__ == "__main__":
# make the API call
session = api.get_session()
# every data from the api is `pydantic` model
print(f"session id: {session.sessionId}")
# see every available endpoint at `tiddl.core.api`
+26
View File
@@ -0,0 +1,26 @@
from tiddl.core.utils.format import format_template
# we reuse Tidal API from another example
from .fetch_api import api
ALBUM_ID = 465173294
if __name__ == "__main__":
album = api.get_album(ALBUM_ID)
album_items = api.get_album_items(ALBUM_ID)
TEMPLATE = "{album.artists}/{album.title}, {album.date:%Y}/{item.number:02d}. {item.artists} - {item.title} ({custom_field})"
for album_item in album_items.items:
track = album_item.item
print(
format_template(
template=TEMPLATE,
item=track,
album=album,
with_asterisk_ext=False,
custom_field="custom_field",
)
)
+20 -11
View File
@@ -1,13 +1,13 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "tiddl"
version = "2.8.0"
version = "3.0.0a1"
description = "Download Tidal tracks with CLI downloader."
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.13"
authors = [{ name = "oskvr37" }]
classifiers = [
"Environment :: Console",
@@ -15,18 +15,27 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"pydantic>=2.9.2",
"requests>=2.20.0",
"requests-cache>=1.2.1",
"click>=8.1.7",
"mutagen>=1.47.0",
"ffmpeg-asyncio>=0.1.3",
"aiofiles>=25.1.0",
"aiohttp>=3.13.2",
"m3u8>=6.0.0",
"rich>=13.9.4"
"mutagen>=1.47.0",
"pydantic>=2.12.4",
"requests>=2.32.5",
"requests-cache>=1.2.1",
"typer>=0.20.0",
]
[project.urls]
homepage = "https://github.com/oskvr37/tiddl"
[project.scripts]
tiddl = "tiddl.cli:cli"
tiddl = "tiddl.cli.app:app"
[tool.coverage.run]
omit = ["*/models/*", "*/models.py"]
[dependency-groups]
dev = [
"pytest>=8.4.2",
"pytest-mock>=3.15.1",
]
+200
View File
@@ -0,0 +1,200 @@
import pytest
from unittest.mock import patch, MagicMock
from time import time
from typer.testing import CliRunner
from tiddl.core.auth import AuthClientError
from tiddl.cli.commands.auth import auth_command
from tiddl.cli.utils.auth import AuthData
runner = CliRunner()
def test_login_already_logged(monkeypatch: pytest.MonkeyPatch):
"""Should exit early if user is logged in."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token="token")
)
result = runner.invoke(auth_command, ["login"])
assert "Already logged in." in result.stdout
assert result.exit_code == 0
def test_login_success(monkeypatch: pytest.MonkeyPatch):
"""Should save user token."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token=None)
)
device_auth_mock = MagicMock()
device_auth_mock.verificationUriComplete = "verify.uri"
device_auth_mock.deviceCode = "device123"
device_auth_mock.expiresIn = 60
device_auth_mock.interval = 1
auth_mock = MagicMock()
auth_mock.access_token = "newtoken"
auth_mock.refresh_token = "refreshtoken"
auth_mock.expires_in = 3600
auth_mock.user_id = 123
auth_mock.user.countryCode = "US"
with (
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
patch("tiddl.cli.commands.auth.typer.launch") as mock_launch,
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
patch("tiddl.cli.commands.auth.time", side_effect=lambda: 1000),
patch("tiddl.cli.commands.auth.sleep"),
):
auth_api = MockAuthAPI.return_value
auth_api.get_device_auth.return_value = device_auth_mock
auth_api.get_auth.side_effect = [
AuthClientError(error="authorization_pending"),
auth_mock,
]
result = runner.invoke(auth_command, ["login"])
auth_api.get_device_auth.assert_called_once()
auth_api.get_auth.assert_called()
mock_launch.assert_called_once_with("https://verify.uri")
mock_save.assert_called_once()
saved_data = mock_save.call_args[0][0]
assert saved_data.token == "newtoken"
assert "Logged in!" in result.stdout
assert result.exit_code == 0
def test_login_expired(monkeypatch: pytest.MonkeyPatch):
"""Should not save token and exit."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token=None)
)
device_auth_mock = MagicMock()
device_auth_mock.verificationUriComplete = "verify.uri"
device_auth_mock.deviceCode = "device123"
device_auth_mock.expiresIn = 60
device_auth_mock.interval = 1
with (
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
patch("tiddl.cli.commands.auth.typer.launch") as mock_launch,
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
patch("tiddl.cli.commands.auth.time", side_effect=lambda: 1000),
patch("tiddl.cli.commands.auth.sleep"),
):
auth_api = MockAuthAPI.return_value
auth_api.get_device_auth.return_value = device_auth_mock
auth_api.get_auth.side_effect = [
AuthClientError(error="expired_token"),
]
result = runner.invoke(auth_command, ["login"])
auth_api.get_device_auth.assert_called_once()
auth_api.get_auth.assert_called()
mock_launch.assert_called_once_with("https://verify.uri")
mock_save.assert_not_called()
assert "Time for authentication has expired." in result.stdout
assert result.exit_code == 0
def test_logout_with_token(monkeypatch: pytest.MonkeyPatch):
"""Should clear auth data and logout token in API."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token="token")
)
with (
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
):
mock_api_instance = MockAuthAPI.return_value
result = runner.invoke(auth_command, ["logout"])
mock_api_instance.logout_token.assert_called_once_with("token")
mock_save.assert_called_once_with(AuthData())
assert "Logged out!" in result.stdout
assert result.exit_code == 0
def test_logout_no_token(monkeypatch: pytest.MonkeyPatch):
"""Should only clear auth data."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token=None)
)
with (
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
):
result = runner.invoke(auth_command, ["logout"])
mock_save.assert_called_once_with(AuthData())
MockAuthAPI.assert_not_called()
assert "Logged out!" in result.stdout
assert result.exit_code == 0
def test_refresh_not_logged_in(monkeypatch: pytest.MonkeyPatch):
"""Should exit early if refresh_token is missing."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(refresh_token=None)
)
result = runner.invoke(auth_command, ["refresh"])
assert "Not logged in." in result.stdout
assert result.exit_code == 0
def test_refresh_not_expired(monkeypatch: pytest.MonkeyPatch):
"""Should exit early if token still valid."""
monkeypatch.setattr(
"tiddl.cli.commands.auth.load_auth_data",
lambda: AuthData(
token="abc", refresh_token="ref", expires_at=int(time()) + 3600
),
)
result = runner.invoke(auth_command, ["refresh"])
assert "Auth token expires in" in result.stdout
assert result.exit_code == 0
def test_refresh_success(monkeypatch: pytest.MonkeyPatch):
"""Should refresh token if expired."""
expired_data = AuthData(
token="oldtoken", refresh_token="refreshtoken", expires_at=0
)
monkeypatch.setattr("tiddl.cli.commands.auth.load_auth_data", lambda: expired_data)
mock_auth_response = MagicMock()
mock_auth_response.access_token = "newtoken"
with (
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
):
MockAuthAPI.return_value.refresh_token.return_value = mock_auth_response
result = runner.invoke(auth_command, ["refresh"])
mock_save.assert_called_once_with(expired_data)
assert "Auth token has been refreshed!" in result.stdout
assert result.exit_code == 0
@@ -0,0 +1,41 @@
from pathlib import Path
from tiddl.cli.utils.auth.core import load_auth_data, save_auth_data
from tiddl.cli.utils.auth.models import AuthData
def test_load_auth_data(tmp_path: Path):
file = tmp_path / "auth.json"
auth_data = AuthData(
token="token",
refresh_token="refresh_token",
expires_at=0,
user_id="user_id",
country_code="country_code",
)
file.write_text(auth_data.model_dump_json())
loaded_auth_data = load_auth_data(file)
assert isinstance(loaded_auth_data, AuthData)
assert loaded_auth_data.__dict__ == auth_data.__dict__
def test_save_auth_data(tmp_path: Path):
file = tmp_path / "auth.json"
auth_data = AuthData(
token="token",
refresh_token="refresh_token",
expires_at=0,
user_id="user_id",
country_code="country_code",
)
save_auth_data(auth_data=auth_data, file=file)
loaded_auth_data = AuthData.model_validate_json(file.read_text())
assert loaded_auth_data.__dict__ == auth_data.__dict__
+13
View File
@@ -0,0 +1,13 @@
import typer
from tiddl.cli.commands import register_commands, COMMANDS
def test_register_commands_adds_typers():
app = typer.Typer()
register_commands(app)
registered_names = [cmd.name for cmd in app.registered_groups + app.registered_commands]
for command in COMMANDS:
assert command.info.name in registered_names
+63
View File
@@ -0,0 +1,63 @@
from pathlib import Path
from pytest import raises
from tiddl.cli.config import load_config_file, Config, CONFIG_FILENAME
def write_config(tmp_path: Path, content: str) -> Path:
cfg_path = tmp_path / CONFIG_FILENAME
cfg_path.write_text(content)
return cfg_path
def test_missing_file_default_config(tmp_path: Path):
cfg_file = tmp_path / "nonexistent.toml"
cfg = load_config_file(cfg_file)
assert isinstance(cfg, Config)
def test_valid_config_file(tmp_path: Path):
cfg_file = write_config(
tmp_path,
"""
enable_cache = false
debug = true
[download]
track_quality = "max"
threads_count = 8
""",
)
cfg = load_config_file(cfg_file)
assert cfg.enable_cache is False
assert cfg.debug is True
assert cfg.download.track_quality == "max"
assert cfg.download.threads_count == 8
def test_invalid_type_raises(tmp_path: Path):
cfg_file = write_config(
tmp_path,
"""
enable_cache = "not_a_bool"
""",
)
with raises(Exception):
load_config_file(cfg_file)
def test_invalid_track_quality_raises(tmp_path: Path):
cfg_file = write_config(
tmp_path,
"""
[download]
track_quality = "ultra"
""",
)
with raises(Exception):
load_config_file(cfg_file)
+20
View File
@@ -0,0 +1,20 @@
import pytest
from pathlib import Path
from tiddl.cli.const import get_app_path, APP_DIR_NAME, ENV_KEY
def test_env_key_overrides(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
custom_path = tmp_path / "customdir"
monkeypatch.setenv(ENV_KEY, str(custom_path))
app_path = get_app_path(ENV_KEY)
assert app_path == custom_path
def test_default_path_if_unset(monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv(ENV_KEY, raising=False)
app_path = get_app_path(ENV_KEY)
assert str(Path.home()) in str(app_path)
assert app_path.name == APP_DIR_NAME
+68
View File
@@ -0,0 +1,68 @@
import pytest
from tiddl.cli.utils.resource import TidalResource, ResourceTypeLiteral
valid_test_data = [
("track", "12345"),
("album", "98765"),
("video", "11111"),
("artist", "22222"),
("playlist", "abcde"),
("mix", "xyz123"),
]
@pytest.mark.parametrize("resource_type, resource_id", valid_test_data)
def test_tidalresource_from_string_shorthand(
resource_type: ResourceTypeLiteral, resource_id: str
):
string = f"{resource_type}/{resource_id}"
res = TidalResource.from_string(string)
assert res.type == resource_type
assert res.id == resource_id
assert str(res) == string
assert res.url == f"https://listen.tidal.com/{resource_type}/{resource_id}"
@pytest.mark.parametrize("resource_type, resource_id", valid_test_data)
def test_tidalresource_from_string_url(
resource_type: ResourceTypeLiteral, resource_id: str
):
url = f"https://listen.tidal.com/{resource_type}/{resource_id}"
res = TidalResource.from_string(url)
assert res.type == resource_type
assert res.id == resource_id
assert str(res) == f"{resource_type}/{resource_id}"
assert res.url == url
def test_from_string_invalid_type():
with pytest.raises(ValueError, match="Invalid resource type"):
TidalResource.from_string("invalid/123")
invalid_test_data = [
("track", "abc"),
("album", "xyz"),
("video", "id123"),
("artist", "user1"),
]
@pytest.mark.parametrize("resource_type, invalid_id", invalid_test_data)
def test_from_string_invalid_digit_id(
resource_type: ResourceTypeLiteral, invalid_id: str
):
with pytest.raises(ValueError, match="Invalid resource id"):
TidalResource.from_string(f"{resource_type}/{invalid_id}")
def test_url_property():
res = TidalResource(type="track", id="12345")
assert res.url == "https://listen.tidal.com/track/12345"
def test_str_method():
res = TidalResource(type="album", id="67890")
assert str(res) == "album/67890"
+206
View File
@@ -0,0 +1,206 @@
import pytest
from pytest_mock import MockerFixture, MockType
from tiddl.core.api.api import (
TidalAPI,
TidalClient,
Limits,
DO_NOT_CACHE,
EXPIRE_IMMEDIATELY,
)
from tiddl.core.api.models import (
Album,
Artist,
Playlist,
Track,
Video,
AlbumItems,
AlbumItemsCredits,
ArtistAlbumsItems,
Favorites,
TrackLyrics,
PlaylistItems,
MixItems,
Search,
SessionResponse,
TrackStream,
VideoStream,
)
def test_tidal_api_init(mocker: MockerFixture):
mock_client = mocker.Mock(spec=TidalClient)
api = TidalAPI(client=mock_client, user_id="u123", country_code="US")
assert api.client is mock_client
assert api.user_id == "u123"
assert api.country_code == "US"
@pytest.fixture
def mock_client(mocker: MockerFixture):
return mocker.Mock(spec=TidalClient)
@pytest.fixture
def api(mock_client: MockType):
return TidalAPI(client=mock_client, user_id="u123", country_code="US")
def test_get_album(api: TidalAPI, mock_client: MockType):
api.get_album(album_id=1)
mock_client.fetch.assert_called_once_with(
Album, "albums/1", {"countryCode": "US"}, expire_after=3600
)
def test_get_album_items(api: TidalAPI, mock_client: MockType):
api.get_album_items(1)
mock_client.fetch.assert_called_once_with(
AlbumItems,
"albums/1/items",
{"countryCode": "US", "limit": Limits.ALBUM_ITEMS, "offset": 0},
expire_after=3600,
)
def test_get_album_items_credits(api: TidalAPI, mock_client: MockType):
api.get_album_items_credits(1)
mock_client.fetch.assert_called_once_with(
AlbumItemsCredits,
"albums/1/items/credits",
{"countryCode": "US", "limit": Limits.ALBUM_ITEMS, "offset": 0},
expire_after=3600,
)
def test_get_artist(api: TidalAPI, mock_client: MockType):
api.get_artist(1)
mock_client.fetch.assert_called_once_with(
Artist, "artists/1", {"countryCode": "US"}, expire_after=3600
)
def test_get_artist_albums(api: TidalAPI, mock_client: MockType):
api.get_artist_albums(1)
mock_client.fetch.assert_called_once_with(
ArtistAlbumsItems,
"artists/1/albums",
{
"countryCode": "US",
"limit": Limits.ARTIST_ALBUMS,
"offset": 0,
"filter": "ALBUMS",
},
expire_after=3600,
)
def test_get_mix(api: TidalAPI, mock_client: MockType):
api.get_mix_items("abcd-1234")
mock_client.fetch.assert_called_once_with(
MixItems,
"mixes/abcd-1234/items",
{"countryCode": "US", "limit": Limits.MIX_ITEMS, "offset": 0},
expire_after=3600,
)
def test_get_favorites(api: TidalAPI, mock_client: MockType):
api.get_favorites()
mock_client.fetch.assert_called_once_with(
Favorites,
"users/u123/favorites/ids",
{"countryCode": "US"},
expire_after=EXPIRE_IMMEDIATELY,
)
def test_get_playlist(api: TidalAPI, mock_client: MockType):
api.get_playlist("uuid")
mock_client.fetch.assert_called_once_with(
Playlist,
"playlists/uuid",
{"countryCode": "US"},
expire_after=EXPIRE_IMMEDIATELY,
)
def test_get_playlist_items(api: TidalAPI, mock_client: MockType):
api.get_playlist_items("uuid")
mock_client.fetch.assert_called_once_with(
PlaylistItems,
"playlists/uuid/items",
{"countryCode": "US", "limit": Limits.PLAYLIST_ITEMS, "offset": 0},
expire_after=EXPIRE_IMMEDIATELY,
)
def test_get_search(api: TidalAPI, mock_client: MockType):
api.get_search("query")
mock_client.fetch.assert_called_once_with(
Search,
"search",
{"countryCode": "US", "query": "query"},
expire_after=DO_NOT_CACHE,
)
def test_get_session(api: TidalAPI, mock_client: MockType):
api.get_session()
mock_client.fetch.assert_called_once_with(
SessionResponse, "sessions", expire_after=DO_NOT_CACHE
)
def test_get_track_lyrics(api: TidalAPI, mock_client: MockType):
api.get_track_lyrics(1)
mock_client.fetch.assert_called_once_with(
TrackLyrics,
"tracks/1/lyrics",
{"countryCode": "US"},
expire_after=3600,
)
def test_get_track(api: TidalAPI, mock_client: MockType):
api.get_track(1)
mock_client.fetch.assert_called_once_with(
Track,
"tracks/1",
{"countryCode": "US"},
expire_after=3600,
)
def test_get_track_stream(api: TidalAPI, mock_client: MockType):
api.get_track_stream(1, "HIGH")
mock_client.fetch.assert_called_once_with(
TrackStream,
"tracks/1/playbackinfopostpaywall",
{"audioquality": "HIGH", "playbackmode": "STREAM", "assetpresentation": "FULL"},
expire_after=DO_NOT_CACHE,
)
def test_get_video(api: TidalAPI, mock_client: MockType):
api.get_video(1)
mock_client.fetch.assert_called_once_with(
Video,
"videos/1",
{"countryCode": "US"},
expire_after=3600,
)
def test_get_video_stream(api: TidalAPI, mock_client: MockType):
api.get_video_stream(1, "HIGH")
mock_client.fetch.assert_called_once_with(
VideoStream,
"videos/1/playbackinfopostpaywall",
{"videoquality": "HIGH", "playbackmode": "STREAM", "assetpresentation": "FULL"},
expire_after=DO_NOT_CACHE,
)
+93
View File
@@ -0,0 +1,93 @@
import pytest
import json
from pydantic import BaseModel
from pytest_mock import MockerFixture
from pathlib import Path
from tiddl.core.api.client import TidalClient, ApiError
def test_tidal_client_init(mocker: MockerFixture):
mock_cached_session = mocker.patch("tiddl.core.api.client.CachedSession")
mock_session = mock_cached_session.return_value
client = TidalClient(
token="test-token",
cache_name="test_cache",
omit_cache=True,
debug_path=Path("/tmp/debug"),
)
mock_cached_session.assert_called_once_with(
cache_name="test_cache", always_revalidate=True
)
assert client.token == "test-token"
assert client.debug_path == Path("/tmp/debug")
assert client.session is mock_session
assert mock_session.headers["Authorization"] == "Bearer test-token"
assert mock_session.headers["Accept"] == "application/json"
@pytest.mark.parametrize("omit_cache", [True, False])
def test_omit_cache_flag(mocker: MockerFixture, omit_cache: bool):
mock_cached_session = mocker.patch("tiddl.core.api.client.CachedSession")
TidalClient("token", "cache", omit_cache=omit_cache)
mock_cached_session.assert_called_once_with(
cache_name="cache", always_revalidate=omit_cache
)
class DummyModel(BaseModel):
foo: str
def test_fetch_success(mocker: MockerFixture, tmp_path: Path):
mock_session = mocker.Mock()
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.from_cache = False
mock_response.json.return_value = {"foo": "bar"}
mock_session.get.return_value = mock_response
mocker.patch("tiddl.core.api.client.API_URL", "https://api.test")
client = TidalClient("token", tmp_path / "cache", debug_path=tmp_path)
client.session = mock_session
result = client.fetch(DummyModel, "albums/123", {"limit": 10}, expire_after=999)
assert result.foo == "bar"
mock_session.get.assert_called_once_with(
"https://api.test/albums/123",
params={"limit": 10},
expire_after=999,
)
debug_file = tmp_path / "albums/123.json"
assert debug_file.exists()
content = json.loads(debug_file.read_text())
assert content["status_code"] == 200
assert content["endpoint"] == "albums/123"
assert content["params"]["limit"] == 10
assert content["data"]["foo"] == "bar"
def test_fetch_error_raises_api_error(mocker: MockerFixture, tmp_path: Path):
mock_session = mocker.Mock()
mock_response = mocker.Mock()
mock_response.status_code = 400
mock_response.from_cache = False
mock_response.json.return_value = {
"status": 400,
"subStatus": "Bad request",
"userMessage": "user_message",
}
mock_session.get.return_value = mock_response
client = TidalClient("token", tmp_path / "cache")
client.session = mock_session
with pytest.raises(ApiError):
client.fetch(DummyModel, "bad/endpoint")
+38
View File
@@ -0,0 +1,38 @@
import pytest
from typing import Any
from tiddl.core.api.exceptions import ApiError
def test_api_error_attributes():
data: dict[str, Any] = {
"status": 1,
"subStatus": "sub_status",
"userMessage": "user_message",
}
e = ApiError(**data)
assert isinstance(e, Exception)
assert e.status == data["status"]
assert e.sub_status == data["subStatus"]
assert e.user_message == data["userMessage"]
def test_api_error_raises():
with pytest.raises(ApiError) as exc:
raise ApiError(400, "bad_request", "invalid")
assert exc.value.status == 400
assert exc.value.sub_status == "bad_request"
def test_api_error_string():
data: dict[str, Any] = {
"status": 1,
"subStatus": "sub_status",
"userMessage": "user_message",
}
e = ApiError(**data)
assert str(e) == f"{e.user_message}, {e.status}/{e.sub_status}"
+105
View File
@@ -0,0 +1,105 @@
from typing import Any
import pytest
from pytest_mock import MockerFixture
from tiddl.core.auth.api import AuthAPI
from tiddl.core.auth.models import (
AuthDeviceResponse,
AuthResponseWithRefresh,
AuthResponse,
)
@pytest.fixture
def mock_auth_client(mocker: MockerFixture) -> Any:
client = mocker.Mock()
client.get_device_auth.return_value = {
"deviceCode": "abc",
"userCode": "123",
"verificationUri": "https://verify",
"verificationUriComplete": "https://verify?code=123",
"expiresIn": 300,
"interval": 5,
}
user_data: dict[str, Any] = {
"userId": 1,
"email": "test@example.com",
"countryCode": "US",
"fullName": None,
"firstName": None,
"lastName": None,
"nickname": None,
"username": "tester",
"address": None,
"city": None,
"postalcode": None,
"usState": None,
"phoneNumber": None,
"birthday": None,
"channelId": 0,
"parentId": 0,
"acceptedEULA": True,
"created": 0,
"updated": 0,
"facebookUid": 0,
"appleUid": None,
"googleUid": None,
"accountLinkCreated": True,
"emailVerified": True,
"newUser": True,
}
auth_base: dict[str, Any] = {
"access_token": "token123",
"refresh_token": "refresh123",
"expires_in": 3600,
"user_id": 1,
"scope": "r_usr",
"clientName": "tidal",
"token_type": "Bearer",
"user": user_data,
}
client.get_auth.return_value = auth_base.copy()
client.refresh_token.return_value = auth_base.copy()
client.logout_token.return_value = None
return client
def test_get_device_auth_returns_model(mock_auth_client: Any) -> None:
api: AuthAPI = AuthAPI(client=mock_auth_client)
result: AuthDeviceResponse = api.get_device_auth()
mock_auth_client.get_device_auth.assert_called_once()
assert isinstance(result, AuthDeviceResponse)
assert result.deviceCode == "abc"
assert result.interval == 5
def test_get_auth_returns_model(mock_auth_client: Any) -> None:
api: AuthAPI = AuthAPI(client=mock_auth_client)
result: AuthResponseWithRefresh = api.get_auth("device123")
mock_auth_client.get_auth.assert_called_once_with("device123")
assert isinstance(result, AuthResponseWithRefresh)
assert result.access_token == "token123"
assert result.refresh_token == "refresh123"
assert result.user.userId == 1
def test_refresh_token_returns_model(mock_auth_client: Any) -> None:
api: AuthAPI = AuthAPI(client=mock_auth_client)
result: AuthResponse = api.refresh_token("refresh123")
mock_auth_client.refresh_token.assert_called_once_with("refresh123")
assert isinstance(result, AuthResponse)
assert result.access_token == "token123"
def test_logout_token_calls_client(mock_auth_client: Any) -> None:
api: AuthAPI = AuthAPI(client=mock_auth_client)
api.logout_token("token123")
mock_auth_client.logout_token.assert_called_once_with("token123")
+128
View File
@@ -0,0 +1,128 @@
import pytest
from pytest_mock import MockerFixture
from tiddl.core.auth.client import AuthClient
from tiddl.core.auth.exceptions import AuthClientError
def test_get_device_auth_calls_request(mocker: MockerFixture):
mock_request = mocker.patch("tiddl.core.auth.client.request")
data = {"device_code": "abc"}
mock_response = mocker.Mock()
mock_response.json.return_value = data
mock_request.return_value = mock_response
client = AuthClient()
result = client.get_device_auth()
mock_request.assert_called_once_with(
"POST",
"https://auth.tidal.com/v1/oauth2/device_authorization",
data={"client_id": client.client_id, "scope": "r_usr+w_usr+w_sub"},
)
assert result == data
def test_get_auth_returns_json_on_200(mocker: MockerFixture):
mock_request = mocker.patch("tiddl.core.auth.client.request")
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"access_token": "token123",
"refresh_token": "refresh123",
"expires_in": 3600,
}
mock_request.return_value = mock_response
client = AuthClient()
result = client.get_auth("device123")
assert result["access_token"] == "token123"
assert result["refresh_token"] == "refresh123"
assert result["expires_in"] == 3600
mock_request.assert_called_once_with(
"POST",
"https://auth.tidal.com/v1/oauth2/token",
data={
"client_id": client.client_id,
"device_code": "device123",
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
auth=(client.client_id, client.client_secret),
)
def test_get_auth_raises_on_non_200(mocker: MockerFixture):
mock_request = mocker.patch("tiddl.core.auth.client.request")
mock_response = mocker.Mock()
mock_response.status_code = 400
mock_response.json.return_value = {
"error": "error",
"status": 400,
"sub_status": 1001,
"error_description": "invalid",
}
mock_request.return_value = mock_response
client = AuthClient()
with pytest.raises(AuthClientError):
client.get_auth("device123")
mock_request.assert_called_once_with(
"POST",
"https://auth.tidal.com/v1/oauth2/token",
data={
"client_id": client.client_id,
"device_code": "device123",
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
auth=(client.client_id, client.client_secret),
)
def test_refresh_token(mocker: MockerFixture):
mock_request = mocker.patch("tiddl.core.auth.client.request")
mock_response = mocker.Mock()
mock_response.status_code = 400
mock_response.json.return_value = {
"token": "abc",
}
mock_request.return_value = mock_response
refresh_token = "token"
client = AuthClient()
result = client.refresh_token(refresh_token)
mock_request.assert_called_once_with(
"POST",
"https://auth.tidal.com/v1/oauth2/token",
data={
"client_id": client.client_id,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"scope": "r_usr+w_usr+w_sub",
},
auth=(client.client_id, client.client_secret),
)
assert result["token"] == "abc"
def test_logout_token(mocker: MockerFixture):
mock_request = mocker.patch("tiddl.core.auth.client.request")
client = AuthClient()
client.logout_token("token")
mock_request.assert_called_once_with(
"POST",
"https://api.tidal.com/v1/logout",
headers={"authorization": "Bearer token"},
)
+41
View File
@@ -0,0 +1,41 @@
import pytest
from typing import Any
from tiddl.core.auth.exceptions import AuthClientError
def test_auth_client_error_attributes():
data: dict[str, Any] = {
"status": 1,
"error": "error",
"sub_status": "sub_status",
"error_description": "error_description",
}
e = AuthClientError(**data)
assert isinstance(e, Exception)
assert e.status == data["status"]
assert e.error == data["error"]
assert e.sub_status == data["sub_status"]
assert e.error_description == data["error_description"]
def test_auth_client_error_raises():
with pytest.raises(AuthClientError) as exc:
raise AuthClientError(400, "bad_request", "invalid", "Malformed input")
assert exc.value.status == 400
assert exc.value.error == "bad_request"
def test_auth_client_error_string():
data: dict[str, Any] = {
"status": 1,
"error": "error",
"sub_status": "sub_status",
"error_description": "error_description",
}
e = AuthClientError(**data)
assert str(e) == f"{e.error}, {e.error_description}, {e.status}/{e.sub_status}"
-291
View File
@@ -1,291 +0,0 @@
import json
import logging
from pathlib import Path
from typing import Any, Literal, Type, TypeVar
from pydantic import BaseModel
from requests_cache import (
CachedSession,
EXPIRE_IMMEDIATELY,
NEVER_EXPIRE,
DO_NOT_CACHE,
)
from tiddl.models.api import (
Album,
AlbumItems,
AlbumItemsCredits,
Artist,
ArtistAlbumsItems,
ArtistVideosItems,
Favorites,
Playlist,
PlaylistItems,
Search,
SessionResponse,
Track,
TrackStream,
Video,
VideoStream,
Lyrics,
MixItems,
)
from tiddl.models.constants import TrackQuality
from tiddl.exceptions import ApiError
from tiddl.config import HOME_PATH
DEBUG = False
T = TypeVar("T", bound=BaseModel)
logger = logging.getLogger(__name__)
def ensureLimit(limit: int, max_limit: int) -> int:
if limit > max_limit:
logger.warning(f"Max limit is {max_limit}")
return max_limit
return limit
class Limits:
ARTIST_ALBUMS = 50
ARTIST_VIDEOS = 50
ALBUM_ITEMS = 10
ALBUM_ITEMS_MAX = 100
PLAYLIST = 50
MIX_ITEMS = 100
class TidalApi:
URL = "https://api.tidal.com/v1"
LIMITS = Limits
def __init__(
self, token: str, user_id: str, country_code: str, omit_cache=False
) -> None:
self.user_id = user_id
self.country_code = country_code
# 3.0 TODO: change cache path
CACHE_NAME = "tiddl_api_cache"
self.session = CachedSession(
cache_name=HOME_PATH / CACHE_NAME, always_revalidate=omit_cache
)
self.session.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
def fetch(
self,
model: Type[T],
endpoint: str,
params: dict[str, Any] = {},
expire_after=NEVER_EXPIRE,
) -> T:
"""Fetch data from the API and parse it into the given Pydantic model."""
req = self.session.get(
f"{self.URL}/{endpoint}", params=params, expire_after=expire_after
)
logger.debug(
(
endpoint,
params,
req.status_code,
"HIT" if req.from_cache else "MISS",
)
)
data = req.json()
if DEBUG:
debug_data = {
"status_code": req.status_code,
"endpoint": endpoint,
"params": params,
"data": data,
}
path = Path(f"debug_data/{endpoint}.json")
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(debug_data, f, indent=2)
if req.status_code != 200:
raise ApiError(**data)
return model.model_validate(data)
def getAlbum(self, album_id: str | int):
return self.fetch(
Album, f"albums/{album_id}", {"countryCode": self.country_code}
)
def getAlbumItems(self, album_id: str | int, limit=LIMITS.ALBUM_ITEMS, offset=0):
return self.fetch(
AlbumItems,
f"albums/{album_id}/items",
{
"countryCode": self.country_code,
"limit": ensureLimit(limit, self.LIMITS.ALBUM_ITEMS_MAX),
"offset": offset,
},
)
def getAlbumItemsCredits(
self, album_id: str | int, limit=LIMITS.ALBUM_ITEMS, offset=0
):
return self.fetch(
AlbumItemsCredits,
f"albums/{album_id}/items/credits",
{
"countryCode": self.country_code,
"limit": ensureLimit(limit, self.LIMITS.ALBUM_ITEMS_MAX),
"offset": offset,
},
)
def getArtist(self, artist_id: str | int):
return self.fetch(
Artist,
f"artists/{artist_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def getArtistAlbums(
self,
artist_id: str | int,
limit=LIMITS.ARTIST_ALBUMS,
offset=0,
filter: Literal["ALBUMS", "EPSANDSINGLES"] = "ALBUMS",
):
return self.fetch(
ArtistAlbumsItems,
f"artists/{artist_id}/albums",
{
"countryCode": self.country_code,
"limit": limit, # tested limit 10,000
"offset": offset,
"filter": filter,
},
expire_after=3600,
)
def getArtistVideos(
self,
artist_id: str | int,
limit: int = LIMITS.ARTIST_VIDEOS,
offset: int = 0,
):
return self.fetch(
ArtistVideosItems,
f"artists/{artist_id}/videos",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
expire_after=3600,
)
def getMix(
self,
mix_id: str | int,
limit=LIMITS.MIX_ITEMS,
offset=0,
):
return self.fetch(
MixItems,
f"mixes/{mix_id}/items",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
expire_after=3600,
)
def getFavorites(self):
return self.fetch(
Favorites,
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
expire_after=EXPIRE_IMMEDIATELY,
)
def getPlaylist(self, playlist_uuid: str):
return self.fetch(
Playlist,
f"playlists/{playlist_uuid}",
{"countryCode": self.country_code},
)
def getPlaylistItems(self, playlist_uuid: str, limit=LIMITS.PLAYLIST, offset=0):
return self.fetch(
PlaylistItems,
f"playlists/{playlist_uuid}/items",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
expire_after=EXPIRE_IMMEDIATELY,
)
def getSearch(self, query: str):
return self.fetch(
Search,
"search",
{"countryCode": self.country_code, "query": query},
expire_after=EXPIRE_IMMEDIATELY,
)
def getSession(self):
return self.fetch(SessionResponse, "sessions", expire_after=DO_NOT_CACHE)
def getLyrics(self, track_id: str | int):
return self.fetch(
Lyrics, f"tracks/{track_id}/lyrics", {"countryCode": self.country_code}
)
def getTrack(self, track_id: str | int):
return self.fetch(
Track, f"tracks/{track_id}", {"countryCode": self.country_code}
)
def getTrackStream(self, track_id: str | int, quality: TrackQuality):
return self.fetch(
TrackStream,
f"tracks/{track_id}/playbackinfo",
{
"audioquality": quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
def getVideo(self, video_id: str | int):
return self.fetch(
Video, f"videos/{video_id}", {"countryCode": self.country_code}
)
def getVideoStream(self, video_id: str | int):
return self.fetch(
VideoStream,
f"videos/{video_id}/playbackinfo",
{
"videoquality": "HIGH",
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
-101
View File
@@ -1,101 +0,0 @@
import logging
import base64
from os import environ
from requests import request
from tiddl.exceptions import AuthError
from tiddl.models import auth
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
def get_auth_credentials() -> tuple[str, str]:
ENV_KEY = "TIDDL_AUTH"
client_id, client_secret = (
base64.b64decode(
"ZlgySnhkbW50WldLMGl4VDsxTm45QWZEQWp4cmdKRkpiS05XTGVBeUtHVkdtSU51WFBQTEhWWEF2eEFnPQ=="
)
.decode()
.split(";")
)
env_value = environ.get(ENV_KEY, None)
if env_value:
client_id, client_secret = env_value.split(";")
return client_id, client_secret
CLIENT_ID, CLIENT_SECRET = get_auth_credentials()
logger = logging.getLogger(__name__)
def getDeviceAuth():
req = request(
"POST",
f"{AUTH_URL}/device_authorization",
data={"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"},
)
data = req.json()
if req.status_code == 200:
return auth.AuthDeviceResponse(**data)
raise AuthError(**data)
def getToken(device_code: str):
req = request(
"POST",
f"{AUTH_URL}/token",
data={
"client_id": CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
)
data = req.json()
if req.status_code == 200:
return auth.AuthResponseWithRefresh(**data)
raise AuthError(**data)
def refreshToken(refresh_token: str):
req = request(
"POST",
f"{AUTH_URL}/token",
data={
"client_id": CLIENT_ID,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
)
data = req.json()
if req.status_code == 200:
return auth.AuthResponse(**data)
raise AuthError(**data)
def removeToken(access_token: str):
req = request(
"POST",
"https://api.tidal.com/v1/logout",
headers={"authorization": f"Bearer {access_token}"},
)
logger.debug((req.status_code, req.text))
+10 -68
View File
@@ -1,73 +1,15 @@
import click
import logging
from rich.logging import RichHandler
from tiddl.cli.const import APP_PATH
from tiddl.config import HOME_PATH
from tiddl.cli.ctx import ContextObj, passContext, Context
from tiddl.cli.auth import AuthGroup
from tiddl.cli.download import UrlGroup, FavGroup, SearchGroup, FileGroup
from tiddl.cli.config import ConfigCommand
from tiddl.cli.auth import refresh
@click.group()
@passContext
@click.option("--verbose", "-v", is_flag=True, help="Show debug logs.")
@click.option("--quiet", "-q", is_flag=True, help="Suppress logs.")
@click.option(
"--no-cache", "-nc", is_flag=True, help="Omit Tidal API requests caching."
file_handler = logging.FileHandler(APP_PATH / "latest.log", encoding="utf-8", mode="w")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)s\t[%(name)s.%(funcName)s] %(message)s"
)
)
def cli(ctx: Context, verbose: bool, quiet: bool, no_cache: bool):
"""TIDDL - Tidal Downloader \u266b"""
ctx.obj = ContextObj()
# latest logs
file_handler = logging.FileHandler(
HOME_PATH / "tiddl.log", mode="w", encoding="utf-8"
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter(
"%(levelname)s [%(name)s.%(funcName)s] %(message)s", datefmt="[%X]"
)
)
LEVEL = logging.DEBUG if verbose else logging.ERROR if quiet else logging.INFO
rich_handler = RichHandler(console=ctx.obj.console, rich_tracebacks=True)
rich_handler.setLevel(LEVEL)
if LEVEL == logging.DEBUG:
rich_handler.setFormatter(
logging.Formatter("[%(name)s.%(funcName)s] %(message)s", datefmt="[%X]")
)
logging.basicConfig(
level=logging.DEBUG,
handlers=[
rich_handler,
file_handler,
],
format="%(message)s",
datefmt="[%X]",
)
logging.getLogger("urllib3").setLevel(logging.ERROR)
if ctx.invoked_subcommand in ("fav", "file", "search", "url"):
ctx.invoke(refresh)
ctx.obj.initApi(omit_cache=no_cache)
cli.add_command(ConfigCommand)
cli.add_command(AuthGroup)
cli.add_command(UrlGroup)
cli.add_command(FavGroup)
cli.add_command(SearchGroup)
cli.add_command(FileGroup)
if __name__ == "__main__":
cli()
log = logging.getLogger("tiddl")
log.setLevel(logging.DEBUG)
log.addHandler(file_handler)
+33
View File
@@ -0,0 +1,33 @@
import typer
import logging
from rich.console import Console
from tiddl.cli.config import APP_PATH
from tiddl.cli.ctx import ContextObject, Context
from tiddl.cli.commands import register_commands
log = logging.getLogger("tiddl")
app = typer.Typer(name="tiddl", no_args_is_help=True, rich_markup_mode="rich")
register_commands(app)
@app.callback()
def callback(ctx: Context, omit_cache: bool = False, debug: bool = False):
"""
tiddl - download tidal tracks \u266b
[link=https://github.com/oskvr37/tiddl]github[/link]
[link=https://buymeacoffee.com/oskvr][yellow]buy me a coffee[/link] \u2764
"""
log.debug(f"{ctx.params=}")
if debug:
debug_path = APP_PATH / "api_debug"
else:
debug_path = None
ctx.obj = ContextObject(
api_omit_cache=omit_cache, console=Console(), debug_path=debug_path
)
-115
View File
@@ -1,115 +0,0 @@
import click
import logging
from time import sleep, time
from tiddl.config import AuthConfig
from tiddl.auth import (
getDeviceAuth,
getToken,
refreshToken,
removeToken,
AuthError,
)
from tiddl.cli.ctx import passContext, Context
logger = logging.getLogger(__name__)
@click.group("auth")
def AuthGroup():
"""Manage Tidal token."""
@AuthGroup.command("refresh")
@passContext
def refresh(ctx: Context):
"""Refresh auth token when is expired"""
logger.debug("Invoked refresh command")
auth = ctx.obj.config.auth
if auth.refresh_token and time() > auth.expires:
logger.info("Refreshing token...")
token = refreshToken(auth.refresh_token)
ctx.obj.config.auth.expires = token.expires_in + int(time())
ctx.obj.config.auth.token = token.access_token
ctx.obj.config.save()
logger.info("Refreshed auth token!")
@AuthGroup.command("login")
@passContext
def login(ctx: Context):
"""Add token to the config"""
logger.debug("Invoked login command")
if ctx.obj.config.auth.token:
logger.info("Already logged in.")
ctx.invoke(refresh)
return
auth = getDeviceAuth()
uri = f"https://{auth.verificationUriComplete}"
click.launch(uri)
logger.info(f"Go to {uri} and complete authentication!")
auth_end_at = time() + auth.expiresIn
while True:
sleep(auth.interval)
try:
token = getToken(auth.deviceCode)
except AuthError as e:
if e.error == "authorization_pending":
time_left = auth_end_at - time()
minutes, seconds = time_left // 60, int(time_left % 60)
click.echo(f"\rTime left: {minutes:.0f}:{seconds:02d}", nl=False)
continue
if e.error == "expired_token":
logger.info("\nTime for authentication has expired.")
break
ctx.obj.config.auth = AuthConfig(
token=token.access_token,
refresh_token=token.refresh_token,
expires=token.expires_in + int(time()),
user_id=str(token.user.userId),
country_code=token.user.countryCode,
)
ctx.obj.config.save()
logger.info("\nAuthenticated!")
break
@AuthGroup.command("logout")
@passContext
def logout(ctx: Context):
"""Remove token from config"""
logger.debug("Invoked logout command")
access_token = ctx.obj.config.auth.token
if not access_token:
logger.info("Not logged in.")
return
removeToken(access_token)
ctx.obj.config.auth = AuthConfig()
ctx.obj.config.save()
logger.info("Logged out!")
+16
View File
@@ -0,0 +1,16 @@
from typer import Typer
from .auth import auth_command
from .download import download_command
# from .export import export_command
COMMANDS = [
auth_command,
download_command,
# export_command
]
def register_commands(app: Typer):
for command in COMMANDS:
app.add_typer(command, name=command.info.name)
+108
View File
@@ -0,0 +1,108 @@
import typer
from datetime import datetime
from time import time, sleep
from rich.console import Console
from tiddl.cli.utils.auth.core import load_auth_data, save_auth_data, AuthData
from tiddl.core.auth import AuthAPI, AuthClientError
console = Console()
auth_command = typer.Typer(
name="auth", help="Manage Tidal authentication.", no_args_is_help=True
)
@auth_command.command(help="Login with your Tidal account.")
def login():
loaded_auth_data = load_auth_data()
if loaded_auth_data.token:
console.print("[cyan bold]Already logged in.")
raise typer.Exit()
auth_api = AuthAPI()
device_auth = auth_api.get_device_auth()
uri = f"https://{device_auth.verificationUriComplete}"
typer.launch(uri)
console.print(f"Go to '{uri}' and complete authentication!")
auth_end_at = time() + device_auth.expiresIn
status_text = "Authenticating..."
with console.status(status_text) as status:
while True:
sleep(device_auth.interval)
try:
auth = auth_api.get_auth(device_auth.deviceCode)
auth_data = AuthData(
token=auth.access_token,
refresh_token=auth.refresh_token,
expires_at=auth.expires_in + int(time()),
user_id=str(auth.user_id),
country_code=auth.user.countryCode,
)
save_auth_data(auth_data)
status.console.print("[bold green]Logged in!")
break
except AuthClientError as e:
if e.error == "authorization_pending":
time_left = auth_end_at - time()
minutes, seconds = time_left // 60, int(time_left % 60)
status.update(
f"{status_text} time left: {minutes:.0f}:{seconds:02d}"
)
continue
if e.error == "expired_token":
status.console.print(
"\n[bold red]Time for authentication has expired."
)
break
@auth_command.command(help="Logout and remove token from app.")
def logout():
loaded_auth_data = load_auth_data()
if loaded_auth_data.token:
auth_api = AuthAPI()
auth_api.logout_token(loaded_auth_data.token)
save_auth_data(AuthData())
console.print("[bold green]Logged out!")
@auth_command.command(help="Refreshes your token in app.")
def refresh():
loaded_auth_data = load_auth_data()
if loaded_auth_data.refresh_token is None:
console.print("[bold red]Not logged in.")
raise typer.Exit()
if time() < loaded_auth_data.expires_at:
expiry_time = datetime.fromtimestamp(loaded_auth_data.expires_at)
remaining = expiry_time - datetime.now()
hours, remainder = divmod(remaining.seconds, 3600)
minutes, _ = divmod(remainder, 60)
console.print(
f"[green]Auth token expires in {remaining.days}d {hours}h {minutes}m"
)
return
auth_api = AuthAPI()
auth_data = auth_api.refresh_token(loaded_auth_data.refresh_token)
loaded_auth_data.token = auth_data.access_token
loaded_auth_data.expires_at = auth_data.expires_in + int(time())
save_auth_data(loaded_auth_data)
console.print("[bold green]Auth token has been refreshed!")
+502
View File
@@ -0,0 +1,502 @@
import os
import typer
import asyncio
from pathlib import Path
from logging import getLogger
from rich.live import Live
from typing_extensions import Annotated
from tiddl.core.metadata import add_track_metadata, add_video_metadata, Cover
from tiddl.core.api.models import Album, Track, Video, AlbumItemsCredits
from tiddl.core.utils.format import format_template
from tiddl.core.utils.m3u import save_tracks_to_m3u
from tiddl.cli.config import (
CONFIG,
TRACK_QUALITY_LITERAL,
VIDEO_QUALITY_LITERAL,
ARTIST_SINGLES_FILTER_LITERAL,
VALID_M3U_RESOURCE_LITERAL,
VIDEOS_FILTER_LITERAL,
)
from tiddl.cli.utils.resource import TidalResource
from tiddl.cli.ctx import Context
from tiddl.cli.commands.auth import refresh
from tiddl.cli.commands.subcommands import url_subcommand
from .downloader import Downloader
from .output import RichOutput
download_command = typer.Typer(name="download")
download_command.add_typer(url_subcommand)
log = getLogger(__name__)
@download_command.callback(no_args_is_help=True)
def download_callback(
ctx: Context,
TRACK_QUALITY: Annotated[
TRACK_QUALITY_LITERAL,
typer.Option(
"--track-quality",
"-q",
),
] = CONFIG.download.track_quality,
VIDEO_QUALITY: Annotated[
VIDEO_QUALITY_LITERAL,
typer.Option(
"--video-quality",
"-vq",
),
] = CONFIG.download.video_quality,
SKIP_EXISTING: Annotated[
bool,
typer.Option(
"--no-skip",
"-ns",
help="Don't skip downloading existing files.",
),
] = not CONFIG.download.skip_existing,
REWRITE_METADATA: Annotated[
bool,
typer.Option(
"--rewrite-metadata",
"-r",
help="Rewrite metadata for already downloaded tracks.",
),
] = CONFIG.download.rewrite_metadata,
THREADS_COUNT: Annotated[
int,
typer.Option(
"--threads-count",
"-t",
help="Number of concurrent download threads.",
min=1,
),
] = CONFIG.download.threads_count,
DOWNLOAD_PATH: Annotated[
Path,
typer.Option(
"--path",
"-p",
help="Base directory path for all downloads.",
),
] = CONFIG.download.download_path,
SCAN_PATH: Annotated[
Path,
typer.Option(
"--scan-path",
"--sp",
help="Directory to search for your existing downloads.",
),
] = CONFIG.download.scan_path,
TEMPLATE: Annotated[
str,
typer.Option(
"--output",
"-o",
help="Format output file template.",
),
] = "",
SINGLES_FILTER: Annotated[
ARTIST_SINGLES_FILTER_LITERAL,
typer.Option(
"--singles",
"-s",
help="Filter for including artists' singles, used while downloading artist.",
),
] = CONFIG.download.singles_filter,
VIDEOS_FILTER: Annotated[
VIDEOS_FILTER_LITERAL,
typer.Option(
"--videos",
"-vid",
help="Videos handling: 'none' to exclude, 'allow' to include, 'only' to download videos only.",
),
] = CONFIG.download.videos_filter,
):
"""
Download Tidal resources.
"""
ctx.invoke(refresh)
log.debug(f"{ctx.params=}")
def save_m3u(
resource_type: VALID_M3U_RESOURCE_LITERAL,
filename: str,
tracks_with_path: list[tuple[Path, Track]],
):
if not CONFIG.m3u.save:
return
if resource_type not in CONFIG.m3u.allowed:
return
tracks_with_existing_paths = [
(path, track)
for (path, track) in tracks_with_path
if path and isinstance(track, Track)
]
log.debug(f"{resource_type=}, {filename=}, {len(tracks_with_existing_paths)=}")
save_tracks_to_m3u(
tracks_with_path=tracks_with_existing_paths, path=DOWNLOAD_PATH / filename
)
async def download_resources():
rich_output = RichOutput(ctx.obj.console)
downloader = Downloader(
tidal_api=ctx.obj.api,
threads_count=THREADS_COUNT,
rich_output=rich_output,
track_quality=TRACK_QUALITY,
video_quality=VIDEO_QUALITY,
videos_filter=VIDEOS_FILTER,
skip_existing=not SKIP_EXISTING,
download_path=DOWNLOAD_PATH,
scan_path=SCAN_PATH,
)
class Metadata:
def __init__(
self,
date: str = "",
artist: str = "",
credits: list[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
cover_data: bytes | None = None,
) -> None:
self.date = date
self.artist = artist
self.credits = credits
self.cover_data = cover_data
async def handle_resource(resource: TidalResource):
async def handle_item(
item: Track | Video,
file_path: str,
track_metadata: Metadata = Metadata(),
) -> tuple[Path | None, Track | Video]:
log.debug(f"{item.id=}, {file_path=}")
rich_output.total_increment()
download_path, was_downloaded = await downloader.download(
item=item, file_path=Path(file_path)
)
log.debug(f"{download_path=}, {was_downloaded=}")
if (
CONFIG.metadata.enable
and download_path
# rewrite metadata when track was skipped due to already existing
and (REWRITE_METADATA or was_downloaded)
):
if isinstance(item, Track):
lyrics_subtitles = ""
if CONFIG.metadata.lyrics:
try:
lyrics_subtitles = ctx.obj.api.get_track_lyrics(
item.id
).subtitles
except Exception as e:
log.error(e)
cover_data = track_metadata.cover_data
if not cover_data and item.album.cover:
cover_data = Cover(item.album.cover).data
add_track_metadata(
path=download_path,
track=item,
lyrics=lyrics_subtitles,
album_artist=track_metadata.artist,
cover_data=cover_data,
date=track_metadata.date,
credits=track_metadata.credits,
)
elif isinstance(item, Video):
add_video_metadata(path=download_path, video=item)
if download_path and CONFIG.download.update_mtime:
try:
os.utime(download_path, None)
except Exception:
log.warning(f"could not update mtime for {download_path}")
return download_path, item
async def download_album(album: Album):
offset = 0
futures = []
cover: Cover | None = None
save_cover = ("album" in CONFIG.cover.allowed) and CONFIG.cover.save
if album.cover and (CONFIG.metadata.cover or save_cover):
cover = Cover(album.cover, size=CONFIG.cover.size)
while True:
album_items = ctx.obj.api.get_album_items_credits(
album_id=album.id, offset=offset
)
for album_item in album_items.items:
futures.append(
handle_item(
item=album_item.item,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.album,
item=album_item.item,
album=album,
),
track_metadata=Metadata(
cover_data=cover.data if cover else None,
date=str(album.releaseDate),
artist=album.artist.name if album.artist else "",
credits=album_item.credits,
),
)
)
offset += album_items.limit
if offset >= album_items.totalNumberOfItems:
break
tracks_with_path = await asyncio.gather(*futures)
save_m3u(
resource_type="album",
filename=format_template(
CONFIG.m3u.templates.album,
album=album,
type="album",
),
tracks_with_path=tracks_with_path,
)
if save_cover and cover:
cover.save_to_directory(
path=DOWNLOAD_PATH
/ format_template(
template=CONFIG.cover.templates.album, album=album
)
)
# resources should be collected from a distinct function
# that would yield the resources.
# then we would be able to reuse the logic in the export command
match resource.type:
case "track":
track = ctx.obj.api.get_track(resource.id)
album = ctx.obj.api.get_album(track.album.id)
await handle_item(
item=track,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.track,
item=track,
album=album,
),
)
if (
CONFIG.cover.save
and ("track" in CONFIG.cover.allowed)
and track.album.cover
):
Cover(
track.album.cover, size=CONFIG.cover.size
).save_to_directory(
path=DOWNLOAD_PATH
/ format_template(
CONFIG.cover.templates.track, item=track, album=album
)
)
case "video":
video = ctx.obj.api.get_video(resource.id)
await handle_item(
item=video,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.video,
item=video,
),
)
case "mix":
offset = 0
futures = []
while True:
mix_items = ctx.obj.api.get_mix_items(resource.id, offset=0)
for mix_item in mix_items.items:
futures.append(
handle_item(
item=mix_item.item,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.mix,
item=mix_item.item,
mix_id=resource.id,
),
)
)
offset += mix_items.limit
if offset >= mix_items.totalNumberOfItems:
break
tracks_with_path = await asyncio.gather(*futures)
save_m3u(
resource_type="mix",
filename=format_template(
CONFIG.m3u.templates.mix,
type="mix",
),
tracks_with_path=tracks_with_path,
)
case "album":
album = ctx.obj.api.get_album(album_id=resource.id)
await download_album(album)
case "artist":
futures = []
def get_all_albums(singles: bool):
offset = 0
while True:
artist_albums = ctx.obj.api.get_artist_albums(
artist_id=resource.id,
offset=offset,
filter="EPSANDSINGLES" if singles else "ALBUMS",
)
for album in artist_albums.items:
futures.append(download_album(album))
offset += artist_albums.limit
if offset >= artist_albums.totalNumberOfItems:
break
def get_all_videos():
offset = 0
while True:
artist_videos = ctx.obj.api.get_artist_videos(
resource.id, offset=offset
)
for video in artist_videos.items:
futures.append(
handle_item(
item=video,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.video,
item=video,
),
)
)
if offset > artist_videos.totalNumberOfItems:
break
offset += artist_videos.limit
if VIDEOS_FILTER != "none":
get_all_videos()
if VIDEOS_FILTER != "only":
if SINGLES_FILTER == "include":
get_all_albums(False)
get_all_albums(True)
else:
get_all_albums(SINGLES_FILTER == "only")
await asyncio.gather(*futures)
case "playlist":
offset = 0
futures = []
playlist_index = 0
playlist = ctx.obj.api.get_playlist(playlist_uuid=resource.id)
while True:
playlist_items = ctx.obj.api.get_playlist_items(
playlist_uuid=resource.id, offset=offset
)
for playlist_item in playlist_items.items:
playlist_index += 1
futures.append(
handle_item(
item=playlist_item.item,
file_path=format_template(
template=TEMPLATE or CONFIG.templates.playlist,
item=playlist_item.item,
playlist=playlist,
playlist_index=playlist_index,
),
)
)
offset += playlist_items.limit
if offset >= playlist_items.totalNumberOfItems:
break
tracks_with_path = await asyncio.gather(*futures)
save_m3u(
resource_type="playlist",
filename=format_template(
CONFIG.m3u.templates.playlist,
playlist=playlist,
type="playlist",
),
tracks_with_path=tracks_with_path,
)
if (
CONFIG.cover.save
and ("playlist" in CONFIG.cover.allowed)
and playlist.squareImage
):
Cover(
playlist.squareImage, size=max(CONFIG.cover.size, 1080)
).save_to_directory(
path=DOWNLOAD_PATH
/ format_template(
template=CONFIG.cover.templates.playlist,
playlist=playlist,
)
)
with Live(
rich_output.group,
refresh_per_second=10,
console=ctx.obj.console,
transient=True,
):
await asyncio.gather(*(handle_resource(r) for r in ctx.obj.resources))
rich_output.show_stats()
def run():
asyncio.run(download_resources())
ctx.call_on_close(run)
+195
View File
@@ -0,0 +1,195 @@
import asyncio
import aiohttp
import aiofiles
from logging import getLogger
from pathlib import Path
from tiddl.core.api.models import TrackQuality, VideoQuality, Track, Video
from tiddl.core.api import TidalAPI
from tiddl.core.utils import parse_track_stream, parse_video_stream
from tiddl.core.utils.ffmpeg import convert_to_mp4, extract_flac
from tiddl.cli.config import (
TRACK_QUALITY_LITERAL,
VIDEO_QUALITY_LITERAL,
VIDEOS_FILTER_LITERAL,
)
from tiddl.cli.utils.download import get_existing_track_filename
from .output import RichOutput
log = getLogger(__name__)
CHUNK_SIZE = 1024**2
track_qualities: dict[TRACK_QUALITY_LITERAL, TrackQuality] = {
"low": "LOW",
"normal": "HIGH",
"high": "LOSSLESS",
"max": "HI_RES_LOSSLESS",
}
track_qualities_color: dict[TrackQuality, str] = {
"LOW": "[gray]96 kbps",
"HIGH": "[gray]320 kbps",
"LOSSLESS": "[cyan]",
"HI_RES_LOSSLESS": "[yellow]",
}
video_qualities: dict[VIDEO_QUALITY_LITERAL, VideoQuality] = {
"sd": "LOW",
"hd": "MEDIUM",
"fhd": "HIGH",
}
video_qualities_color: dict[VideoQuality, str] = {
"LOW": "[gray]360p",
"MEDIUM": "[cyan]720p",
"HIGH": "[yellow]1080p",
}
class Downloader:
api: TidalAPI
rich_output: RichOutput
semaphore: asyncio.Semaphore
track_quality: TrackQuality
video_quality: VideoQuality
videos_filter: VIDEOS_FILTER_LITERAL
skip_existing: bool
download_path: Path
scan_path: Path
def __init__(
self,
tidal_api: TidalAPI,
threads_count: int,
rich_output: RichOutput,
track_quality: TRACK_QUALITY_LITERAL,
video_quality: VIDEO_QUALITY_LITERAL,
videos_filter: VIDEOS_FILTER_LITERAL,
skip_existing: bool,
download_path: Path,
scan_path: Path,
) -> None:
self.api = tidal_api
self.rich_output = rich_output
self.semaphore = asyncio.Semaphore(threads_count)
self.track_quality = track_qualities[track_quality]
self.video_quality = video_qualities[video_quality]
self.videos_filter = videos_filter
self.skip_existing = skip_existing
self.download_path = download_path
self.scan_path = scan_path
async def download(
self, item: Track | Video, file_path: Path
) -> tuple[Path | None, bool]:
"""
returns
- Path `item_path` path of existing/downloaded item
- bool `was_downloaded`
"""
if not item.allowStreaming:
self.rich_output.console.print(
f"[red]Can't stream[/] {item.title} ({item.id})"
)
return None, False
if isinstance(item, Track):
filename = get_existing_track_filename(
item.audioQuality, self.track_quality, file_path
)
vibrant_color = item.album.vibrantColor
elif isinstance(item, Video):
filename = file_path.with_suffix(".mp4")
vibrant_color = item.vibrantColor
vibrant_color = vibrant_color or "gray"
existing_file_path = self.scan_path / filename
log.debug(f"{file_path=}, {filename=}, {existing_file_path=}")
result_message = "[green]Downloaded"
if existing_file_path.exists():
result_message = "[cyan]Overwrited"
if self.skip_existing:
self.rich_output.console.print(
f"[yellow]Exists [{vibrant_color}][link={existing_file_path.as_uri()}]{item.title}[/link]"
)
return existing_file_path, False
elif (isinstance(item, Video) and self.videos_filter == "none") or (
isinstance(item, Track) and self.videos_filter == "only"
):
log.info(f"skipping {item.id} due to {self.videos_filter=}")
return None, False
should_extract_flac = False
async with self.semaphore:
if isinstance(item, Track):
stream = self.api.get_track_stream(
track_id=item.id, quality=self.track_quality
)
urls, _ = parse_track_stream(stream)
download_path = self.download_path / filename
quality = track_qualities_color[stream.audioQuality]
if stream.audioQuality in ["HI_RES_LOSSLESS", "LOSSLESS"]:
quality = f"{quality} {stream.bitDepth}-bit, {(stream.sampleRate or 0) / 1000:.1f} kHz"
if stream.audioQuality == "HI_RES_LOSSLESS":
should_extract_flac = True
elif isinstance(item, Video):
stream = self.api.get_video_stream(
video_id=item.id, quality=self.video_quality
)
urls, ext = parse_video_stream(stream), ".ts"
download_path = (self.download_path / filename).with_suffix(ext)
quality = video_qualities_color[stream.videoQuality]
task_id = self.rich_output.download_start(
f"[{vibrant_color}]{item.title} {quality}"
)
download_path.parent.mkdir(exist_ok=True, parents=True)
# TODO shouldnt session be reused instead of
# creating new one on every download?
async with aiohttp.ClientSession() as session:
async with aiofiles.open(download_path, "wb") as f:
for url in urls:
async with session.get(url) as resp:
async for chunk in resp.content.iter_chunked(CHUNK_SIZE):
await f.write(chunk)
self.rich_output.download_advance(
task_id, size=len(chunk)
)
try:
if isinstance(item, Track) and should_extract_flac:
download_path = extract_flac(download_path)
elif isinstance(item, Video):
download_path = convert_to_mp4(download_path)
except Exception as exc:
log.error(f"{should_extract_flac=}, {exc=}")
self.rich_output.download_finish(
task_id=task_id,
item_link=download_path.as_uri(),
result_message=result_message,
)
return download_path, True
+92
View File
@@ -0,0 +1,92 @@
from rich.console import Console, Group
from rich.progress import (
Progress,
TransferSpeedColumn,
SpinnerColumn,
FileSizeColumn,
MofNCompleteColumn,
ProgressColumn,
BarColumn,
Task,
TaskID,
)
from rich.text import Text
from rich.panel import Panel
class TimeElapsedColumn(ProgressColumn):
"""Renders time elapsed."""
def render(self, task: Task) -> Text:
"""Show time elapsed."""
elapsed = task.finished_time if task.finished else task.elapsed
if elapsed is None:
return Text("---", style="progress.elapsed")
return Text(f"{elapsed:.2f}s", style="progress.elapsed")
class RichOutput:
def __init__(self, console: Console, download_height: int | None = None) -> None:
self.console = console
self.download_progress = Progress(
SpinnerColumn(),
"{task.description}",
FileSizeColumn(),
TransferSpeedColumn(),
console=self.console,
)
self.total_progress = Progress(
TimeElapsedColumn(),
BarColumn(bar_width=None),
MofNCompleteColumn(),
console=self.console,
)
self.group = Group(
Panel(
self.download_progress,
title="Downloading",
border_style="magenta",
title_align="left",
height=download_height + 2 if download_height else None,
),
Panel(
self.total_progress,
title="Total Progress",
border_style="green",
title_align="left",
),
)
self.total_task = self.total_progress.add_task("Total", total=0, start=True)
self.total_downloads = 0
def total_increment(self, count: float = 1):
task = self.total_progress._tasks.get(self.total_task)
assert task is not None
assert task.total is not None
self.total_progress.update(self.total_task, total=task.total + count)
def download_start(self, description: str) -> TaskID:
return self.download_progress.add_task(description=description, total=None)
def download_advance(self, task_id: TaskID, size: float):
self.download_progress.update(task_id=task_id, advance=size, refresh=True)
def download_finish(self, task_id: TaskID, item_link: str, result_message: str):
task = self.download_progress._tasks.get(task_id)
assert task is not None
self.download_progress.remove_task(task_id=task_id)
self.total_progress.advance(self.total_task, advance=1)
self.console.print(
f"{result_message} [link={item_link}]{task.description}[/link]"
)
self.total_downloads += 1
def show_stats(self):
self.console.print(f"[green]Total downloads: {self.total_downloads}")
+40
View File
@@ -0,0 +1,40 @@
import typer
from logging import getLogger
from rich.console import Console
# from typing_extensions import Annotated
from tiddl.cli.ctx import Context
from tiddl.cli.commands.subcommands import url_subcommand
from tiddl.cli.commands.auth import refresh
export_command = typer.Typer(name="export")
export_command.add_typer(url_subcommand)
log = getLogger(__name__)
console = Console()
@export_command.callback(no_args_is_help=True)
def export_callback(ctx: Context):
"""
Export Tidal data.
You can export the data to json file
or pipe it to another process.
"""
ctx.invoke(refresh)
# TODO implement export functionality
# exported structure
# [{resource_type: str, resource_id: str|int, album: {...}, album_items: {...}}]
# export to single files like id.json
# or export all in one
def handle_export():
console.print(ctx.obj.resources)
ctx.call_on_close(handle_export)
@@ -0,0 +1,11 @@
from typer import Typer
from .url import url_subcommand
SUBCOMMANDS: list[Typer] = [url_subcommand]
def register_subcommands(app: Typer):
for sub_command in SUBCOMMANDS:
app.add_typer(sub_command)
+29
View File
@@ -0,0 +1,29 @@
import typer
from typing_extensions import Annotated
from tiddl.cli.ctx import Context
from tiddl.cli.utils.resource import TidalResource
url_subcommand = typer.Typer()
@url_subcommand.command(
no_args_is_help=True,
)
def url(
ctx: Context,
urls: Annotated[
list[TidalResource], typer.Argument(parser=TidalResource.from_string)
],
):
"""
Get Tidal URLs.
It can be Tidal link or `resource_type/resource_id` format
e.g. track/12345, album/67890.
Available resource types: track, video, album, playlist, artist, mix.
"""
ctx.obj.resources.extend(urls)
+105 -45
View File
@@ -1,54 +1,114 @@
import click
from logging import getLogger
from pathlib import Path
from pydantic import BaseModel
from tomllib import loads as parse_toml
from typing import Literal
from tiddl.config import CONFIG_PATH
from tiddl.cli.ctx import Context, passContext
from tiddl.cli.const import APP_PATH
CONFIG_FILENAME = "config.toml"
TRACK_QUALITY_LITERAL = Literal["low", "normal", "high", "max"]
VIDEO_QUALITY_LITERAL = Literal["sd", "hd", "fhd"]
ARTIST_SINGLES_FILTER_LITERAL = Literal["none", "only", "include"]
VALID_M3U_RESOURCE_LITERAL = Literal["album", "playlist", "mix"]
VALID_RESOURCE_COVER_SAVE_LITERAL = Literal["track", "album", "playlist"]
VIDEOS_FILTER_LITERAL = Literal["none", "only", "allow"]
log = getLogger(__name__)
@click.command("config")
@click.option(
"--open",
"-o",
"OPEN_CONFIG",
is_flag=True,
help="Open the configuration file with the default editor.",
)
@click.option(
"--locate",
"-l",
"LOCATE_CONFIG",
is_flag=True,
help="Launch a file manager with the located configuration file.",
)
@click.option(
"--print",
"-p",
"PRINT_CONFIG",
is_flag=True,
help="Show current configuration.",
)
@passContext
def ConfigCommand(
ctx: Context, OPEN_CONFIG: bool, LOCATE_CONFIG: bool, PRINT_CONFIG: bool
):
"""
Configuration file options.
class Config(BaseModel):
enable_cache: bool = True
debug: bool = False
By default it prints location of tiddl config file.
class MetadataConfig(BaseModel):
enable: bool = True
lyrics: bool = False
cover: bool = False
This command can be used in variable like `vim $(tiddl config)`
- this will open your config with vim editor.
"""
metadata: MetadataConfig = MetadataConfig()
if OPEN_CONFIG:
click.launch(str(CONFIG_PATH))
class CoverConfig(BaseModel):
save: bool = False
size: int = 1280
allowed: list[VALID_RESOURCE_COVER_SAVE_LITERAL] = []
elif LOCATE_CONFIG:
click.launch(str(CONFIG_PATH), locate=True)
class CoverTemplatesConfig(BaseModel):
track: str = ""
album: str = ""
playlist: str = ""
elif PRINT_CONFIG:
config_without_auth = ctx.obj.config.model_copy()
del config_without_auth.auth
ctx.obj.console.print(config_without_auth.model_dump_json(indent=2))
templates: CoverTemplatesConfig = CoverTemplatesConfig()
else:
click.echo(str(CONFIG_PATH))
cover: CoverConfig = CoverConfig()
class DownloadConfig(BaseModel):
track_quality: TRACK_QUALITY_LITERAL = "high"
video_quality: VIDEO_QUALITY_LITERAL = "fhd"
skip_existing: bool = True
threads_count: int = 4
download_path: Path = Path.home() / "Music" / "tiddl"
scan_path: Path = download_path
singles_filter: ARTIST_SINGLES_FILTER_LITERAL = "none"
videos_filter: VIDEOS_FILTER_LITERAL = "none"
update_mtime: bool = False
rewrite_metadata: bool = False
def model_post_init(self, __context):
# convert to absolute, expand ~, normalize
self.download_path = self.download_path.expanduser().resolve()
self.scan_path = self.scan_path.expanduser().resolve()
download: DownloadConfig = DownloadConfig()
class M3UConfig(BaseModel):
# m3u playlists
save: bool = False
allowed: list[VALID_M3U_RESOURCE_LITERAL] = []
class M3UTemplatesConfig(BaseModel):
album: str = ""
playlist: str = ""
mix: str = ""
templates: M3UTemplatesConfig = M3UTemplatesConfig()
m3u: M3UConfig = M3UConfig()
class TemplatesConfig(BaseModel):
default: str = "{album.artist}/{album.title}/{item.title}"
track: str = ""
video: str = ""
album: str = ""
playlist: str = ""
mix: str = ""
def model_post_init(self, __context):
assert self.default != "", "Default template cannot be empty."
# override templates to default
for field in ["track", "video", "album", "playlist", "mix"]:
if getattr(self, field) == "":
setattr(self, field, self.default)
templates: TemplatesConfig = TemplatesConfig()
def load_config_file(config_file: Path) -> Config:
log.debug(f"loading '{config_file}'")
if not config_file.exists():
log.debug("config file not found, loading default config")
return Config()
toml_dict = parse_toml(config_file.read_text())
config = Config.model_validate(toml_dict, strict=True)
log.debug("loaded config from file")
return config
CONFIG = load_config_file(APP_PATH / CONFIG_FILENAME)
log.debug(f"{CONFIG=}")
+23
View File
@@ -0,0 +1,23 @@
from os import environ
from pathlib import Path
ENV_KEY = "TIDDL_PATH"
APP_DIR_NAME = ".tiddl"
def get_app_path(env_key: str = ENV_KEY) -> Path:
if environ.get(env_key):
return Path(environ[env_key])
return Path.home() / APP_DIR_NAME
def create_app_path() -> Path:
app_path = get_app_path()
app_path.mkdir(exist_ok=True)
return app_path
APP_PATH = create_app_path()
+38 -45
View File
@@ -1,59 +1,52 @@
import functools
import click
import typer
from rich.console import Console
from pathlib import Path
from typing import Callable, TypeVar, cast
from tiddl.api import TidalApi
from tiddl.config import Config
from tiddl.utils import TidalResource
from tiddl.core.api import TidalClient, TidalAPI
from tiddl.cli.config import APP_PATH
from tiddl.cli.utils.auth.core import load_auth_data
from tiddl.cli.utils.resource import TidalResource
class ContextObj:
api: TidalApi | None
config: Config
resources: list[TidalResource]
class ContextObject:
console: Console
resources: list[TidalResource]
_api: TidalAPI | None
api_omit_cache: bool
debug_path: Path | None
def __init__(self) -> None:
self.config = Config.fromFile()
def __init__(
self, api_omit_cache: bool, debug_path: Path | None, console: Console
) -> None:
self.console = console
self.resources = []
self.api = None
self.console = Console()
self._api = None
self.api_omit_cache = api_omit_cache
self.debug_path = debug_path
def initApi(self, omit_cache=False):
auth = self.config.auth
@property
def api(self):
if self._api is not None:
return self._api
if auth.token and auth.user_id and auth.country_code:
self.api = TidalApi(
auth.token,
auth.user_id,
auth.country_code,
omit_cache=omit_cache or self.config.omit_cache,
)
auth_data = load_auth_data()
def getApi(self) -> TidalApi:
if self.api is None:
raise click.UsageError("You must login first")
assert auth_data.token, "Auth Token is missing. Use `tiddl auth login`"
assert auth_data.user_id, "User ID is missing. Use `tiddl auth login`"
assert auth_data.country_code, "Country Code is missing. Use `tiddl auth login`"
return self.api
client = TidalClient(
token=auth_data.token,
cache_name=APP_PATH / "api_cache",
omit_cache=self.api_omit_cache,
debug_path=self.debug_path,
)
self._api = TidalAPI(client, auth_data.user_id, auth_data.country_code)
return self._api
class Context(click.Context):
obj: ContextObj
F = TypeVar("F", bound=Callable[..., None])
def passContext(func: F) -> F:
"""Wrapper for @click.pass_context to use custom Context"""
@click.pass_context
@functools.wraps(func)
def wrapper(ctx: click.Context, *args, **kwargs):
custom_ctx = cast(Context, ctx)
return func(custom_ctx, *args, **kwargs)
return cast(F, wrapper)
class Context(typer.Context):
obj: ContextObject
-571
View File
@@ -1,571 +0,0 @@
import os
import logging
import click
import asyncio
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, Future
from pathlib import Path
from requests import Session
from rich.highlighter import ReprHighlighter
from rich.progress import (
SpinnerColumn,
Progress,
TextColumn,
)
from tiddl.download import parseTrackStream, parseVideoStream
from tiddl.exceptions import ApiError, AuthError
from tiddl.metadata import Cover, addMetadata, addVideoMetadata
from tiddl.models.api import AlbumItemsCredits
from tiddl.models.constants import ARG_TO_QUALITY, TrackArg, SinglesFilter
from tiddl.models.resource import Track, Video, Album
from tiddl.utils import (
TidalResource,
formatResource,
convertFileExtension,
savePlaylistM3U,
findTrackFilename,
)
from tiddl.cli.ctx import Context, passContext
from tiddl.cli.download.fav import FavGroup
from tiddl.cli.download.file import FileGroup
from tiddl.cli.download.search import SearchGroup
from tiddl.cli.download.url import UrlGroup
from typing import List, Union
logger = logging.getLogger(__name__)
@click.command("download")
@click.option(
"--quality",
"-q",
"QUALITY",
type=click.Choice(TrackArg.__args__),
help="Track quality.",
)
@click.option(
"--output",
"-o",
"TEMPLATE",
type=str,
help="Format output file template. "
"This will be used instead of your config templates.",
)
@click.option(
"--path",
"-p",
"PATH",
type=str,
help="Base path of download directory. Default is ~/Music/Tiddl.",
)
@click.option(
"--threads",
"-t",
"THREADS_COUNT",
type=int,
help="Number of threads to use in concurrent download; use with caution.",
)
@click.option(
"--noskip",
"-ns",
"DO_NOT_SKIP",
is_flag=True,
default=False,
help="Do not skip already downloaded files.",
)
@click.option(
"--singles",
"-s",
"SINGLES_FILTER",
type=click.Choice(SinglesFilter.__args__),
help="Defines how to treat artist EPs and singles, used while downloading artist.",
)
@click.option(
"--lyrics",
"-l",
"EMBED_LYRICS",
is_flag=True,
help="Embed track lyrics in file metadata.",
)
@click.option(
"--video",
"-V",
"DOWNLOAD_VIDEO",
is_flag=True,
help="Enable downloading videos",
)
@click.option(
"--only-video",
"-ov",
"ONLY_VIDEO",
is_flag=True,
help="Download only videos from an artist.",
)
@click.option(
"--scan-path",
"SCAN_PATH",
type=str,
help="Base directory to scan for existing tracks. Default is 'path'",
)
@click.option(
"--save-m3u",
"-m3u",
"SAVE_M3U",
is_flag=True,
help="Save M3U file for playlists.",
)
@passContext
def DownloadCommand(
ctx: Context,
QUALITY: TrackArg | None,
TEMPLATE: str | None,
PATH: str | None,
THREADS_COUNT: int | None,
DO_NOT_SKIP: bool,
SINGLES_FILTER: SinglesFilter,
EMBED_LYRICS: bool,
DOWNLOAD_VIDEO: bool,
ONLY_VIDEO: bool,
SCAN_PATH: str | None,
SAVE_M3U: bool,
):
"""Download resources"""
DOWNLOAD_VIDEO = DOWNLOAD_VIDEO or ctx.obj.config.download.download_video
SINGLES_FILTER = SINGLES_FILTER or ctx.obj.config.download.singles_filter
EMBED_LYRICS = EMBED_LYRICS or ctx.obj.config.download.embed_lyrics
# TODO: pretty print
logger.debug(
(
QUALITY,
TEMPLATE,
PATH,
THREADS_COUNT,
DO_NOT_SKIP,
SINGLES_FILTER,
EMBED_LYRICS,
DOWNLOAD_VIDEO,
SCAN_PATH,
SAVE_M3U,
)
)
DOWNLOAD_QUALITY = ARG_TO_QUALITY[QUALITY or ctx.obj.config.download.quality]
api = ctx.obj.getApi()
progress = Progress(
SpinnerColumn(),
TextColumn(
"{task.description}"
"{task.fields[speed]:.2f} Mbps • {task.fields[size]:.2f} MB",
highlighter=ReprHighlighter(),
),
console=ctx.obj.console,
transient=True,
auto_refresh=True,
)
def handleItemDownload(
item: Union[Track, Video],
path: Path,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
) -> Path:
if isinstance(item, Track):
track_stream = api.getTrackStream(item.id, quality=DOWNLOAD_QUALITY)
description = (
f"Track '{item.title}' "
f"{(str(track_stream.bitDepth) + ' bit') if track_stream.bitDepth else ''} "
f"{str(track_stream.sampleRate) + ' kHz' if track_stream.sampleRate else ''}"
)
urls, extension = parseTrackStream(track_stream)
elif isinstance(item, Video):
video_stream = api.getVideoStream(item.id)
description = f"Video '{item.title}' {video_stream.videoQuality} quality"
urls = parseVideoStream(video_stream)
extension = ".ts"
else:
raise TypeError(
f"Invalid item type: expected an instance of Track or Video, "
f"received an instance of {type(item).__name__}. "
)
task_id = progress.add_task(
description=description,
start=True,
visible=True,
total=None,
# fields
speed=0,
size=0,
)
with Session() as s:
stream_data = b""
time_start = perf_counter()
for url in urls:
req = s.get(url)
assert req.status_code == 200, (
f"Could not download stream data for: "
f"{type(item).__name__} '{item.title}', "
f"status code: {req.status_code}"
)
stream_data += req.content
speed = len(stream_data) / (perf_counter() - time_start) / (1024 * 128)
size = len(stream_data) / 1024**2
progress.update(
task_id,
advance=len(req.content),
speed=speed,
size=size,
)
path = path.with_suffix(extension)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("wb") as f:
f.write(stream_data)
if isinstance(item, Track):
if not cover_data and item.album.cover:
cover_data = Cover(
item.album.cover, size=ctx.obj.config.cover.size
).content
lyrics_subtitles = ""
if EMBED_LYRICS:
try:
lyrics_subtitles = api.getLyrics(item.id).subtitles
except Exception as e:
logger.error(e)
if track_stream.audioQuality in ["HI_RES_LOSSLESS"]:
path = asyncio.run(
convertFileExtension(
source_file=path,
extension=".flac",
remove_source=True,
is_video=False,
copy_audio=True, # extract flac from m4a container
)
)
try:
addMetadata(
path,
item,
cover_data,
credits,
album_artist=album_artist,
lyrics=lyrics_subtitles,
)
except Exception as e:
logger.error(f"Can not add metadata to: {path}, {e}")
elif isinstance(item, Video):
path = asyncio.run(
convertFileExtension(
source_file=path,
extension=".mp4",
remove_source=True,
is_video=True,
copy_audio=True,
)
)
try:
addVideoMetadata(path, item)
except Exception as e:
logger.error(f"Can not add metadata to: {path}, {e}")
progress.remove_task(task_id)
logger.info(f"{item.title!r}{speed:.2f} Mbps • {size:.2f} MB")
return path
pool = ThreadPoolExecutor(
max_workers=THREADS_COUNT or ctx.obj.config.download.threads
)
def submitItem(
item: Union[Track, Video],
filename: str,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
) -> Future[Path] | None:
if not item.allowStreaming:
logger.warning(
f"{type(item).__name__} '{item.title}' does not allow streaming"
)
return
download_path = Path(PATH) if PATH else ctx.obj.config.download.path
download_path /= f"{filename}.*"
scan_path = Path(SCAN_PATH) if SCAN_PATH else ctx.obj.config.download.scan_path
if scan_path:
scan_path /= f"{filename}.*"
else:
scan_path = download_path
if isinstance(item, Track):
existing_filename = findTrackFilename(
item.audioQuality, DOWNLOAD_QUALITY, scan_path
)
elif isinstance(item, Video):
existing_filename = scan_path.with_suffix(".mp4")
if existing_filename.exists():
if ctx.obj.config.update_mtime:
try:
os.utime(existing_filename, None)
except Exception:
logger.warning(f"Could not update mtime for {existing_filename}")
if not DO_NOT_SKIP:
logger.info(f"Item '{item.title}' skipped - exists")
future = Future()
future.set_result(existing_filename)
return future
if not DOWNLOAD_VIDEO and isinstance(item, Video):
logger.warning(
f"Video '{item.title}' skipped - video download is not allowed"
)
return
future = pool.submit(
handleItemDownload,
item=item,
path=download_path,
cover_data=cover_data,
credits=credits,
album_artist=album_artist,
)
return future
def downloadAlbum(album: Album):
logger.info(f"Album {album.title!r}")
cover = (
Cover(uid=album.cover, size=ctx.obj.config.cover.size)
if album.cover
else None
)
is_cover_saved = False
offset = 0
while True:
album_items = api.getAlbumItemsCredits(album.id, offset=offset)
for item in album_items.items:
filename = formatResource(
template=TEMPLATE or ctx.obj.config.template.album,
resource=item.item,
album_artist=album.artist.name,
)
if cover and not is_cover_saved and ctx.obj.config.cover.save:
path = Path(PATH) if PATH else ctx.obj.config.download.path
cover_path = path / Path(filename).parent
cover.save(cover_path, ctx.obj.config.cover.filename)
is_cover_saved = True
submitItem(
item.item,
filename,
cover.content if cover else b"",
item.credits,
album.artist.name,
)
if album_items.limit + album_items.offset > album_items.totalNumberOfItems:
break
offset += album_items.limit
def handleResource(resource: TidalResource) -> None:
logger.debug(f"'{resource}'")
match resource.type:
case "track":
track = api.getTrack(resource.id)
filename = formatResource(
TEMPLATE or ctx.obj.config.template.track, track
)
submitItem(track, filename)
case "video":
video = api.getVideo(resource.id)
filename = formatResource(
TEMPLATE or ctx.obj.config.template.video, video
)
submitItem(video, filename)
case "album":
album = api.getAlbum(resource.id)
downloadAlbum(album)
case "mix":
mix = api.getMix(resource.id)
for mix_item in mix.items:
filename = formatResource(
TEMPLATE or ctx.obj.config.template.track, mix_item.item
)
submitItem(mix_item.item, filename)
case "artist":
artist = api.getArtist(resource.id)
logger.info(f"Artist {artist.name!r}")
if ONLY_VIDEO:
offset = 0
while True:
artist_videos = api.getArtistVideos(resource.id, offset=offset)
for video in artist_videos.items:
filename = formatResource(
TEMPLATE or ctx.obj.config.template.video, video
)
submitItem(video, filename)
if offset > artist_videos.totalNumberOfItems:
break
offset += artist_videos.limit
return
def getAllAlbums(singles: bool):
offset = 0
while True:
artist_albums = api.getArtistAlbums(
resource.id,
offset=offset,
filter="EPSANDSINGLES" if singles else "ALBUMS",
)
for album in artist_albums.items:
downloadAlbum(album)
if (
artist_albums.limit + artist_albums.offset
> artist_albums.totalNumberOfItems
):
break
offset += artist_albums.limit
if SINGLES_FILTER == "include":
getAllAlbums(False)
getAllAlbums(True)
else:
getAllAlbums(SINGLES_FILTER == "only")
case "playlist":
playlist = api.getPlaylist(resource.id)
logger.info(f"downloading playlist {playlist.title!r}")
offset = 0
playlist_path = None
futures: list[tuple[Future[Path], Track]] = []
while True:
playlist_items = api.getPlaylistItems(playlist.uuid, offset=offset)
for item in playlist_items.items:
filename = formatResource(
template=TEMPLATE or ctx.obj.config.template.playlist,
resource=item.item,
playlist_title=playlist.title,
playlist_index=item.item.index // 100000,
)
future = submitItem(item.item, filename)
if future:
futures.append((future, item.item))
playlist_path = Path(filename).parent
if (
playlist_items.limit + playlist_items.offset
> playlist_items.totalNumberOfItems
):
break
offset += playlist_items.limit
playlist_tracks: list[tuple[Path, Track]] = []
for future, track in futures:
track_path = future.result()
playlist_tracks.append((track_path, track))
path = Path(PATH) if PATH else ctx.obj.config.download.path
if playlist_path and (
SAVE_M3U or ctx.obj.config.download.save_playlist_m3u
):
savePlaylistM3U(
playlist_tracks=playlist_tracks,
path=path / playlist_path,
filename=f"{playlist.title}.m3u",
)
if playlist.squareImage and playlist_path:
cover = Cover(
uid=playlist.squareImage,
size=1080, # playlist cover must be 1080x1080
)
cover.save(path / playlist_path, ctx.obj.config.cover.filename)
progress.start()
# TODO: make sure every resource is unique
for resource in ctx.obj.resources:
try:
handleResource(resource)
except AuthError as e:
logger.error(e)
break
except ApiError as e:
logger.error(e)
# session does not have streaming privileges
if e.sub_status == 4006:
break
pool.shutdown(wait=True)
progress.stop()
UrlGroup.add_command(DownloadCommand)
SearchGroup.add_command(DownloadCommand)
FavGroup.add_command(DownloadCommand)
FileGroup.add_command(DownloadCommand)
-52
View File
@@ -1,52 +0,0 @@
import click
from tiddl.utils import TidalResource, ResourceTypeLiteral
from tiddl.cli.ctx import Context, passContext
ResourceTypeList: list[ResourceTypeLiteral] = [
"track",
"video",
"album",
"artist",
"playlist",
]
@click.group("fav")
@click.option(
"--resource",
"-r",
"resource_types",
multiple=True,
type=click.Choice(ResourceTypeList),
)
@passContext
def FavGroup(ctx: Context, resource_types: list[ResourceTypeLiteral]):
"""Get your Tidal favorites."""
api = ctx.obj.getApi()
favorites = api.getFavorites()
favorites_dict = favorites.model_dump()
click.echo(type(resource_types))
if not resource_types:
resource_types = ResourceTypeList
stats: dict[ResourceTypeLiteral, int] = dict()
for resource_type in resource_types:
resources = favorites_dict[resource_type.upper()]
stats[resource_type] = len(resources)
for resource_id in resources:
ctx.obj.resources.append(TidalResource(id=resource_id, type=resource_type))
# TODO: show pretty message
click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
for resource_type, count in stats.items():
click.echo(f"{resource_type} - {count}")
-40
View File
@@ -1,40 +0,0 @@
import click
import json
from io import TextIOWrapper
from os.path import splitext
from tiddl.utils import TidalResource
from tiddl.cli.ctx import Context, passContext
@click.group("file")
@click.argument("filename", type=click.File(mode="r"))
@passContext
def FileGroup(ctx: Context, filename: TextIOWrapper):
"""Parse txt or JSON file with urls."""
_, extension = splitext(filename.name)
resource_strings: list[str]
match extension:
case ".json":
try:
resource_strings = json.load(filename)
except json.JSONDecodeError as e:
raise click.UsageError(f"Cant decode JSON file - {e.msg}")
case ".txt":
resource_strings = [line.strip() for line in filename.readlines()]
case _:
raise click.UsageError(f"Unsupported file extension - {extension}")
for string in resource_strings:
try:
ctx.obj.resources.append(TidalResource.fromString(string))
except ValueError as e:
click.echo(click.style(e, "red"))
click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
-48
View File
@@ -1,48 +0,0 @@
import click
from tiddl.utils import TidalResource
from tiddl.models.resource import Artist, Album, Playlist, Track, Video
from tiddl.cli.ctx import Context, passContext
@click.group("search")
@click.argument("query")
@passContext
def SearchGroup(ctx: Context, query: str):
"""Search on Tidal."""
# TODO: give user interactive choice what to select
api = ctx.obj.getApi()
search = api.getSearch(query)
# issue is that we get resource data in search api call,
# in download we refetch that data.
# it's not that big deal as we refetch one resource at most,
# but it should be redesigned
if not search.topHit:
click.echo(f"No search results for '{query}'")
return
value = search.topHit.value
icon = click.style("\u2bcc", "magenta")
if isinstance(value, Album):
resource = TidalResource(type="album", id=str(value.id))
click.echo(f"{icon} Album {value.title}")
elif isinstance(value, Artist):
resource = TidalResource(type="artist", id=str(value.id))
click.echo(f"{icon} Artist {value.name}")
elif isinstance(value, Track):
resource = TidalResource(type="track", id=str(value.id))
click.echo(f"{icon} Track {value.title}")
elif isinstance(value, Playlist):
resource = TidalResource(type="playlist", id=str(value.uuid))
click.echo(f"{icon} Playlist {value.title}")
elif isinstance(value, Video):
resource = TidalResource(type="video", id=str(value.id))
click.echo(f"{icon} Video {value.title}")
ctx.obj.resources.append(resource)
-26
View File
@@ -1,26 +0,0 @@
import click
from tiddl.utils import TidalResource
from tiddl.cli.ctx import Context, passContext
class TidalURL(click.ParamType):
def convert(self, value: str, param, ctx) -> TidalResource:
try:
return TidalResource.fromString(value)
except ValueError as e:
self.fail(message=str(e), param=param, ctx=ctx)
@click.group("url")
@click.argument("url", type=TidalURL())
@passContext
def UrlGroup(ctx: Context, url: TidalResource):
"""
Get Tidal URL.
It can be Tidal link or `resource_type/resource_id` format.
The resource can be a track, video, album, playlist or artist.
"""
ctx.obj.resources.append(url)
+5
View File
@@ -0,0 +1,5 @@
from .core import load_auth_data, save_auth_data
from .models import AuthData
__all__ = ["load_auth_data", "save_auth_data", "AuthData"]
+31
View File
@@ -0,0 +1,31 @@
from pathlib import Path
from logging import getLogger
from tiddl.cli.config import APP_PATH
from .models import AuthData
AUTH_DATA_FILE = APP_PATH / "auth.json"
log = getLogger(__name__)
def load_auth_data(file: Path = AUTH_DATA_FILE) -> AuthData:
log.debug(f"loading from '{AUTH_DATA_FILE}'")
try:
file_content = file.read_text()
except FileNotFoundError:
return AuthData()
auth_data = AuthData.model_validate_json(file_content)
return auth_data
def save_auth_data(auth_data: AuthData, file: Path = AUTH_DATA_FILE):
log.debug(f"saving to '{file}'")
with file.open("w") as f:
f.write(auth_data.model_dump_json())
+9
View File
@@ -0,0 +1,9 @@
from pydantic import BaseModel
class AuthData(BaseModel):
token: str | None = None
refresh_token: str | None = None
expires_at: int = 0
user_id: str | None = None
country_code: str | None = None
+26
View File
@@ -0,0 +1,26 @@
from logging import getLogger
from pathlib import Path
from tiddl.core.api.models import TrackQuality
log = getLogger(__name__)
def get_existing_track_filename(
track_quality: TrackQuality, download_quality: TrackQuality, file_name: Path
) -> Path:
"""
Predict track extension.
"""
FLAC_QUALITIES: list[TrackQuality] = ["LOSSLESS", "HI_RES_LOSSLESS"]
if download_quality in FLAC_QUALITIES and track_quality in FLAC_QUALITIES:
extension = ".flac"
else:
extension = ".m4a"
full_file_name = file_name.with_suffix(extension)
log.debug(f"{track_quality=}, {download_quality=}, {file_name=}, {full_file_name=}")
return full_file_name
+47
View File
@@ -0,0 +1,47 @@
from pydantic import BaseModel
from urllib.parse import urlparse
from typing import Literal, get_args
ResourceTypeLiteral = Literal["track", "video", "album", "playlist", "artist", "mix"]
class TidalResource(BaseModel):
type: ResourceTypeLiteral
id: str
@property
def url(self) -> str:
return f"https://listen.tidal.com/{self.type}/{self.id}"
@classmethod
def from_string(cls, string: str):
"""
Extracts the resource type (e.g., "track", "album")
and resource ID from a given input string.
The input string can either be a full URL or a shorthand string
in the format `resource_type/resource_id` (e.g., `track/12345678`).
"""
path = urlparse(string).path
resource_type, resource_id = path.split("/")[-2:]
if resource_type not in get_args(ResourceTypeLiteral):
raise ValueError(f"Invalid resource type: {resource_type}")
digit_resource_types: list[ResourceTypeLiteral] = [
"track",
"album",
"video",
"artist",
]
if resource_type in digit_resource_types and not resource_id.isdigit():
raise ValueError(f"Invalid resource id: {resource_id}")
return cls(type=resource_type, id=resource_id) # type: ignore
def __str__(self) -> str:
return f"{self.type}/{self.id}"
-72
View File
@@ -1,72 +0,0 @@
from os import environ, makedirs
from pydantic import BaseModel
from pathlib import Path
from tiddl.models.constants import TrackArg, SinglesFilter
TIDDL_ENV_KEY = "TIDDL_PATH"
# 3.0 TODO: rename HOME_PATH to TIDDL_PATH
# 3.0 TODO: add /tiddl to Path.home()
HOME_PATH = Path(environ[TIDDL_ENV_KEY]) if environ.get(TIDDL_ENV_KEY) else Path.home()
makedirs(HOME_PATH, exist_ok=True)
CONFIG_PATH = HOME_PATH / "tiddl.json"
CONFIG_INDENT = 2
class TemplateConfig(BaseModel):
track: str = "{artist} - {title}"
video: str = "{artist} - {title}"
album: str = "{album_artist}/{album}/{number:02d}. {title}"
playlist: str = "{playlist}/{playlist_number:02d}. {artist} - {title}"
class DownloadConfig(BaseModel):
quality: TrackArg = "high"
path: Path = Path.home() / "Music" / "Tiddl"
threads: int = 4
singles_filter: SinglesFilter = "none"
embed_lyrics: bool = False
download_video: bool = False
scan_path: Path | None = path
save_playlist_m3u: bool = False
class AuthConfig(BaseModel):
token: str = ""
refresh_token: str = ""
expires: int = 0
user_id: str = ""
country_code: str = ""
class CoverConfig(BaseModel):
save: bool = False
size: int = 1280
filename: str = "cover.jpg"
class Config(BaseModel):
template: TemplateConfig = TemplateConfig()
download: DownloadConfig = DownloadConfig()
cover: CoverConfig = CoverConfig()
auth: AuthConfig = AuthConfig()
omit_cache: bool = False
update_mtime: bool = False
def save(self):
with open(CONFIG_PATH, "w") as f:
f.write(self.model_dump_json(indent=CONFIG_INDENT))
@classmethod
def fromFile(cls):
try:
with CONFIG_PATH.open() as f:
config = cls.model_validate_json(f.read())
except FileNotFoundError:
config = cls()
config.save()
return config
View File
+5
View File
@@ -0,0 +1,5 @@
from .api import TidalAPI
from .client import TidalClient
from .exceptions import ApiError
__all__ = ["TidalAPI", "TidalClient", "ApiError"]
+247
View File
@@ -0,0 +1,247 @@
from requests_cache import DO_NOT_CACHE, EXPIRE_IMMEDIATELY
from typing import Literal, TypeAlias
from .client import TidalClient
from .models.resources import (
Album,
Artist,
Playlist,
Track,
Video,
TrackQuality,
VideoQuality,
)
from .models.base import (
AlbumItems,
AlbumItemsCredits,
ArtistAlbumsItems,
ArtistVideosItems,
Favorites,
TrackLyrics,
PlaylistItems,
MixItems,
Search,
SessionResponse,
TrackStream,
VideoStream,
)
ID: TypeAlias = str | int
class Limits:
# TODO test every max limit
ARTIST_ALBUMS = 50
ARTIST_ALBUMS_MAX = 200
ARTIST_VIDEOS = 50
ARTIST_VIDEOS_MAX = 200
ALBUM_ITEMS = 100
ALBUM_ITEMS_MAX = 100
PLAYLIST_ITEMS = 50
PLAYLIST_ITEMS_MAX = 200
MIX_ITEMS = 100
MIX_ITEMS_MAX = 200
class TidalAPI:
client: TidalClient
user_id: str
country_code: str
def __init__(self, client: TidalClient, user_id: str, country_code: str) -> None:
self.client = client
self.user_id = user_id
self.country_code = country_code
def get_album(self, album_id: ID):
return self.client.fetch(
Album,
f"albums/{album_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def get_album_items(
self, album_id: ID, limit: int = Limits.ALBUM_ITEMS, offset: int = 0
):
return self.client.fetch(
AlbumItems,
f"albums/{album_id}/items",
{
"countryCode": self.country_code,
"limit": min(limit, Limits.ALBUM_ITEMS_MAX),
"offset": offset,
},
expire_after=3600,
)
def get_album_items_credits(
self, album_id: ID, limit: int = Limits.ALBUM_ITEMS, offset: int = 0
):
return self.client.fetch(
AlbumItemsCredits,
f"albums/{album_id}/items/credits",
{
"countryCode": self.country_code,
"limit": min(limit, Limits.ALBUM_ITEMS_MAX),
"offset": offset,
},
expire_after=3600,
)
def get_artist(self, artist_id: ID):
return self.client.fetch(
Artist,
f"artists/{artist_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def get_artist_videos(
self,
artist_id: ID,
limit: int = Limits.ARTIST_VIDEOS,
offset: int = 0,
):
return self.client.fetch(
ArtistVideosItems,
f"artists/{artist_id}/videos",
{
"countryCode": self.country_code,
"limit": limit,
"offset": offset,
},
expire_after=3600,
)
def get_artist_albums(
self,
artist_id: ID,
limit: int = Limits.ARTIST_ALBUMS,
offset: int = 0,
filter: Literal["ALBUMS", "EPSANDSINGLES"] = "ALBUMS",
):
return self.client.fetch(
ArtistAlbumsItems,
f"artists/{artist_id}/albums",
{
"countryCode": self.country_code,
"limit": min(limit, Limits.ARTIST_ALBUMS_MAX),
"offset": offset,
"filter": filter,
},
expire_after=3600,
)
def get_mix_items(
self,
mix_id: str,
limit: int = Limits.MIX_ITEMS,
offset: int = 0,
):
return self.client.fetch(
MixItems,
f"mixes/{mix_id}/items",
{
"countryCode": self.country_code,
"limit": min(limit, Limits.MIX_ITEMS_MAX),
"offset": offset,
},
expire_after=3600,
)
def get_favorites(self):
return self.client.fetch(
Favorites,
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
expire_after=EXPIRE_IMMEDIATELY,
)
def get_playlist(self, playlist_uuid: str):
return self.client.fetch(
Playlist,
f"playlists/{playlist_uuid}",
{"countryCode": self.country_code},
expire_after=EXPIRE_IMMEDIATELY,
)
def get_playlist_items(
self, playlist_uuid: str, limit: int = Limits.PLAYLIST_ITEMS, offset: int = 0
):
return self.client.fetch(
PlaylistItems,
f"playlists/{playlist_uuid}/items",
{
"countryCode": self.country_code,
"limit": min(limit, Limits.PLAYLIST_ITEMS_MAX),
"offset": offset,
},
expire_after=EXPIRE_IMMEDIATELY,
)
def get_search(self, query: str):
return self.client.fetch(
Search,
"search",
{"countryCode": self.country_code, "query": query},
expire_after=DO_NOT_CACHE,
)
def get_session(self):
return self.client.fetch(SessionResponse, "sessions", expire_after=DO_NOT_CACHE)
def get_track_lyrics(self, track_id: ID):
return self.client.fetch(
TrackLyrics,
f"tracks/{track_id}/lyrics",
{"countryCode": self.country_code},
expire_after=3600,
)
def get_track(self, track_id: ID):
return self.client.fetch(
Track,
f"tracks/{track_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def get_track_stream(self, track_id: ID, quality: TrackQuality):
return self.client.fetch(
TrackStream,
f"tracks/{track_id}/playbackinfopostpaywall",
{
"audioquality": quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
def get_video(self, video_id: ID):
return self.client.fetch(
Video,
f"videos/{video_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def get_video_stream(self, video_id: ID, quality: VideoQuality):
return self.client.fetch(
VideoStream,
f"videos/{video_id}/playbackinfopostpaywall",
{
"videoquality": quality,
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
+86
View File
@@ -0,0 +1,86 @@
import json
from logging import getLogger
from pathlib import Path
from typing import Any, Type, TypeVar
from pydantic import BaseModel
from requests_cache import (
CachedSession,
StrOrPath,
NEVER_EXPIRE,
)
from .exceptions import ApiError
T = TypeVar("T", bound=BaseModel)
API_URL = "https://api.tidal.com/v1"
log = getLogger(__name__)
class TidalClient:
token: str
debug_path: Path | None
session: CachedSession
def __init__(
self,
token: str,
cache_name: StrOrPath,
omit_cache: bool = False,
debug_path: Path | None = None,
) -> None:
self.token = token
self.debug_path = debug_path
self.session = CachedSession(
cache_name=cache_name, always_revalidate=omit_cache
)
self.session.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
def fetch(
self,
model: Type[T],
endpoint: str,
params: dict[str, Any] = {},
expire_after: int = NEVER_EXPIRE,
) -> T:
"""
Fetch data from the API endpoint
and parse it into the given Pydantic model.
"""
res = self.session.get(
f"{API_URL}/{endpoint}", params=params, expire_after=expire_after
)
log.debug(
f"{endpoint} {params} '{'HIT' if res.from_cache else 'MISS'}' [{res.status_code}]",
)
data = res.json()
if self.debug_path:
file = self.debug_path / f"{endpoint}.json"
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text(
json.dumps(
{
"status_code": res.status_code,
"endpoint": endpoint,
"params": params,
"data": data,
},
indent=2,
)
)
if res.status_code != 200:
raise ApiError(**data)
return model.model_validate(data)
+8
View File
@@ -0,0 +1,8 @@
class ApiError(Exception):
def __init__(self, status: int, subStatus: str, userMessage: str):
self.status = status
self.sub_status = subStatus
self.user_message = userMessage
def __str__(self):
return f"{self.user_message}, {self.status}/{self.sub_status}"
+35
View File
@@ -0,0 +1,35 @@
from .resources import Album, Artist, Playlist, Track, Video, TrackQuality, VideoQuality
from .base import (
AlbumItems,
AlbumItemsCredits,
ArtistAlbumsItems,
Favorites,
TrackLyrics,
PlaylistItems,
MixItems,
Search,
SessionResponse,
TrackStream,
VideoStream,
)
__all__ = [
"Album",
"Artist",
"Playlist",
"Track",
"Video",
"TrackQuality",
"VideoQuality",
"AlbumItems",
"AlbumItemsCredits",
"ArtistAlbumsItems",
"Favorites",
"TrackLyrics",
"PlaylistItems",
"MixItems",
"Search",
"SessionResponse",
"TrackStream",
"VideoStream"
]
@@ -1,19 +1,7 @@
from pydantic import BaseModel
from typing import Optional, List, Literal, Union
from tiddl.models.resource import Album, Artist, Playlist, Track, TrackQuality, Video
__all__ = [
"SessionResponse",
"ArtistAlbumsItems",
"ArtistVideosItems",
"AlbumItems",
"PlaylistItems",
"Favorites",
"TrackStream",
"Search",
"Lyrics",
]
from .resources import Album, Artist, Playlist, Track, TrackQuality, Video, VideoQuality
class SessionResponse(BaseModel):
@@ -97,6 +85,8 @@ class PlaylistItems(Items):
dateAdded: str
index: int
itemUuid: str
# playlist tracks albums have releasedate,
# but tracks alone do not lol
item: PlaylistTrack
type: ItemType = "track"
@@ -112,6 +102,7 @@ class MixItems(Items):
items: List[MixItem]
class Favorites(BaseModel):
PLAYLIST: List[str]
ALBUM: List[str]
@@ -140,24 +131,20 @@ class VideoStream(BaseModel):
videoId: int
streamType: Literal["ON_DEMAND"]
assetPresentation: Literal["FULL"]
videoQuality: Literal["HIGH", "MEDIUM"]
videoQuality: VideoQuality
# streamingSessionId: str # only in web?
manifestMimeType: Literal["application/vnd.tidal.emu"]
manifestMimeType: Literal["application/dash+xml", "application/vnd.tidal.emu"]
manifestHash: str
manifest: str
class SearchAlbum(Album):
# TODO: remove the artist field instead of making it None
artist: None = None
class Search(BaseModel):
class Artists(Items):
items: List[Artist]
class Albums(Items):
items: List[SearchAlbum]
items: List[Album]
class Playlists(Items):
items: List[Playlist]
@@ -169,7 +156,7 @@ class Search(BaseModel):
items: List[Video]
class TopHit(BaseModel):
value: Union[Artist, Track, Playlist, SearchAlbum]
value: Union[Artist, Track, Playlist, Album]
type: Literal["ARTISTS", "TRACKS", "PLAYLISTS", "ALBUMS"]
artists: Artists
@@ -180,7 +167,7 @@ class Search(BaseModel):
topHit: Optional[TopHit] = None
class Lyrics(BaseModel):
class TrackLyrics(BaseModel):
isRightToLeft: bool
lyrics: str
lyricsProvider: str
@@ -1,11 +1,11 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List, Literal, Dict
from typing import Optional, List, Literal, Dict, Any
from tiddl.models.constants import TrackQuality
TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
__all__ = ["Track", "Video", "Album", "Playlist", "Artist"]
# audio_only is not stable
VideoQuality = Literal["AUDIO_ONLY", "LOW", "MEDIUM", "HIGH"]
class Track(BaseModel):
@@ -23,6 +23,9 @@ class Track(BaseModel):
vibrantColor: Optional[str] = None
videoCover: Optional[str] = None
class MediaMetadata(BaseModel):
tags: list[str]
id: int
title: str
duration: int
@@ -47,8 +50,7 @@ class Track(BaseModel):
explicit: bool
audioQuality: TrackQuality
audioModes: List[str]
mediaMetadata: Dict[str, List[str]]
# for real, artist can be None?
mediaMetadata: MediaMetadata
artist: Optional[Artist] = None
artists: List[Artist]
album: Album
@@ -120,7 +122,7 @@ class Album(BaseModel):
numberOfTracks: int
numberOfVideos: int
numberOfVolumes: int
releaseDate: Optional[str] = None
releaseDate: datetime
copyright: Optional[str] = None
type: str
version: Optional[str] = None
@@ -134,7 +136,8 @@ class Album(BaseModel):
audioQuality: str
audioModes: List[str]
mediaMetadata: MediaMetadata
artist: Artist
# artist is none in search query
artist: Optional[Artist] = None
artists: List[Artist]
@@ -147,7 +150,7 @@ class Playlist(BaseModel):
title: str
numberOfTracks: int
numberOfVideos: int
creator: Creator | Dict
creator: Creator | Dict[Any, Any]
description: Optional[str] = None
duration: int
lastUpdated: str
@@ -185,8 +188,7 @@ class Artist(BaseModel):
artistTypes: Optional[List[Literal["ARTIST", "CONTRIBUTOR"]]] = None
url: Optional[str] = None
picture: Optional[str] = None
# only in search i guess
selectedAlbumCoverFallback: Optional[str] = None
popularity: Optional[int] = None
artistRoles: Optional[List[Role]] = None
mixes: Optional[Mix | Dict] = None
mixes: Optional[Mix | Dict[Any, Any]] = None
+4
View File
@@ -0,0 +1,4 @@
from .api import AuthAPI
from .exceptions import AuthClientError
__all__ = ["AuthAPI", "AuthClientError"]
+26
View File
@@ -0,0 +1,26 @@
from tiddl.core.auth.client import AuthClient
from tiddl.core.auth.models import (
AuthDeviceResponse,
AuthResponse,
AuthResponseWithRefresh,
)
class AuthAPI:
def __init__(self, client: AuthClient | None = None) -> None:
self._client = client or AuthClient()
def get_device_auth(self) -> AuthDeviceResponse:
json_data = self._client.get_device_auth()
return AuthDeviceResponse.model_validate(json_data)
def get_auth(self, device_code: str) -> AuthResponseWithRefresh:
json_data = self._client.get_auth(device_code)
return AuthResponseWithRefresh.model_validate(json_data)
def refresh_token(self, refresh_token: str) -> AuthResponse:
json_data = self._client.refresh_token(refresh_token)
return AuthResponse.model_validate(json_data)
def logout_token(self, access_token: str) -> None:
self._client.logout_token(access_token)
+96
View File
@@ -0,0 +1,96 @@
import base64
from os import environ
from requests import request
from typing import Any, TypeAlias
from tiddl.core.auth.exceptions import AuthClientError
def get_auth_credentials() -> tuple[str, str]:
ENV_KEY = "TIDDL_AUTH"
client_id, client_secret = (
base64.b64decode(
"ZlgySnhkbW50WldLMGl4VDsxTm45QWZEQWp4cmdKRkpiS05XTGVBeUtHVkdtSU51WFBQTEhWWEF2eEFnPQ=="
)
.decode()
.split(";")
)
env_value = environ.get(ENV_KEY, None)
if env_value:
client_id, client_secret = env_value.split(";")
return client_id, client_secret
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
CLIENT_ID, CLIENT_SECRET = get_auth_credentials()
JSON: TypeAlias = dict[str, Any]
class AuthClient:
def __init__(self) -> None:
self.auth_url = AUTH_URL
self.client_id = CLIENT_ID
self.client_secret = CLIENT_SECRET
def get_device_auth(self) -> JSON:
res = request(
"POST",
f"{self.auth_url}/device_authorization",
data={"client_id": self.client_id, "scope": "r_usr+w_usr+w_sub"},
)
res.raise_for_status()
return res.json()
def get_auth(self, device_code: str) -> JSON:
res = request(
"POST",
f"{self.auth_url}/token",
data={
"client_id": self.client_id,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"scope": "r_usr+w_usr+w_sub",
},
auth=(self.client_id, self.client_secret),
)
json_data = res.json()
if res.status_code != 200:
raise AuthClientError(**json_data)
return json_data
def refresh_token(self, refresh_token: str) -> JSON:
res = request(
"POST",
f"{self.auth_url}/token",
data={
"client_id": self.client_id,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
"scope": "r_usr+w_usr+w_sub",
},
auth=(self.client_id, self.client_secret),
)
res.raise_for_status()
return res.json()
def logout_token(self, access_token: str) -> None:
res = request(
"POST",
"https://api.tidal.com/v1/logout",
headers={"authorization": f"Bearer {access_token}"},
)
res.raise_for_status()
+17
View File
@@ -0,0 +1,17 @@
class AuthClientError(Exception):
def __init__(
self,
status: int | None = None,
error: str | None = None,
sub_status: str | None = None,
error_description: str | None = None,
):
self.status = status
self.error = error
self.sub_status = sub_status
self.error_description = error_description
def __str__(self):
return (
f"{self.error}, {self.error_description}, {self.status}/{self.sub_status}"
)
+52
View File
@@ -0,0 +1,52 @@
from typing import Optional
from pydantic import BaseModel
class AuthResponse(BaseModel):
class User(BaseModel):
userId: int
email: str
countryCode: str
fullName: Optional[str]
firstName: Optional[str]
lastName: Optional[str]
nickname: Optional[str]
username: str
address: Optional[str]
city: Optional[str]
postalcode: Optional[str]
usState: Optional[str]
phoneNumber: Optional[str]
birthday: Optional[int]
channelId: int
parentId: int
acceptedEULA: bool
created: int
updated: int
facebookUid: int
appleUid: Optional[str]
googleUid: Optional[str]
accountLinkCreated: bool
emailVerified: bool
newUser: bool
user: User
scope: str
clientName: str
token_type: str
access_token: str
expires_in: int
user_id: int
class AuthResponseWithRefresh(AuthResponse):
refresh_token: str
class AuthDeviceResponse(BaseModel):
deviceCode: str
userCode: str
verificationUri: str
verificationUriComplete: str
expiresIn: int
interval: int
+5
View File
@@ -0,0 +1,5 @@
from .track import add_track_metadata
from .video import add_video_metadata
from .cover import Cover
__all__ = ["add_track_metadata", "add_video_metadata", "Cover"]
+55
View File
@@ -0,0 +1,55 @@
import requests
from pathlib import Path
from logging import getLogger
log = getLogger(__name__)
class Cover:
uid: str
url: str
data: bytes | None
def __init__(self, uid: str, size=1280) -> None:
self.uid = uid
if size > 1280:
log.warning(f"can not set cover size higher than 1280 (user set: {size})")
size = 1280
formatted_uid = uid.replace("-", "/")
self.url = (
f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
)
self.data = None
def _get_data(self) -> bytes:
req = requests.get(self.url)
if req.status_code != 200:
log.error(f"could not download cover. ({req.status_code}) {self.url}")
return b""
log.debug(f"got cover {self.url}")
return req.content
def save_to_directory(self, path: Path):
file = path.with_suffix(".jpg")
if file.exists():
log.debug(f"cover exists ({file})")
return
if not self.data:
self.data = self._get_data()
file.parent.mkdir(parents=True, exist_ok=True)
try:
file.write_bytes(self.data)
except FileNotFoundError as e:
log.error(f"could not save cover. {file} -> {e}")
+140
View File
@@ -0,0 +1,140 @@
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
from mutagen.flac import FLAC as MutagenFLAC, Picture
from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
from mutagen.mp4 import MP4 as MutagenMP4, MP4Cover
from tiddl.core.api.models import AlbumItemsCredits, Track
@dataclass(slots=True)
class Metadata:
title: str
track_number: str
disc_number: str
copyright: str | None
album_artist: str
artists: str
album_title: str
date: str
isrc: str
bpm: str | None = None
lyrics: str | None = None
credits: list[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = field(
default_factory=list
)
cover_data: bytes | None = None
def add_flac_metadata(track_path: Path, metadata: Metadata) -> None:
mutagen = MutagenFLAC(track_path)
if metadata.cover_data:
picture = Picture()
picture.data = metadata.cover_data
picture.mime = "image/jpeg"
picture.type = 3 # front cover
mutagen.add_picture(picture)
if metadata.date:
date = datetime.fromisoformat(metadata.date)
else:
date = None
mutagen.update(
{
"TITLE": metadata.title,
"TRACKNUMBER": metadata.track_number,
"DISCNUMBER": metadata.disc_number,
"ALBUM": metadata.album_title,
"ALBUMARTIST": metadata.album_artist,
"ARTIST": metadata.artists,
"DATE": str(date) if date else "",
"YEAR": (str(date.year) if date else ""),
"COPYRIGHT": metadata.copyright or "",
"ISRC": metadata.isrc,
}
)
if metadata.bpm:
mutagen["BPM"] = metadata.bpm
if metadata.lyrics:
mutagen["LYRICS"] = metadata.lyrics
for entry in metadata.credits:
mutagen[entry.type.upper()] = [c.name for c in entry.contributors]
mutagen.save()
def add_m4a_metadata(track_path: Path, metadata: Metadata) -> None:
mutagen = MutagenMP4(track_path)
if metadata.cover_data:
mutagen["covr"] = [
MP4Cover(metadata.cover_data, imageformat=MP4Cover.FORMAT_JPEG)
]
if metadata.lyrics:
mutagen["\xa9lyr"] = [metadata.lyrics]
mutagen.save()
mutagen = MutagenEasyMP4(track_path)
mutagen.update(
{
"title": metadata.title,
"tracknumber": metadata.track_number,
"discnumber": metadata.disc_number,
"album": metadata.album_title,
"albumartist": metadata.album_artist,
"artist": metadata.artists,
"date": metadata.date,
"copyright": metadata.copyright or "",
}
)
if metadata.bpm:
mutagen["bpm"] = metadata.bpm
mutagen.save()
def add_track_metadata(
path: Path,
track: Track,
date: str = "",
album_artist: str = "",
lyrics: str = "",
cover_data: bytes | None = None,
credits: list[AlbumItemsCredits.ItemWithCredits.CreditsEntry] | None = None,
) -> None:
"""Add FLAC or M4A metadata based on file extension."""
metadata = Metadata(
title=track.title,
track_number=str(track.trackNumber),
disc_number=str(track.volumeNumber),
copyright=track.copyright,
album_artist=album_artist,
artists=", ".join(sorted(a.name.strip() for a in track.artists)),
album_title=track.album.title,
date=date,
isrc=track.isrc,
bpm=str(track.bpm or ""),
lyrics=lyrics or None,
cover_data=cover_data,
credits=credits or [],
)
ext = path.suffix.lower()
if ext == ".flac":
add_flac_metadata(path, metadata)
elif ext == ".m4a":
add_m4a_metadata(path, metadata)
else:
raise ValueError(f"Unsupported file extension: {ext}")
+31
View File
@@ -0,0 +1,31 @@
from pathlib import Path
from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
from tiddl.core.api.models import Video
def add_video_metadata(path: Path, video: Video):
mutagen = MutagenEasyMP4(path)
mutagen.update(
{
"title": video.title,
"artist": ";".join([artist.name.strip() for artist in video.artists]),
}
)
if video.artist:
mutagen["albumartist"] = video.artist.name
if video.album:
mutagen["album"] = video.album.title
if video.streamStartDate:
mutagen["date"] = str(video.streamStartDate)
if video.trackNumber:
mutagen["tracknumber"] = str(video.trackNumber)
if video.volumeNumber:
mutagen["discnumber"] = str(video.volumeNumber)
mutagen.save(path)
+11
View File
@@ -0,0 +1,11 @@
from .parse import parse_track_stream, parse_video_stream
from .download import get_track_stream_data, get_video_stream_data
from .format import format_template
__all__ = [
"parse_track_stream",
"parse_video_stream",
"get_track_stream_data",
"get_video_stream_data",
"format_template",
]
+39
View File
@@ -0,0 +1,39 @@
from requests import Session
from tiddl.core.api.models import TrackStream, VideoStream
from .parse import parse_track_stream, parse_video_stream
def download(urls: list[str]) -> bytes:
with Session() as s:
stream_data = b""
for url in urls:
req = s.get(url)
stream_data += req.content
return stream_data
def get_track_stream_data(track_stream: TrackStream) -> tuple[bytes, str]:
"""Download data from track stream and return it with file extension."""
urls, file_extension = parse_track_stream(track_stream)
stream_data = download(urls)
return stream_data, file_extension
def get_video_stream_data(video_stream: VideoStream) -> bytes:
"""Download data from video stream"""
# there can be issue with memory.
# currently we are loading data into ram
# instead of writing it to file right away.
urls = parse_video_stream(video_stream)
stream_data = download(urls)
return stream_data
+39
View File
@@ -0,0 +1,39 @@
import subprocess
from pathlib import Path
def run(cmd: list[str]):
"""Run process without printing to terminal"""
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def is_ffmpeg_installed() -> bool:
try:
run(["ffmpeg", "-version"])
return True
except FileNotFoundError:
return False
def convert_to_mp4(source: Path) -> Path:
output_path = source.with_suffix(".mp4")
run(["ffmpeg", "-y", "-i", str(source), "-c", "copy", str(output_path)])
source.unlink()
return output_path
def extract_flac(source: Path) -> Path:
"""
Extracts flac audio from mp4 container
"""
tmp = source.with_suffix(".tmp.flac")
run(["ffmpeg", "-y", "-i", str(source), "-c", "copy", str(tmp)])
tmp.replace(source.with_suffix(".flac"))
return source.with_suffix(".flac")
+151
View File
@@ -0,0 +1,151 @@
from dataclasses import dataclass
from datetime import datetime
from tiddl.core.api.models import Track, Video, Album, Playlist
from tiddl.core.utils.sanitize import sanitize_string
@dataclass(slots=True)
class AlbumTemplate:
id: int
title: str
artist: str
artists: str
date: datetime
@dataclass(slots=True)
class ItemTemplate:
id: int
title: str
title_version: str
number: int
volume: int
version: str
copyright: str
bpm: int
isrc: str
quality: str
artist: str
artists: str
features: str
artists_with_features: str
@dataclass(slots=True)
class PlaylistTemplate:
uuid: str
title: str
index: int
created: datetime
updated: datetime
def generate_template_data(
item: Track | Video | None = None,
album: Album | None = None,
playlist: Playlist | None = None,
playlist_index: int = 0,
) -> dict[str, ItemTemplate | AlbumTemplate | PlaylistTemplate | None]:
"""Normalize Tidal API Track/Video + Album data into safe templates."""
item_template = None
if item:
main_artists = sorted(
[a.name for a in (item.artists or []) if a.type == "MAIN"]
)
featured_artists = sorted(
[a.name for a in (item.artists or []) if a.type == "FEATURED"]
)
if isinstance(item, Track):
version = item.version or ""
copyright_ = item.copyright or ""
bpm = item.bpm or 0
isrc = item.isrc or ""
quality = item.audioQuality or ""
else: # Video
version = ""
copyright_ = ""
bpm = 0
isrc = ""
quality = item.quality or ""
item_template = ItemTemplate(
id=item.id,
title=item.title,
title_version=f"{item.title} ({version})" if version else item.title,
number=item.trackNumber,
volume=item.volumeNumber,
version=version,
copyright=copyright_,
bpm=bpm,
isrc=isrc,
quality=quality,
artist=item.artist.name if item.artist else "",
artists=", ".join(main_artists),
features=", ".join(featured_artists),
artists_with_features=", ".join(main_artists + featured_artists),
)
album_template = None
if album:
album_template = AlbumTemplate(
id=album.id,
title=album.title,
artist=album.artist.name if album.artist else "",
artists=", ".join(
a.name for a in (album.artists or []) if a.type == "MAIN"
),
date=album.releaseDate,
)
playlist_template = None
if playlist:
playlist_template = PlaylistTemplate(
uuid=playlist.uuid,
title=playlist.title,
index=playlist_index,
created=datetime.fromisoformat(playlist.created),
updated=datetime.fromisoformat(playlist.lastUpdated),
)
templates: dict[str, ItemTemplate | AlbumTemplate | PlaylistTemplate | None] = {
"item": item_template,
"album": album_template,
"playlist": playlist_template,
}
return templates
def format_template(
template: str,
item: Track | Video | None = None,
album: Album | None = None,
playlist: Playlist | None = None,
playlist_index: int = 0,
with_asterisk_ext=True,
**extra,
) -> str:
custom_fields = {"now": datetime.now()}
data = (
generate_template_data(
item=item,
album=album,
playlist=playlist,
playlist_index=playlist_index,
)
| extra
| custom_fields
)
path: str = "/".join(
[sanitize_string(segment.format(**data)) for segment in template.split("/")]
)
if with_asterisk_ext:
path += ".*"
return path
+38
View File
@@ -0,0 +1,38 @@
from logging import getLogger
from pathlib import Path
from tiddl.core.api.models import Track
log = getLogger(__name__)
def save_tracks_to_m3u(
tracks_with_path: list[tuple[Path, Track]], path: Path
):
"""
tracks_with_path: [track_path, Track]
path: m3u file location
filename: name of the m3u file
"""
file = path.with_suffix(".m3u")
log.debug(f"{path=}, {file=}")
if not tracks_with_path:
log.warning(f"can't save '{file}', no tracks")
return
try:
file.parent.mkdir(parents=True, exist_ok=True)
with file.open("w", encoding="utf-8") as f:
f.write("#EXTM3U\n")
for track_path, track in tracks_with_path:
f.write(
f"#EXTINF:{track.duration},{track.artist.name if track.artist else ''} - {track.title}\n{track_path}\n"
)
log.debug(f"saved m3u file as '{file}' with {len(tracks_with_path)} tracks")
except Exception as e:
log.error(f"can't save m3u file: {e}")
+19 -35
View File
@@ -1,18 +1,13 @@
import logging
from m3u8 import M3U8
from requests import Session
from pydantic import BaseModel
from base64 import b64decode
from xml.etree.ElementTree import fromstring
from tiddl.models.api import TrackStream, VideoStream
from tiddl.core.api.models import TrackStream, VideoStream
logger = logging.getLogger(__name__)
def parseManifestXML(xml_content: str):
def parse_manifest_XML(xml_content: str):
"""
Parses XML manifest file of the track.
"""
@@ -53,15 +48,23 @@ def parseManifestXML(xml_content: str):
return urls, codecs
class TrackManifest(BaseModel):
mimeType: str
codecs: str
encryptionType: str
urls: list[str]
def parse_track_stream(track_stream: TrackStream) -> tuple[list[str], str]:
"""
Parse URLs and file extension from `track_stream`
| Quality Level | Codec Type | Manifest MIME Type | MIME Type |
| --------------- | ---------- | ------------------------- | ---------- |
| LOW | m4a | application/vnd.tidal.bts | audio/mp4 |
| HIGH | m4a | application/vnd.tidal.bts | audio/mp4 |
| LOSSLESS | flac | application/vnd.tidal.bts | audio/flac |
| HI_RES_LOSSLESS | m4a | application/dash+xml | audio/mp4 |
"""
def parseTrackStream(track_stream: TrackStream) -> tuple[list[str], str]:
"""Parse URLs and file extension from `track_stream`"""
class TrackManifest(BaseModel):
mimeType: str
codecs: str
encryptionType: str
urls: list[str]
decoded_manifest = b64decode(track_stream.manifest).decode()
@@ -71,7 +74,7 @@ def parseTrackStream(track_stream: TrackStream) -> tuple[list[str], str]:
urls, codecs = track_manifest.urls, track_manifest.codecs
case "application/dash+xml":
urls, codecs = parseManifestXML(decoded_manifest)
urls, codecs = parse_manifest_XML(decoded_manifest)
if codecs == "flac":
file_extension = ".flac"
@@ -85,28 +88,9 @@ def parseTrackStream(track_stream: TrackStream) -> tuple[list[str], str]:
return urls, file_extension
def downloadTrackStream(track_stream: TrackStream) -> tuple[bytes, str]:
"""Download data from track stream and return it with file extension."""
urls, file_extension = parseTrackStream(track_stream)
with Session() as s:
stream_data = b""
for url in urls:
req = s.get(url)
stream_data += req.content
return stream_data, file_extension
def parseVideoStream(video_stream: VideoStream) -> list[str]:
def parse_video_stream(video_stream: VideoStream) -> list[str]:
"""Parse `video_stream` manifest and return video urls"""
# TODO: add video quality arg,
# for now we download the highest quality.
# -vq option in download command
class VideoManifest(BaseModel):
mimeType: str
urls: list[str]
+12
View File
@@ -0,0 +1,12 @@
import re
def sanitize_string(string: str) -> str:
"""
Function used to sanitize file paths.
Sometimes resources from Tidal contain
forbidden characters that we need to remove.
"""
pattern = r'[\\/:"*?<>|]+'
return re.sub(pattern, "", string)
-21
View File
@@ -1,21 +0,0 @@
class AuthError(Exception):
def __init__(
self, status: int, error: str, sub_status: str, error_description: str
):
self.status = status
self.error = error
self.sub_status = sub_status
self.error_description = error_description
def __str__(self):
return f"{self.status}: {self.error} - {self.error_description}"
class ApiError(Exception):
def __init__(self, status: int, subStatus: str, userMessage: str):
self.status = status
self.sub_status = subStatus
self.user_message = userMessage
def __str__(self):
return f"{self.user_message} ({self.status} - {self.sub_status})"
-202
View File
@@ -1,202 +0,0 @@
import logging
import requests
from os import makedirs
from pathlib import Path
from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
from mutagen.flac import FLAC as MutagenFLAC
from mutagen.flac import Picture
from mutagen.mp4 import MP4 as MutagenMP4
from mutagen.mp4 import MP4Cover
from tiddl.models.resource import Track, Video
from tiddl.models.api import AlbumItemsCredits
from typing import List
logger = logging.getLogger(__name__)
def addMetadata(
track_path: Path,
track: Track,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
lyrics="",
):
logger.debug((track_path, track.id))
extension = track_path.suffix
# TODO: handle mutagen exceptions
if extension == ".flac":
metadata = MutagenFLAC(track_path)
if cover_data:
picture = Picture()
picture.data = cover_data
picture.mime = "image/jpeg"
picture.type = 3
metadata.clear_pictures()
metadata.add_picture(picture)
metadata["TITLE"] = track.title + (
" ({})".format(track.version) if track.version else ""
)
metadata["WORK"] = track.title + (
" ({})".format(track.version) if track.version else ""
)
metadata["TRACKNUMBER"] = str(track.trackNumber)
metadata["DISCNUMBER"] = str(track.volumeNumber)
metadata["ALBUM"] = track.album.title
metadata["ARTIST"] = "; ".join(
[artist.name.strip() for artist in track.artists]
)
if album_artist:
metadata["ALBUMARTIST"] = album_artist
elif track.artist:
metadata["ALBUMARTIST"] = track.artist.name
if track.streamStartDate:
metadata["DATE"] = track.streamStartDate.strftime("%Y-%m-%d")
metadata["ORIGINALDATE"] = track.streamStartDate.strftime("%Y-%m-%d")
metadata["YEAR"] = str(track.streamStartDate.strftime("%Y"))
metadata["ORIGINALYEAR"] = str(track.streamStartDate.strftime("%Y"))
if track.copyright:
metadata["COPYRIGHT"] = track.copyright
metadata["ISRC"] = track.isrc
if track.bpm:
metadata["BPM"] = str(track.bpm)
for entry in credits:
metadata[entry.type.upper()] = [
contributor.name for contributor in entry.contributors
]
if lyrics:
metadata["LYRICS"] = lyrics
elif extension == ".m4a":
if lyrics or cover_data:
metadata = MutagenMP4(track_path)
if lyrics:
metadata["\xa9lyr"] = [lyrics]
if cover_data:
metadata["covr"] = [
MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)
]
metadata.save()
metadata = MutagenEasyMP4(track_path)
metadata.update(
{
"title": track.title,
"tracknumber": str(track.trackNumber),
"discnumber": str(track.volumeNumber),
"copyright": track.copyright if track.copyright else "",
"albumartist": track.artist.name if track.artist else "",
"artist": ", ".join(
sorted([artist.name.strip() for artist in track.artists])
),
"album": track.album.title,
"date": str(track.streamStartDate) if track.streamStartDate else "",
"bpm": str(track.bpm or 0),
}
)
else:
raise ValueError(f"Unknown file extension: {extension}")
try:
metadata.save(track_path)
except Exception as e:
logger.error(f"Failed to add metadata to {track_path}: {e}")
def addVideoMetadata(path: Path, video: Video):
metadata = MutagenEasyMP4(path)
metadata.update(
{
"title": video.title,
"albumartist": video.artist.name if video.artist else "",
"artist": ";".join([artist.name.strip() for artist in video.artists]),
"album": video.album.title if video.album else "",
"date": str(video.streamStartDate) if video.streamStartDate else "",
}
)
if video.trackNumber:
metadata["tracknumber"] = str(video.trackNumber)
if video.volumeNumber:
metadata["discnumber"] = str(video.volumeNumber)
try:
metadata.save(path)
except Exception as e:
logger.error(f"Failed to add metadata to {path}: {e}")
class Cover:
# TODO: cache covers
def __init__(self, uid: str, size=1280) -> None:
if size > 1280:
logger.warning(
f"can not set cover size higher than 1280 (user set: {size})"
)
size = 1280
self.uid = uid
formatted_uid = uid.replace("-", "/")
self.url = (
f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
)
logger.debug((self.uid, self.url))
self.content = self._get()
def _get(self) -> bytes:
req = requests.get(self.url)
if req.status_code != 200:
logger.error(f"could not download cover. ({req.status_code}) {self.url}")
return b""
logger.debug(f"got cover: {self.uid}")
return req.content
def save(self, directory_path: Path, filename="cover.jpg"):
if not self.content:
logger.error("cover file content is empty")
return
file = directory_path / filename
if file.exists():
logger.debug(f"cover already exists ({file})")
return
makedirs(directory_path, exist_ok=True)
try:
with file.open("wb") as f:
f.write(self.content)
except FileNotFoundError as e:
logger.error(f"could not save cover. {file} -> {e}")
-53
View File
@@ -1,53 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class AuthUser(BaseModel):
userId: int
email: str
countryCode: str
fullName: Optional[str]
firstName: Optional[str]
lastName: Optional[str]
nickname: Optional[str]
username: str
address: Optional[str]
city: Optional[str]
postalcode: Optional[str]
usState: Optional[str]
phoneNumber: Optional[str]
birthday: Optional[int]
channelId: int
parentId: int
acceptedEULA: bool
created: int
updated: int
facebookUid: int
appleUid: Optional[str]
googleUid: Optional[str]
accountLinkCreated: bool
emailVerified: bool
newUser: bool
class AuthResponse(BaseModel):
user: AuthUser
scope: str
clientName: str
token_type: str
access_token: str
expires_in: int
user_id: int
class AuthResponseWithRefresh(AuthResponse):
refresh_token: str
class AuthDeviceResponse(BaseModel):
deviceCode: str
userCode: str
verificationUri: str
verificationUriComplete: str
expiresIn: int
interval: int
-14
View File
@@ -1,14 +0,0 @@
from typing import Literal
TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
TrackArg = Literal["low", "normal", "high", "master"]
SinglesFilter = Literal["none", "only", "include"]
ARG_TO_QUALITY: dict[TrackArg, TrackQuality] = {
"low": "LOW",
"normal": "HIGH",
"high": "LOSSLESS",
"master": "HI_RES_LOSSLESS",
}
QUALITY_TO_ARG = {v: k for k, v in ARG_TO_QUALITY.items()}
-234
View File
@@ -1,234 +0,0 @@
import re
import os
import logging
from ffmpeg_asyncio import FFmpeg
from ffmpeg_asyncio.types import Option as FFmpegOption
from pydantic import BaseModel
from urllib.parse import urlparse
from pathlib import Path
from typing import Literal, Union, get_args
from tiddl.models.constants import TrackQuality, QUALITY_TO_ARG
from tiddl.models.resource import Track, Video
ResourceTypeLiteral = Literal["track", "video", "album", "playlist", "artist", "mix"]
class TidalResource(BaseModel):
type: ResourceTypeLiteral
id: str
@property
def url(self) -> str:
return f"https://listen.tidal.com/{self.type}/{self.id}"
@classmethod
def fromString(cls, string: str):
"""
Extracts the resource type (e.g., "track", "album")
and resource ID from a given input string.
The input string can either be a full URL or a shorthand string
in the format `resource_type/resource_id` (e.g., `track/12345678`).
"""
path = urlparse(string).path
resource_type, resource_id = path.split("/")[-2:]
if resource_type not in get_args(ResourceTypeLiteral):
raise ValueError(f"Invalid resource type: {resource_type}")
digit_resource_types: list[ResourceTypeLiteral] = [
"track",
"album",
"video",
"artist",
]
if resource_type in digit_resource_types and not resource_id.isdigit():
raise ValueError(f"Invalid resource id: {resource_id}")
return cls(type=resource_type, id=resource_id) # type: ignore
def __str__(self) -> str:
return f"{self.type}/{self.id}"
def sanitizeString(string: str) -> str:
pattern = r'[\\/:"*?<>|]+'
return re.sub(pattern, "", string)
def formatResource(
template: str,
resource: Union[Track, Video],
album_artist="",
playlist_title="",
playlist_index=0,
) -> str:
artist = sanitizeString(resource.artist.name) if resource.artist else ""
features = [
sanitizeString(item_artist.name)
for item_artist in resource.artists
if item_artist.name != artist
]
resource_dict = {
"id": str(resource.id),
"title": sanitizeString(resource.title),
"artist": artist,
"artists": ", ".join(sorted(features + [artist])),
"features": ", ".join(features),
"album": sanitizeString(resource.album.title if resource.album else ""),
"album_id": str(resource.album.id if resource.album else ""),
"number": resource.trackNumber,
"disc": resource.volumeNumber,
"date": (resource.streamStartDate if resource.streamStartDate else ""),
# i think we can remove year as we are able to format date
"year": (
resource.streamStartDate.strftime("%Y") if resource.streamStartDate else ""
),
"playlist": sanitizeString(playlist_title),
"album_artist": sanitizeString(album_artist),
"playlist_number": playlist_index or 0,
"quality": "",
"version": "",
"bpm": "",
}
if isinstance(resource, Track):
resource_dict.update(
{
"version": sanitizeString(resource.version or ""),
"quality": QUALITY_TO_ARG[resource.audioQuality],
"bpm": resource.bpm or "",
}
)
elif isinstance(resource, Video):
resource_dict.update({"quality": resource.quality})
formatted_template = template.format(**resource_dict).strip()
disallowed_chars = r'[\\:"*?<>|]+'
invalid_chars = re.findall(disallowed_chars, formatted_template)
if invalid_chars:
raise ValueError(
f"Template '{template}' and formatted resource '{formatted_template}'"
f"contains disallowed characters: {' '.join(sorted(set(invalid_chars)))}"
)
return formatted_template
def findTrackFilename(
track_quality: TrackQuality, download_quality: TrackQuality, file_name: Path
) -> Path:
"""
Predict track extension.
"""
FLAC_QUALITIES: list[TrackQuality] = ["LOSSLESS", "HI_RES_LOSSLESS"]
if download_quality in FLAC_QUALITIES and track_quality in FLAC_QUALITIES:
extension = ".flac"
else:
extension = ".m4a"
full_file_name = file_name.with_suffix(extension)
return full_file_name
async def convertFileExtension(
source_file: Path,
extension: str,
remove_source=False,
is_video=False,
copy_audio=False,
) -> Path:
"""
Converts `source_file` extension and returns `Path` of file with new `extension`.
Removes `source_file` when `remove_source` is truthy.
"""
try:
output_file = source_file.with_suffix(extension)
except ValueError as e:
logging.error(e)
return source_file
logging.debug((source_file, output_file, extension, copy_audio, is_video))
if extension == source_file.suffix:
logging.debug("Conversion not required, already %s", extension)
return source_file
ffmpeg_args: dict[str, FFmpegOption | None] = {"loglevel": "error"}
if copy_audio:
ffmpeg_args["acodec"] = "copy"
if is_video:
ffmpeg_args["vcodec"] = "copy"
try:
logging.debug("Trying conversion")
ffmpeg = FFmpeg().option("y")
ffmpeg.input(str(source_file))
ffmpeg.output(str(output_file), ffmpeg_args)
@ffmpeg.on("completed")
def on_completed():
logging.debug(f"converted {output_file}")
if remove_source:
try:
os.remove(source_file)
except OSError as e:
logging.error(f"can't remove source file {source_file}: {e}")
await ffmpeg.execute()
except Exception as e:
logging.error(f"can't convert file {source_file}: {e}")
return source_file
return output_file
def savePlaylistM3U(
playlist_tracks: list[tuple[Path, Track]], path: Path, filename="playlist.m3u"
):
"""
playlist_tracks: [track_path, Track]
path: m3u file location
filename: name of the m3u file
"""
file = path / sanitizeString(filename)
logging.debug(f"saving m3u file at {file}")
if not playlist_tracks:
logging.warning(f"playlist {file} is empty")
return
try:
with file.open("w", encoding="utf-8") as f:
f.write("#EXTM3U\n")
for track_path, track in playlist_tracks:
f.write(
f"#EXTINF:{track.duration},{track.artist.name if track.artist else ''} - {track.title}\n{track_path}\n"
)
logging.debug(
f"saved m3u file as {file} with {len(playlist_tracks)} tracks"
)
except Exception as e:
logging.error(f"can't save playlist m3u file: {e}")