Compare commits

...

38 Commits

Author SHA1 Message Date
Oskar Dudziński 40f82b51a2 🚀 bump to 2.2.2 2025-02-24 14:55:07 +01:00
Oskar Dudziński a3c744b06c 🐛 ALBUMARTIST metadata tag is now correct (#97) 2025-02-24 14:53:18 +01:00
Oskar Dudziński ab57b700f0 🐛 API token is now refreshing correctly (#99) 2025-02-24 14:47:18 +01:00
Oskar Dudziński e41181e502 📝 Update bug_report.md 2025-02-22 13:25:59 +01:00
Oskar Dudziński e2777faa89 🚀 bump to 2.2.1 2025-02-13 20:18:13 +01:00
Oskar Dudziński 680b9b9760 CLI is now displaying download speed and file size (#93) 2025-02-13 20:01:38 +01:00
Oskar Dudziński 56968be9a2 🐛 refreshing token should now work with context (#91) 2025-02-13 12:40:50 +01:00
oskvr37 92f3feda2e 💬 enable new debug logging format in rich handler 2025-02-13 12:39:33 +01:00
oskvr37 4289875599 ♻️ logger instead of echo 2025-02-13 12:38:21 +01:00
Oskar Dudziński 01b06b480c 🐛 fix copyright can be None (#87) 2025-02-10 18:07:12 +01:00
oskvr37 1908b81334 Merge branch 'main' of https://github.com/oskvr37/tiddl 2025-02-10 18:04:49 +01:00
oskvr37 970dfd016a 📝 update options docs 2025-02-10 17:35:26 +01:00
oskvr37 733b51dd33 automatically refresh token 2025-02-09 17:44:05 +01:00
oskvr37 84358f3537 🐛 fix auth time left 2025-02-09 17:16:55 +01:00
Oskar Dudziński 7a6a742cbb 📝 Update README.md 2025-02-09 16:47:13 +01:00
oskvr37 c4e5486372 ♻️ use console in print 2025-02-09 16:42:35 +01:00
oskvr37 8518e69a9f config command, add --show option 2025-02-09 16:41:05 +01:00
oskvr37 6565ff19c7 Merge branch 'main' of https://github.com/oskvr37/tiddl 2025-02-09 15:22:41 +01:00
oskvr37 3158f795cc config docs, add --locate to config 2025-02-09 15:22:36 +01:00
oskvr37 7f7cfe6b4c add --path option 2025-02-09 15:21:56 +01:00
oskvr37 87e7073f62 💡 add TODO 2025-02-09 15:21:30 +01:00
Oskar Dudziński 02cee273a6 📝 add demo.gif to README 2025-02-09 01:46:44 +01:00
oskvr37 78a382e83e 📝 add demo.gif 2025-02-09 01:40:18 +01:00
oskvr37 4386f781cd 🚀 bump to 2.2.0 2025-02-09 00:15:06 +01:00
oskvr37 5a07a8aefb 📝 make CLI docs consistent 2025-02-09 00:14:44 +01:00
oskvr37 9de7e3224b 📝 update readme 2025-02-09 00:14:03 +01:00
Oskar Dudziński 3a14939f15 Threaded Track Download & Videos Support (#85) 2025-02-09 00:04:07 +01:00
Oskar Dudziński 0604c9fd71 📝 Update issue templates 2025-02-08 17:38:04 +01:00
oskvr37 c183c37124 add video to TidalResource 2025-02-08 16:28:52 +01:00
Oskar Dudziński 993aa08e7e add Tidal API cache (#84)
*  add cache to api

*  add cache omit option

*  add `expire_after` to `TidalApi.fetch`

* ♻️ prepare new download command
2025-02-08 14:57:52 +01:00
oskvr37 a9e105150f add rich package 2025-02-07 20:34:29 +01:00
oskvr37 91297c238a Merge branch 'main' of https://github.com/oskvr37/tiddl 2025-02-07 19:19:03 +01:00
oskvr37 2ee04ced61 download tracks between fetching 2025-02-07 19:18:55 +01:00
oskvr37 43c756904e 🐛 fix invalid int literal 2025-02-07 19:18:09 +01:00
oskvr37 0122e68afe exit command on AuthError 2025-02-07 17:17:15 +01:00
oskvr37 a6be191fbe add formatResource 2025-02-07 17:04:21 +01:00
oskvr37 b3b4bdf036 📝 fix typo 2025-02-07 16:22:25 +01:00
Oskar Dudziński 93bb73c8ba 📝 Update README.md
* monthly downloads
* download usage
* quality table
2025-02-07 15:43:16 +01:00
23 changed files with 706 additions and 247 deletions
+22
View File
@@ -0,0 +1,22 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: oskvr37
---
**Describe the bug**
Describe what happened.
**To Reproduce**
What command was used?
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Software (please complete the following information):**
- tiddl version: [e.g. v2.0.1]
- python version: [e.g. 3.11]
- OS: [e.g. Linux, Windows, iOS]
+18
View File
@@ -0,0 +1,18 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: new feature
assignees: oskvr37
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...], It would be cool to [...]
**Describe the solution you'd like**
**Describe alternatives you've considered**
**Additional context**
Add any other context or screenshots about the feature request here.
+37 -15
View File
@@ -1,12 +1,14 @@
# Tidal Downloader
TIDDL is Python CLI application that allows downloading Tidal tracks.
![GitHub top language](https://img.shields.io/github/languages/top/oskvr37/tiddl?style=for-the-badge)
![PyPI - Downloads](https://img.shields.io/pypi/dm/tiddl?style=for-the-badge&color=%2332af64)
![PyPI - Version](https://img.shields.io/pypi/v/tiddl?style=for-the-badge)
![GitHub commits since latest release](https://img.shields.io/github/commits-since/oskvr37/tiddl/latest?style=for-the-badge)
[<img src="https://img.shields.io/badge/gitmoji-%20😜%20😍-FFDD67.svg?style=for-the-badge" />](https://gitmoji.dev)
TIDDL is the Python CLI application that allows downloading Tidal tracks and videos!
<img src="https://raw.githubusercontent.com/oskvr37/tiddl/refs/heads/main/docs/demo.gif" alt="tiddl album download in 6 seconds" />
It's inspired by [Tidal-Media-Downloader](https://github.com/yaronzz/Tidal-Media-Downloader) - currently not mantained project.
This repository will contain features requests from that project and will be the enhanced version.
@@ -25,31 +27,40 @@ Run the package cli with `tiddl`
```bash
$ tiddl
Usage: tiddl [OPTIONS] COMMAND [ARGS]...
TIDDL - Download Tidal tracks ✨
TIDDL - Tidal Downloader ♫
Options:
-v, --verbose Show debug logs
--help Show this message and exit.
-v, --verbose Show debug logs.
-q, --quiet Suppress logs.
-nc, --no-cache Omit Tidal API requests caching.
--help Show this message and exit.
Commands:
...
auth Manage Tidal token.
config Print path to the configuration file.
fav Get your Tidal favorites.
file Parse txt or JSON file with urls.
search Search on Tidal.
url Get Tidal URL.
```
# Basic usage
Login with Tidal account
## Login with Tidal account
```bash
tiddl auth login
```
Download track / album / artist / playlist
## Download resource
You can download track / video / album / artist / playlist
```bash
tiddl url https://listen.tidal.com/track/103805726 download
tiddl url https://listen.tidal.com/video/25747442 download
tiddl url https://listen.tidal.com/album/103805723 download
tiddl url https://listen.tidal.com/artist/25022 download
tiddl url https://listen.tidal.com/playlist/84974059-76af-406a-aede-ece2b78fa372 download
@@ -58,18 +69,29 @@ tiddl url https://listen.tidal.com/playlist/84974059-76af-406a-aede-ece2b78fa372
> [!TIP]
> You don't have to paste full urls, track/103805726, album/103805723 etc. will also work
Set download quality and output format
## Download options
```bash
tiddl ... download -q master -o "{artist}/{title} ({album})"
tiddl url track/103805726 download -q master -o "{artist}/{title} ({album})"
```
This command will:
- download with highest quality
- download with highest quality (master)
- save track with title and album name in artist folder
> [!NOTE]
> More about file templating [on wiki](https://github.com/oskvr37/tiddl/wiki/Template-formatting).
### Download quality
| Quality | File extension | Details |
| :-----: | :------------: | :-------------------: |
| LOW | .m4a | 96 kbps |
| NORMAL | .m4a | 320 kbps |
| HIGH | .flac | 16-bit, 44.1 kHz |
| MASTER | .flac | Up to 24-bit, 192 kHz |
### Output format
More about file templating [on wiki](https://github.com/oskvr37/tiddl/wiki/Template-formatting).
# Development
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 120 KiB

-2
View File
@@ -52,8 +52,6 @@ progress = Progress(
def handleItemDownload(item: Union[Track, Video]):
# TODO: check if item is already downloaded
if isinstance(item, Track):
track_stream = api.getTrackStream(item.id, quality=QUALITY)
urls, extension = parseTrackStream(track_stream)
+4 -2
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "tiddl"
version = "2.1.0"
version = "2.2.2"
description = "Download Tidal tracks with CLI downloader."
readme = "README.md"
requires-python = ">=3.11"
@@ -17,10 +17,12 @@ classifiers = [
dependencies = [
"pydantic>=2.9.2",
"requests>=2.20.0",
"requests-cache>=1.2.1",
"click>=8.1.7",
"mutagen>=1.47.0",
"ffmpeg-python>=0.2.0",
"m3u8>=6.0.0"
"m3u8>=6.0.0",
"rich>=13.9.4"
]
[project.urls]
+2
View File
@@ -9,6 +9,8 @@ class TestTidalResource(unittest.TestCase):
positive_cases = [
("https://tidal.com/browse/track/12345678", "track", "12345678"),
("track/12345678", "track", "12345678"),
("https://tidal.com/browse/video/12345678", "video", "12345678"),
("video/12345678", "video", "12345678"),
("https://tidal.com/browse/album/12345678", "album", "12345678"),
("album/12345678", "album", "12345678"),
("https://tidal.com/browse/playlist/12345678", "playlist", "12345678"),
+49 -9
View File
@@ -4,7 +4,12 @@ from pathlib import Path
from typing import Any, Literal, Type, TypeVar
from pydantic import BaseModel
from requests import Session
from requests_cache import (
CachedSession,
EXPIRE_IMMEDIATELY,
NEVER_EXPIRE,
DO_NOT_CACHE,
)
from tiddl.models.api import (
Album,
@@ -25,8 +30,10 @@ from tiddl.models.api import (
from tiddl.models.constants import TrackQuality
from tiddl.exceptions import ApiError
from tiddl.config import HOME_PATH
DEBUG = False
T = TypeVar("T", bound=BaseModel)
logger = logging.getLogger(__name__)
@@ -51,24 +58,44 @@ class TidalApi:
URL = "https://api.tidal.com/v1"
LIMITS = Limits
def __init__(self, token: str, user_id: str, country_code: str) -> None:
def __init__(
self, token: str, user_id: str, country_code: str, omit_cache=False
) -> None:
self.user_id = user_id
self.country_code = country_code
self.session = Session()
# 3.0 TODO: change cache path
CACHE_NAME = "tiddl_api_cache"
self.session = CachedSession(
cache_name=HOME_PATH / CACHE_NAME, always_revalidate=omit_cache
)
self.session.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
def fetch(
self, model: Type[T], endpoint: str, params: dict[str, Any] = {}
self,
model: Type[T],
endpoint: str,
params: dict[str, Any] = {},
expire_after=NEVER_EXPIRE,
) -> T:
"""Fetch data from the API and parse it into the given Pydantic model."""
req = self.session.get(f"{self.URL}/{endpoint}", params=params)
req = self.session.get(
f"{self.URL}/{endpoint}", params=params, expire_after=expire_after
)
logger.debug((endpoint, params, req.status_code))
logger.debug(
(
endpoint,
params,
req.status_code,
"HIT" if req.from_cache else "MISS",
)
)
data = req.json()
@@ -124,7 +151,10 @@ class TidalApi:
def getArtist(self, artist_id: str | int):
return self.fetch(
Artist, f"artists/{artist_id}", {"countryCode": self.country_code}
Artist,
f"artists/{artist_id}",
{"countryCode": self.country_code},
expire_after=3600,
)
def getArtistAlbums(
@@ -143,6 +173,7 @@ class TidalApi:
"offset": offset,
"filter": filter,
},
expire_after=3600,
)
def getFavorites(self):
@@ -150,6 +181,7 @@ class TidalApi:
Favorites,
f"users/{self.user_id}/favorites/ids",
{"countryCode": self.country_code},
expire_after=EXPIRE_IMMEDIATELY,
)
def getPlaylist(self, playlist_uuid: str):
@@ -170,15 +202,21 @@ class TidalApi:
"limit": limit,
"offset": offset,
},
expire_after=EXPIRE_IMMEDIATELY,
)
def getSearch(self, query: str):
return self.fetch(
Search, "search", {"countryCode": self.country_code, "query": query}
Search,
"search",
{"countryCode": self.country_code, "query": query},
expire_after=EXPIRE_IMMEDIATELY,
)
def getSession(self):
return self.fetch(SessionResponse, "sessions")
return self.fetch(
SessionResponse, "sessions", expire_after=DO_NOT_CACHE
)
def getTrack(self, track_id: str | int):
return self.fetch(
@@ -194,6 +232,7 @@ class TidalApi:
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
def getVideo(self, video_id: str | int):
@@ -210,4 +249,5 @@ class TidalApi:
"playbackmode": "STREAM",
"assetpresentation": "FULL",
},
expire_after=DO_NOT_CACHE,
)
+42 -13
View File
@@ -1,6 +1,8 @@
import click
import logging
from rich.logging import RichHandler
from .ctx import ContextObj, passContext, Context
from .auth import AuthGroup
from .download import UrlGroup, FavGroup, SearchGroup, FileGroup
@@ -8,36 +10,63 @@ from .config import ConfigCommand
from tiddl.config import HOME_PATH
from .auth import refresh
@click.group()
@passContext
@click.option("--verbose", "-v", is_flag=True, help="Show debug logs")
def cli(ctx: Context, verbose: bool):
"""TIDDL - Download Tidal tracks \u266b"""
@click.option("--verbose", "-v", is_flag=True, help="Show debug logs.")
@click.option("--quiet", "-q", is_flag=True, help="Suppress logs.")
@click.option(
"--no-cache", "-nc", is_flag=True, help="Omit Tidal API requests caching."
)
def cli(ctx: Context, verbose: bool, quiet: bool, no_cache: bool):
"""TIDDL - Tidal Downloader \u266b"""
ctx.obj = ContextObj()
# TODO: add rich console to ctx.obj, edit logging config,
# add more verbosity options (silent, info, debug),
# maybe logging format configuration
# latest logs
file_handler = logging.FileHandler(HOME_PATH / "tiddl.log", mode="w", encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(
HOME_PATH / "tiddl.log", mode="w", encoding="utf-8"
)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
logging.Formatter(
"%(levelname)s [%(name)s.%(funcName)s] %(message)s", datefmt="[%X]"
)
)
LEVEL = (
logging.DEBUG if verbose else logging.ERROR if quiet else logging.INFO
)
rich_handler = RichHandler(console=ctx.obj.console, rich_tracebacks=True)
rich_handler.setLevel(LEVEL)
if LEVEL == logging.DEBUG:
rich_handler.setFormatter(
logging.Formatter(
"[%(name)s.%(funcName)s] %(message)s", datefmt="[%X]"
)
)
logging.basicConfig(
level=logging.DEBUG,
handlers=[
stream_handler,
rich_handler,
file_handler,
],
format="%(levelname)s [%(name)s.%(funcName)s] %(message)s",
format="%(message)s",
datefmt="[%X]",
)
logging.getLogger("urllib3").setLevel(logging.ERROR)
if ctx.invoked_subcommand in ("fav", "file", "search", "url"):
ctx.invoke(refresh)
ctx.obj.initApi(omit_cache=no_cache)
cli.add_command(ConfigCommand)
cli.add_command(AuthGroup)
+48 -28
View File
@@ -1,11 +1,17 @@
import click
import logging
from click import style
from time import sleep, time
from tiddl.auth import getDeviceAuth, getToken, refreshToken, removeToken, AuthError
from tiddl.config import AuthConfig
from tiddl.auth import (
getDeviceAuth,
getToken,
refreshToken,
removeToken,
AuthError,
)
from .ctx import passContext, Context
@@ -14,7 +20,27 @@ logger = logging.getLogger(__name__)
@click.group("auth")
def AuthGroup():
"""Manage Tidal token"""
"""Manage Tidal token."""
@AuthGroup.command("refresh")
@passContext
def refresh(ctx: Context):
"""Refresh auth token when is expired"""
logger.debug("Invoked refresh command")
auth = ctx.obj.config.auth
if auth.refresh_token and time() > auth.expires:
logger.info("Refreshing token...")
token = refreshToken(auth.refresh_token)
ctx.obj.config.auth.expires = token.expires_in + int(time())
ctx.obj.config.auth.token = token.access_token
ctx.obj.config.save()
logger.info("Refreshed auth token!")
@AuthGroup.command("login")
@@ -22,28 +48,21 @@ def AuthGroup():
def login(ctx: Context):
"""Add token to the config"""
auth = ctx.obj.config.auth
logger.debug("Invoked login command")
if auth.token:
if auth.refresh_token and time() > auth.expires:
click.echo(style("Refreshing token...", fg="yellow"))
token = refreshToken(auth.refresh_token)
ctx.obj.config.auth.expires = token.expires_in + int(time())
ctx.obj.config.auth.token = token.access_token
ctx.obj.config.save()
click.echo(style("Authenticated!", fg="green"))
if ctx.obj.config.auth.token:
logger.info("Already logged in.")
refresh(ctx)
return
auth = getDeviceAuth()
uri = f"https://{auth.verificationUriComplete}"
click.launch(uri)
click.echo(f"Go to {style(uri, fg='cyan')} and complete authentication!")
time_left = time() + auth.expiresIn
logger.info(f"Go to {uri} and complete authentication!")
auth_end_at = time() + auth.expiresIn
while True:
sleep(auth.interval)
@@ -52,29 +71,28 @@ def login(ctx: Context):
token = getToken(auth.deviceCode)
except AuthError as e:
if e.error == "authorization_pending":
# FIX: `Time left: 0 secondsss` 🐍
time_left = auth_end_at - time()
minutes, seconds = time_left // 60, int(time_left % 60)
click.echo(f"\rTime left: {time_left - time():.0f} seconds", nl=False)
click.echo(
f"\rTime left: {minutes:.0f}:{seconds:02d}", nl=False
)
continue
if e.error == "expired_token":
click.echo(
f"\nTime for authentication {style('has expired', fg='red')}."
)
logger.info("\nTime for authentication has expired.")
break
new_auth = AuthConfig(
ctx.obj.config.auth = AuthConfig(
token=token.access_token,
refresh_token=token.refresh_token,
expires=token.expires_in + int(time()),
user_id=str(token.user.userId),
country_code=token.user.countryCode,
)
ctx.obj.config.auth = new_auth
ctx.obj.config.save()
click.echo(style("\nAuthenticated!", fg="green"))
logger.info("\nAuthenticated!")
break
@@ -84,10 +102,12 @@ def login(ctx: Context):
def logout(ctx: Context):
"""Remove token from config"""
logger.debug("Invoked logout command")
access_token = ctx.obj.config.auth.token
if not access_token:
click.echo(style("Not logged in", fg="yellow"))
logger.info("Not logged in.")
return
removeToken(access_token)
@@ -95,4 +115,4 @@ def logout(ctx: Context):
ctx.obj.config.auth = AuthConfig()
ctx.obj.config.save()
click.echo(style("Logged out!", fg="green"))
logger.info("Logged out!")
+41 -5
View File
@@ -2,18 +2,54 @@ import click
from tiddl.config import CONFIG_PATH
from .ctx import Context, passContext
@click.command("config")
@click.option(
"--open",
"-o",
"OPEN_CONFIG",
is_flag=True,
help="Open the configuration file with the default editor",
help="Open the configuration file with the default editor.",
)
def ConfigCommand(open: bool):
"""Print path to the configuration file"""
@click.option(
"--locate",
"-l",
"LOCATE_CONFIG",
is_flag=True,
help="Launch a file manager with the located configuration file.",
)
@click.option(
"--print",
"-p",
"PRINT_CONFIG",
is_flag=True,
help="Show current configuration.",
)
@passContext
def ConfigCommand(
ctx: Context, OPEN_CONFIG: bool, LOCATE_CONFIG: bool, PRINT_CONFIG: bool
):
"""
Configuration file options.
click.echo(str(CONFIG_PATH))
By default it prints location of tiddl config file.
if open:
This command can be used in variable like `vim $(tiddl config)`
- this will open your config with vim editor.
"""
if OPEN_CONFIG:
click.launch(str(CONFIG_PATH))
elif LOCATE_CONFIG:
click.launch(str(CONFIG_PATH), locate=True)
elif PRINT_CONFIG:
config_without_auth = ctx.obj.config.model_copy()
del config_without_auth.auth
ctx.obj.console.print(config_without_auth.model_dump_json(indent=2))
else:
click.echo(str(CONFIG_PATH))
+11 -1
View File
@@ -1,6 +1,8 @@
import functools
import click
from rich.console import Console
from typing import Callable, TypeVar, cast
from tiddl.api import TidalApi
@@ -12,16 +14,24 @@ class ContextObj:
api: TidalApi | None
config: Config
resources: list[TidalResource]
console: Console
def __init__(self) -> None:
self.config = Config.fromFile()
self.resources = []
self.api = None
self.console = Console()
def initApi(self, omit_cache=False):
auth = self.config.auth
if auth.token and auth.user_id and auth.country_code:
self.api = TidalApi(auth.token, auth.user_id, auth.country_code)
self.api = TidalApi(
auth.token,
auth.user_id,
auth.country_code,
omit_cache=omit_cache or self.config.omit_cache,
)
def getApi(self) -> TidalApi:
if self.api is None:
+290 -141
View File
@@ -1,5 +1,33 @@
import logging
import click
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from requests import Session
from rich.highlighter import ReprHighlighter
from rich.progress import (
SpinnerColumn,
Progress,
TextColumn,
)
from tiddl.download import parseTrackStream, parseVideoStream
from tiddl.exceptions import ApiError, AuthError
from tiddl.metadata import Cover, addMetadata, addVideoMetadata
from tiddl.models.api import AlbumItemsCredits
from tiddl.models.constants import ARG_TO_QUALITY, TrackArg
from tiddl.models.resource import Track, Video, Album
from tiddl.utils import (
TidalResource,
formatResource,
convertFileExtension,
trackExists,
)
from typing import List, Literal, Union
from .fav import FavGroup
from .file import FileGroup
from .search import SearchGroup
@@ -7,131 +35,263 @@ from .url import UrlGroup
from ..ctx import Context, passContext
from typing import List, Union, Literal
from tiddl.download import downloadTrackStream
from tiddl.utils import (
formatTrack,
trackExists,
TidalResource,
convertFileExtension,
)
from tiddl.metadata import addMetadata, Cover
from tiddl.exceptions import ApiError, AuthError
from tiddl.models.constants import TrackArg, ARG_TO_QUALITY
from tiddl.models.resource import Track, Album
from tiddl.models.api import PlaylistItems, AlbumItemsCredits
SinglesFilter = Literal["none", "only", "include"]
@click.command("download")
@click.option(
"--quality", "-q", "quality", type=click.Choice(TrackArg.__args__)
"--quality",
"-q",
"QUALITY",
type=click.Choice(TrackArg.__args__),
help="Track quality.",
)
@click.option(
"--output", "-o", "template", type=str, help="Format track file template."
"--output",
"-o",
"TEMPLATE",
type=str,
help="Format output file template. "
"This will be used instead of your config templates.",
)
@click.option(
"--path",
"-p",
"PATH",
type=str,
help="Base path of download directory. Default is ~/Music/Tiddl.",
)
@click.option(
"--threads",
"-t",
"THREADS_COUNT",
type=int,
help="Number of threads to use in concurrent download; use with caution.",
)
@click.option(
"--noskip",
"-ns",
"noskip",
"DO_NOT_SKIP",
is_flag=True,
default=False,
help="Dont skip downloaded tracks.",
help="Do not skip already downloaded files.",
)
@click.option(
"--singles",
"-s",
"singles_filter",
"SINGLES_FILTER",
type=click.Choice(SinglesFilter.__args__),
default="none",
help="Defines how to treat artist EPs and singles.",
help="Defines how to treat artist EPs and singles, used while downloading artist.",
)
@passContext
def DownloadCommand(
ctx: Context,
quality: TrackArg | None,
template: str | None,
noskip: bool,
singles_filter: SinglesFilter = "none",
QUALITY: TrackArg | None,
TEMPLATE: str | None,
PATH: str | None,
THREADS_COUNT: int,
DO_NOT_SKIP: bool,
SINGLES_FILTER: SinglesFilter,
):
"""Download the tracks"""
"""Download resources"""
# TODO: pretty print
logging.debug(
(QUALITY, TEMPLATE, PATH, THREADS_COUNT, DO_NOT_SKIP, SINGLES_FILTER)
)
DOWNLOAD_QUALITY = ARG_TO_QUALITY[
QUALITY or ctx.obj.config.download.quality
]
api = ctx.obj.getApi()
def downloadTrack(
track: Track,
file_name: str,
progress = Progress(
SpinnerColumn(),
TextColumn(
"{task.description}"
"{task.fields[speed]:.2f} Mbps • {task.fields[size]:.2f} MB",
highlighter=ReprHighlighter(),
),
console=ctx.obj.console,
transient=True,
auto_refresh=True,
)
def handleItemDownload(
item: Union[Track, Video],
path: Path,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
):
if not track.allowStreaming:
click.echo(
f"{click.style('', 'yellow')} Track {click.style(file_name, 'yellow')} does not allow streaming"
if isinstance(item, Track):
track_stream = api.getTrackStream(item.id, quality=DOWNLOAD_QUALITY)
description = (
f"Track '{item.title}' "
f"{(str(track_stream.bitDepth) + ' bit') if track_stream.bitDepth else ''} "
f"{str(track_stream.sampleRate) + ' kHz' if track_stream.sampleRate else ''}"
)
return
download_quality = ARG_TO_QUALITY[
quality or ctx.obj.config.download.quality
]
# .suffix is needed because the Path.with_suffix method will replace any content after dot
# for example: 'album/01. title' becomes 'album/01.m4a'
path = ctx.obj.config.download.path / f"{file_name}.suffix"
if not noskip and trackExists(
track.audioQuality, download_quality, path
):
click.echo(
f"{click.style('', 'cyan')} Skipping track {click.style(file_name, 'cyan')}"
urls, extension = parseTrackStream(track_stream)
elif isinstance(item, Video):
video_stream = api.getVideoStream(item.id)
description = (
f"Video '{item.title}' {video_stream.videoQuality} quality"
)
return
click.echo(
f"{click.style('', 'green')} Downloading track {click.style(file_name, 'green')}"
urls = parseVideoStream(video_stream)
extension = ".ts"
else:
raise TypeError(
f"Invalid item type: expected an instance of Track or Video, "
f"received an instance of {type(item).__name__}. "
)
task_id = progress.add_task(
description=description,
start=True,
visible=True,
total=None,
# fields
speed=0,
size=0,
)
track_stream = api.getTrackStream(track.id, download_quality)
with Session() as s:
stream_data = b""
time_start = perf_counter()
stream_data, file_extension = downloadTrackStream(track_stream)
for url in urls:
req = s.get(url)
full_path = path.with_suffix(file_extension)
full_path.parent.mkdir(parents=True, exist_ok=True)
assert req.status_code == 200, (
f"Could not download stream data for: "
f"{type(item).__name__} '{item.title}', "
f"status code: {req.status_code}"
)
with full_path.open("wb") as f:
stream_data += req.content
speed = (
len(stream_data)
/ (perf_counter() - time_start)
/ (1024 * 128)
)
size = len(stream_data) / 1024**2
progress.update(
task_id,
advance=len(req.content),
speed=speed,
size=size,
)
path = path.with_suffix(extension)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("wb") as f:
f.write(stream_data)
# extract flac from m4a container
if isinstance(item, Track):
if track_stream.audioQuality == "HI_RES_LOSSLESS":
path = convertFileExtension(
source_file=path,
extension=".flac",
remove_source=True,
is_video=False,
copy_audio=True, # extract flac from m4a container
)
if track_stream.audioQuality == "HI_RES_LOSSLESS":
full_path = convertFileExtension(
full_path, ".flac", remove_source=True, copy_audio=True
if not cover_data and item.album.cover:
cover_data = Cover(item.album.cover).content
try:
addMetadata(
path, item, cover_data, credits, album_artist=album_artist
)
except Exception as e:
logging.error(f"Can not add metadata to: {path}, {e}")
elif isinstance(item, Video):
path = convertFileExtension(
source_file=path,
extension=".mp4",
remove_source=True,
is_video=True,
copy_audio=True,
)
if not cover_data and track.album.cover:
cover_data = Cover(track.album.cover).content
try:
addVideoMetadata(path, item)
except Exception as e:
logging.error(f"Can not add metadata to: {path}, {e}")
try:
addMetadata(
full_path, track, cover_data=cover_data, credits=credits
)
except Exception as e:
click.echo(
f"{click.style('', 'yellow')} Cant set metadata to {click.style(file_name, 'yellow')}. {e}"
progress.remove_task(task_id)
logging.info(f"{item.title!r}{speed:.2f} Mbps • {size:.2f} MB")
pool = ThreadPoolExecutor(
max_workers=THREADS_COUNT or ctx.obj.config.download.threads
)
def submitItem(
item: Union[Track, Video],
filename: str,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
):
if not item.allowStreaming:
logging.warning(
f"{type(item).__name__} '{item.title}' does not allow streaming"
)
return
path = Path(PATH) if PATH else ctx.obj.config.download.path
path /= f"{filename}.*"
if not DO_NOT_SKIP: # check if item is already downloaded
if isinstance(item, Track):
if trackExists(item.audioQuality, DOWNLOAD_QUALITY, path):
logging.warning(f"Track '{item.title}' skipped")
return
elif isinstance(item, Video):
if path.with_suffix(".mp4").exists():
logging.warning(f"Video '{item.title}' skipped")
return
pool.submit(
handleItemDownload,
item=item,
path=path,
cover_data=cover_data,
credits=credits,
album_artist=album_artist,
)
def downloadAlbum(album: Album):
click.echo(f"Album {album.title}")
logging.info(f"Album {album.title!r}")
cover_data = Cover(album.cover).content if album.cover else b""
all_items: List[
Union[AlbumItemsCredits.VideoItem, AlbumItemsCredits.TrackItem]
] = []
offset = 0
while True:
album_items = api.getAlbumItemsCredits(album.id, offset=offset)
all_items.extend(album_items.items)
for item in album_items.items:
filename = formatResource(
template=TEMPLATE or ctx.obj.config.template.album,
resource=item.item,
album_artist=album.artist.name,
)
submitItem(
item.item,
filename,
cover_data,
item.credits,
album.artist.name,
)
if (
album_items.limit + album_items.offset
@@ -141,122 +301,111 @@ def DownloadCommand(
offset += album_items.limit
cover_data = Cover(album.cover).content if album.cover else b""
def handleResource(resource: TidalResource) -> None:
logging.debug(f"Handling Resource '{resource}'")
for item in all_items:
if isinstance(item.item, Track):
track = item.item
file_name = formatTrack(
template=template or ctx.obj.config.template.album,
track=track,
album_artist=album.artist.name,
)
downloadTrack(
track=track,
file_name=file_name,
cover_data=cover_data,
credits=item.credits,
)
def handleResource(resource: TidalResource):
match resource.type:
case "track":
track = api.getTrack(resource.id)
file_name = formatTrack(
template=template or ctx.obj.config.template.track,
track=track,
filename = formatResource(
TEMPLATE or ctx.obj.config.template.track, track
)
downloadTrack(
track=track,
file_name=file_name,
submitItem(track, filename)
case "video":
video = api.getVideo(resource.id)
filename = formatResource(
TEMPLATE or ctx.obj.config.template.video, video
)
submitItem(video, filename)
case "album":
album = api.getAlbum(resource.id)
downloadAlbum(album)
case "artist":
artist = api.getArtist(resource.id)
logging.info(f"Artist {artist.name!r}")
def getAllAlbums(singles: bool):
all_albums: List[Album] = []
offset = 0
while True:
items = api.getArtistAlbums(
artist_albums = api.getArtistAlbums(
resource.id,
offset=offset,
filter="EPSANDSINGLES" if singles else "ALBUMS",
)
all_albums.extend(items.items)
for album in artist_albums.items:
downloadAlbum(album)
if (
items.limit + items.offset
> items.totalNumberOfItems
artist_albums.limit + artist_albums.offset
> artist_albums.totalNumberOfItems
):
break
offset += items.limit
offset += artist_albums.limit
return all_albums
if singles_filter == "include":
albums = getAllAlbums(False) + getAllAlbums(True)
if SINGLES_FILTER == "include":
getAllAlbums(False)
getAllAlbums(True)
else:
albums = getAllAlbums(singles_filter == "only")
for album in albums:
downloadAlbum(album)
getAllAlbums(SINGLES_FILTER == "only")
case "playlist":
playlist = api.getPlaylist(resource.id)
click.echo(f"Playlist {playlist.title}")
all_items: List[
Union[
PlaylistItems.PlaylistVideoItem,
PlaylistItems.PlaylistTrackItem,
]
] = []
logging.info(f"Playlist {playlist.title!r}")
offset = 0
while True:
items = api.getPlaylistItems(playlist.uuid, offset=offset)
all_items.extend(items.items)
playlist_items = api.getPlaylistItems(
playlist.uuid, offset=offset
)
if items.limit + items.offset > items.totalNumberOfItems:
break
offset += items.limit
for item in all_items:
if isinstance(
item.item, PlaylistItems.PlaylistTrackItem.PlaylistTrack
):
track = item.item
file_name = formatTrack(
template=template
for item in playlist_items.items:
filename = formatResource(
template=TEMPLATE
or ctx.obj.config.template.playlist,
track=track,
resource=item.item,
playlist_title=playlist.title,
playlist_index=track.index // 100000,
playlist_index=item.item.index // 100000,
)
downloadTrack(track=item.item, file_name=file_name)
submitItem(item.item, filename)
if (
playlist_items.limit + playlist_items.offset
> playlist_items.totalNumberOfItems
):
break
offset += playlist_items.limit
progress.start()
# TODO: make sure every resource is unique
for resource in ctx.obj.resources:
try:
handleResource(resource)
except ApiError as e:
click.echo(click.style(f"{e}", "red"))
except AuthError as e:
click.echo(click.style(f"{e}", "red"))
logging.error(e)
break
except ApiError as e:
logging.error(e)
# session does not have streaming privileges
if e.sub_status == 4006:
break
pool.shutdown(wait=True)
progress.stop()
UrlGroup.add_command(DownloadCommand)
+2 -2
View File
@@ -3,7 +3,7 @@ import click
from tiddl.utils import TidalResource, ResourceTypeLiteral
from ..ctx import Context, passContext
ResourceTypeList: list[ResourceTypeLiteral] = ["track", "album", "artist", "playlist"]
ResourceTypeList: list[ResourceTypeLiteral] = ["track", "video", "album", "artist", "playlist"]
@click.group("fav")
@@ -16,7 +16,7 @@ ResourceTypeList: list[ResourceTypeLiteral] = ["track", "album", "artist", "play
)
@passContext
def FavGroup(ctx: Context, resource_types: list[ResourceTypeLiteral]):
"""Get your Tidal favorites"""
"""Get your Tidal favorites."""
api = ctx.obj.getApi()
+1 -1
View File
@@ -12,7 +12,7 @@ from tiddl.utils import TidalResource
@click.argument("filename", type=click.File(mode="r"))
@passContext
def FileGroup(ctx: Context, filename: TextIOWrapper):
"""Parse txt or JSON file with urls"""
"""Parse txt or JSON file with urls."""
_, extension = splitext(filename.name)
+7 -2
View File
@@ -10,7 +10,7 @@ from ..ctx import Context, passContext
@click.argument("query")
@passContext
def SearchGroup(ctx: Context, query: str):
"""Search on Tidal"""
"""Search on Tidal."""
# TODO: give user interactive choice what to select
@@ -23,6 +23,10 @@ def SearchGroup(ctx: Context, query: str):
# it's not that big deal as we refetch one resource at most,
# but it should be redesigned
if not search.topHit:
click.echo(f"No search results for '{query}'")
return
value = search.topHit.value
icon = click.style("\u2bcc", "magenta")
@@ -39,6 +43,7 @@ def SearchGroup(ctx: Context, query: str):
resource = TidalResource(type="playlist", id=str(value.uuid))
click.echo(f"{icon} Playlist {value.title}")
elif isinstance(value, Video):
click.echo(f"{icon} Video {value.title} (currently not supported)")
resource = TidalResource(type="video", id=str(value.id))
click.echo(f"{icon} Video {value.title}")
ctx.obj.resources.append(resource)
+1 -1
View File
@@ -21,7 +21,7 @@ def UrlGroup(ctx: Context, url: TidalResource):
Get Tidal URL.
It can be Tidal link or `resource_type/resource_id` format.
The resource can be a track, album, playlist or artist.
The resource can be a track, video, album, playlist or artist.
"""
ctx.obj.resources.append(url)
+5
View File
@@ -1,3 +1,5 @@
# 3.0 TODO: change config path to ~/.config/tiddl.json
from pydantic import BaseModel
from pathlib import Path
@@ -10,6 +12,7 @@ CONFIG_INDENT = 2
class TemplateConfig(BaseModel):
track: str = "{artist} - {title}"
video: str = "{artist} - {title}"
album: str = "{album_artist}/{album}/{number:02d}. {title}"
playlist: str = "{playlist}/{playlist_number:02d}. {artist} - {title}"
@@ -17,6 +20,7 @@ class TemplateConfig(BaseModel):
class DownloadConfig(BaseModel):
quality: TrackArg = "high"
path: Path = Path.home() / "Music" / "Tiddl"
threads: int = 4
class AuthConfig(BaseModel):
@@ -31,6 +35,7 @@ class Config(BaseModel):
template: TemplateConfig = TemplateConfig()
download: DownloadConfig = DownloadConfig()
auth: AuthConfig = AuthConfig()
omit_cache: bool = False
def save(self):
with open(CONFIG_PATH, "w") as f:
+3 -2
View File
@@ -105,8 +105,9 @@ def downloadTrackStream(track_stream: TrackStream) -> tuple[bytes, str]:
def parseVideoStream(video_stream: VideoStream) -> list[str]:
"""Parse `video_stream` manifest and return video urls"""
# TOOD: add video quality arg.
# for now we download the highest quality
# TODO: add video quality arg,
# for now we download the highest quality.
# -vq option in download command
class VideoManifest(BaseModel):
mimeType: str
+44 -9
View File
@@ -9,7 +9,7 @@ from mutagen.flac import Picture
from mutagen.mp4 import MP4 as MutagenMP4
from mutagen.mp4 import MP4Cover
from tiddl.models.resource import Track
from tiddl.models.resource import Track, Video
from tiddl.models.api import AlbumItemsCredits
from typing import List
@@ -22,6 +22,7 @@ def addMetadata(
track: Track,
cover_data=b"",
credits: List[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = [],
album_artist="",
):
logger.debug((track_path, track.id))
@@ -42,23 +43,28 @@ def addMetadata(
metadata["TRACKNUMBER"] = str(track.trackNumber)
metadata["DISCNUMBER"] = str(track.volumeNumber)
if track.artist:
metadata["ARTIST"] = track.artist.name
metadata["ARTISTS"] = [artist.name for artist in track.artists]
metadata["ALBUM"] = track.album.title
metadata["ALBUMARTIST"] = ", ".join(
metadata["ARTIST"] = "; ".join(
[artist.name.strip() for artist in track.artists]
)
if album_artist:
metadata["ALBUMARTIST"] = album_artist
elif track.artist:
metadata["ALBUMARTIST"] = track.artist.name
if track.streamStartDate:
metadata["DATE"] = track.streamStartDate.strftime("%Y-%m-%d")
metadata["ORIGINALDATE"] = track.streamStartDate.strftime(
"%Y-%m-%d"
)
metadata["YEAR"] = str(track.streamStartDate.strftime("%Y"))
metadata["ORIGINALYEAR"] = str(track.streamStartDate.strftime("%Y"))
metadata["COPYRIGHT"] = track.copyright
if track.copyright:
metadata["COPYRIGHT"] = track.copyright
metadata["ISRC"] = track.isrc
if track.bpm:
@@ -83,7 +89,7 @@ def addMetadata(
"title": track.title,
"tracknumber": str(track.trackNumber),
"discnumber": str(track.volumeNumber),
"copyright": track.copyright,
"copyright": track.copyright if track.copyright else "",
"albumartist": track.artist.name if track.artist else "",
"artist": ";".join(
[artist.name.strip() for artist in track.artists]
@@ -92,7 +98,7 @@ def addMetadata(
"date": str(track.streamStartDate)
if track.streamStartDate
else "",
"bpm": str(track.bpm or ""),
"bpm": str(track.bpm or 0),
}
)
@@ -105,7 +111,36 @@ def addMetadata(
logger.error(f"Failed to add metadata to {track_path}: {e}")
def addVideoMetadata(path: Path, video: Video):
metadata = MutagenEasyMP4(path)
metadata.update(
{
"title": video.title,
"albumartist": video.artist.name if video.artist else "",
"artist": ";".join(
[artist.name.strip() for artist in video.artists]
),
"album": video.album.title if video.album else "",
"date": str(video.streamStartDate) if video.streamStartDate else "",
}
)
if video.trackNumber:
metadata["tracknumber"] = str(video.trackNumber)
if video.volumeNumber:
metadata["discnumber"] = str(video.volumeNumber)
try:
metadata.save(path)
except Exception as e:
logger.error(f"Failed to add metadata to {path}: {e}")
class Cover:
# TODO: cache covers
def __init__(self, uid: str, size=1280) -> None:
if size > 1280:
logger.warning(
+1 -1
View File
@@ -168,4 +168,4 @@ class Search(BaseModel):
playlists: Playlists
tracks: Tracks
videos: Videos
topHit: TopHit
topHit: Optional[TopHit] = None
+6 -6
View File
@@ -38,7 +38,7 @@ class Track(BaseModel):
volumeNumber: int
version: Optional[str] = None
popularity: int
copyright: str
copyright: Optional[str] = None
bpm: Optional[int] = None
url: str
isrc: str
@@ -56,7 +56,7 @@ class Track(BaseModel):
class Video(BaseModel):
class Arist(BaseModel):
class Artist(BaseModel):
id: int
name: str
type: str
@@ -73,7 +73,7 @@ class Video(BaseModel):
title: str
volumeNumber: int
trackNumber: int
releaseDate: str
streamStartDate: Optional[datetime] = None
imagePath: Optional[str] = None
imageId: str
vibrantColor: str
@@ -83,15 +83,15 @@ class Video(BaseModel):
adSupportedStreamReady: bool
djReady: bool
stemReady: bool
streamStartDate: str
streamStartDate: Optional[datetime] = None
allowStreaming: bool
explicit: bool
popularity: int
type: str
adsUrl: Optional[str] = None
adsPrePaywallOnly: bool
artist: Optional[Arist] = None
artists: List[Arist]
artist: Optional[Artist] = None
artists: List[Artist]
album: Optional[Album] = None
+72 -7
View File
@@ -7,12 +7,12 @@ from pydantic import BaseModel
from urllib.parse import urlparse
from pathlib import Path
from typing import Literal, get_args
from typing import Literal, Union, get_args
from tiddl.models.constants import TrackQuality, QUALITY_TO_ARG
from tiddl.models.resource import Track
from tiddl.models.resource import Track, Video
ResourceTypeLiteral = Literal["track", "album", "playlist", "artist"]
ResourceTypeLiteral = Literal["track", "video", "album", "playlist", "artist"]
class TidalResource(BaseModel):
@@ -79,7 +79,9 @@ def formatTrack(
"disc": track.volumeNumber,
"date": (track.streamStartDate if track.streamStartDate else ""),
# i think we can remove year as we are able to format date
"year": track.streamStartDate.strftime("%Y") if track.streamStartDate else "",
"year": track.streamStartDate.strftime("%Y")
if track.streamStartDate
else "",
"playlist": sanitizeString(playlist_title),
"bpm": track.bpm or "",
"quality": QUALITY_TO_ARG[track.audioQuality],
@@ -100,6 +102,69 @@ def formatTrack(
return formatted_track
def formatResource(
template: str,
resource: Union[Track, Video],
album_artist="",
playlist_title="",
playlist_index=0,
) -> str:
artist = sanitizeString(resource.artist.name) if resource.artist else ""
features = [
sanitizeString(item_artist.name)
for item_artist in resource.artists
if item_artist.name != artist
]
resource_dict = {
"id": str(resource.id),
"title": sanitizeString(resource.title),
"artist": artist,
"artists": ", ".join(features + [artist]),
"features": ", ".join(features),
"album": sanitizeString(resource.album.title if resource.album else ""),
"number": resource.trackNumber,
"disc": resource.volumeNumber,
"date": (resource.streamStartDate if resource.streamStartDate else ""),
# i think we can remove year as we are able to format date
"year": resource.streamStartDate.strftime("%Y")
if resource.streamStartDate
else "",
"playlist": sanitizeString(playlist_title),
"album_artist": sanitizeString(album_artist),
"playlist_number": playlist_index or 0,
"quality": "",
"version": "",
"bpm": "",
}
if isinstance(resource, Track):
resource_dict.update(
{
"version": sanitizeString(resource.version or ""),
"quality": QUALITY_TO_ARG[resource.audioQuality],
"bpm": resource.bpm or "",
}
)
elif isinstance(resource, Video):
resource_dict.update({"quality": resource.quality})
formatted_template = template.format(**resource_dict)
disallowed_chars = r'[\\:"*?<>|]+'
invalid_chars = re.findall(disallowed_chars, formatted_template)
if invalid_chars:
raise ValueError(
f"Template '{template}' and formatted resource '{formatted_template}'"
f"contains disallowed characters: {' '.join(sorted(set(invalid_chars)))}"
)
return formatted_template
def trackExists(
track_quality: TrackQuality, download_quality: TrackQuality, file_name: Path
):
@@ -151,9 +216,9 @@ def convertFileExtension(
if is_video:
ffmpeg_args["c:v"] = "copy"
ffmpeg.input(str(source_file)).output(
str(output_file), **ffmpeg_args
).run(overwrite_output=1)
ffmpeg.input(str(source_file)).output(str(output_file), **ffmpeg_args).run(
overwrite_output=1
)
if remove_source:
os.remove(source_file)