mirror of
https://github.com/oskvr37/tiddl.git
synced 2026-06-13 12:15:13 +03:00
Compare commits
41 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 05d63d153e | |||
| 04de8e677c | |||
| d0d146b87f | |||
| 77e488ff30 | |||
| 459d5a50b9 | |||
| ee160fc5bc | |||
| 1a78d875fa | |||
| b0ed7bd208 | |||
| e45628e15f | |||
| d9c8984dfa | |||
| c285be6ed2 | |||
| 5e4f9bdb6a | |||
| a282c1a4af | |||
| 46a6e748da | |||
| bf8ded5f60 | |||
| 7e0fb9fb37 | |||
| 880f6008b0 | |||
| 0f9a4006f1 | |||
| 3cfadd7795 | |||
| 47975e12bc | |||
| fbb32e735d | |||
| 3cba05910b | |||
| c22cb2941d | |||
| 6b82c40fae | |||
| 9abf141411 | |||
| 477b4b4635 | |||
| debca2fc1d | |||
| d830a8ed73 | |||
| 33b1e6c826 | |||
| e32fde7794 | |||
| 99804c0304 | |||
| 410146bdcf | |||
| 859d50772d | |||
| 79c21f7842 | |||
| 0f76845c35 | |||
| b1e28a8ae6 | |||
| 658e4a81ab | |||
| 4b6b23225a | |||
| ed9a05c666 | |||
| 8a2c30feaf | |||
| cda1dc6a7a |
@@ -14,6 +14,7 @@ body:
|
||||
attributes:
|
||||
label: What command was used?
|
||||
placeholder: tiddl
|
||||
render: shell
|
||||
|
||||
- type: textarea
|
||||
id: what-happened
|
||||
@@ -31,6 +32,7 @@ body:
|
||||
options:
|
||||
- 3.13
|
||||
- 3.14
|
||||
- 3.15
|
||||
default: 0
|
||||
validations:
|
||||
required: true
|
||||
|
||||
+1
-1
@@ -17,4 +17,4 @@ RUN python -c "import tomllib; f=open('pyproject.toml','rb'); print('\n'.join(to
|
||||
# -- Layer 3 - Uncached layer (regenerates anytime a new build is released) --
|
||||
COPY . .
|
||||
RUN pip install --no-deps .
|
||||
RUN rm -rf *
|
||||
RUN rm -rf -- ..?* .[!.]* *
|
||||
|
||||
@@ -24,6 +24,12 @@ We recommend using [uv](https://docs.astral.sh/uv/)
|
||||
uv tool install tiddl
|
||||
```
|
||||
|
||||
To install exact version e.g. 3.4.1
|
||||
|
||||
```bash
|
||||
uv tool install tiddl==3.4.1
|
||||
```
|
||||
|
||||
## pip
|
||||
|
||||
You can also use [pip](https://packaging.python.org/en/latest/tutorials/installing-packages/)
|
||||
@@ -81,18 +87,6 @@ $ tiddl download url <url>
|
||||
|
||||
Run `tiddl download` to see available download options.
|
||||
|
||||
### Error Handling
|
||||
|
||||
By default, tiddl stops when encountering unavailable items in collections such as playlists, albums, artists, or mixes (e.g., removed or region-locked tracks).
|
||||
|
||||
Use `--skip-errors` to automatically skip these items and continue downloading:
|
||||
|
||||
```bash
|
||||
tiddl download url <url> --skip-errors
|
||||
```
|
||||
|
||||
Skipped items are logged with track/album name and IDs for reference.
|
||||
|
||||
### Quality
|
||||
|
||||
| Quality | File extension | Details |
|
||||
|
||||
@@ -80,6 +80,24 @@ update_mtime = false
|
||||
# could be useful when data on Tidal has changed.
|
||||
rewrite_metadata = false
|
||||
|
||||
# if this option is set to true, an .lrc file will be created alongside the
|
||||
# track file with the same name
|
||||
write_lrc_file = false
|
||||
|
||||
# when enabled, existing path components are reused even if Tidal returns
|
||||
# different casing. This avoids creating separate paths on case-sensitive
|
||||
# filesystems that would conflict later when moved to case-insensitive systems.
|
||||
# For example, if "FooBar" already exists and the API returns "foobar",
|
||||
# downloads will continue under "FooBar".
|
||||
match_existing_path_case = false
|
||||
|
||||
# Dolby Atmos filter
|
||||
# none - download only STEREO tracks
|
||||
# only - download only DOLBY_ATMOS tracks
|
||||
# allow - download both
|
||||
# (both versions won't be downloaded at a time, it depends on what Tidal returns)
|
||||
atmos_filter = "none"
|
||||
|
||||
|
||||
[metadata]
|
||||
# embed metadata in files
|
||||
@@ -126,7 +144,7 @@ allowed = [
|
||||
# album = "albums/{album.artist} - {album.title}"
|
||||
|
||||
# you can access: {playlist}
|
||||
# playlist = "playlists/{title}"
|
||||
# playlist = "playlists/{playlist.title}"
|
||||
|
||||
|
||||
[m3u]
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "tiddl"
|
||||
version = "3.2.3"
|
||||
version = "3.4.4a1"
|
||||
description = "Download Tidal tracks with CLI downloader."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
|
||||
@@ -124,28 +124,73 @@ def test_logout_with_token(monkeypatch: pytest.MonkeyPatch):
|
||||
mock_api_instance.logout_token.assert_called_once_with("token")
|
||||
mock_save.assert_called_once_with(AuthData())
|
||||
|
||||
assert "Logged out!" in result.stdout
|
||||
assert "Logged out successfully!\n" in result.stdout
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_logout_no_token(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Should only clear auth data."""
|
||||
"""Should do nothing."""
|
||||
|
||||
monkeypatch.setattr(
|
||||
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token=None)
|
||||
)
|
||||
|
||||
with (patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,):
|
||||
result = runner.invoke(auth_command, ["logout"])
|
||||
|
||||
MockAuthAPI.assert_not_called()
|
||||
|
||||
assert "No active session found." in result.stdout
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_logout_force(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Should remove local token even when the API request raises an error."""
|
||||
|
||||
# 1. Mock existing session
|
||||
monkeypatch.setattr(
|
||||
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token="fake-token")
|
||||
)
|
||||
|
||||
with (
|
||||
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
|
||||
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
|
||||
):
|
||||
# 2. Configure the mock to RAISE an exception
|
||||
mock_api_instance = MockAuthAPI.return_value
|
||||
mock_api_instance.logout_token.side_effect = Exception("Server Down")
|
||||
|
||||
# 3. Invoke with --force
|
||||
result = runner.invoke(auth_command, ["logout", "--force"])
|
||||
|
||||
# 4. Assertions
|
||||
# API was still called
|
||||
mock_api_instance.logout_token.assert_called_once_with("fake-token")
|
||||
|
||||
# Local data was still wiped (this is the core of --force)
|
||||
mock_save.assert_called_once_with(AuthData())
|
||||
|
||||
# Check for your specific "force" success message
|
||||
assert "Token removed!" in result.stdout
|
||||
assert result.exit_code == 0
|
||||
|
||||
|
||||
def test_logout_fails_without_force(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr(
|
||||
"tiddl.cli.commands.auth.load_auth_data", lambda: AuthData(token="token")
|
||||
)
|
||||
|
||||
with (
|
||||
patch("tiddl.cli.commands.auth.AuthAPI") as MockAuthAPI,
|
||||
patch("tiddl.cli.commands.auth.save_auth_data") as mock_save,
|
||||
):
|
||||
|
||||
MockAuthAPI.return_value.logout_token.side_effect = Exception("Error")
|
||||
|
||||
result = runner.invoke(auth_command, ["logout"])
|
||||
|
||||
mock_save.assert_called_once_with(AuthData())
|
||||
MockAuthAPI.assert_not_called()
|
||||
|
||||
assert "Logged out!" in result.stdout
|
||||
assert result.exit_code == 0
|
||||
assert "Local session retained" in result.stdout
|
||||
mock_save.assert_not_called() # Ensure data wasn't wiped
|
||||
|
||||
|
||||
def test_refresh_not_logged_in(monkeypatch: pytest.MonkeyPatch):
|
||||
|
||||
@@ -38,6 +38,20 @@ def test_valid_config_file(tmp_path: Path):
|
||||
assert cfg.download.threads_count == 8
|
||||
|
||||
|
||||
def test_match_existing_path_case_config(tmp_path: Path):
|
||||
cfg_file = write_config(
|
||||
tmp_path,
|
||||
"""
|
||||
[download]
|
||||
match_existing_path_case = true
|
||||
""",
|
||||
)
|
||||
|
||||
cfg = load_config_file(cfg_file)
|
||||
|
||||
assert cfg.download.match_existing_path_case is True
|
||||
|
||||
|
||||
def test_invalid_type_raises(tmp_path: Path):
|
||||
cfg_file = write_config(
|
||||
tmp_path,
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from tiddl.cli.utils.path import resolve_existing_path_case
|
||||
|
||||
|
||||
def test_resolve_existing_path_case_reuses_existing_directories(tmp_path: Path):
|
||||
existing_album = tmp_path / "FooBar" / "[2024.01.02] Album"
|
||||
existing_album.mkdir(parents=True)
|
||||
|
||||
path = resolve_existing_path_case(
|
||||
tmp_path,
|
||||
Path("foobar") / "[2024.01.02] album" / "01 - Track.flac",
|
||||
)
|
||||
|
||||
assert path == existing_album / "01 - Track.flac"
|
||||
|
||||
|
||||
def test_resolve_existing_path_case_reuses_existing_file(tmp_path: Path):
|
||||
existing_file = tmp_path / "FooBar" / "01 - Track.flac"
|
||||
existing_file.parent.mkdir()
|
||||
existing_file.touch()
|
||||
|
||||
path = resolve_existing_path_case(tmp_path, Path("foobar") / "01 - track.flac")
|
||||
|
||||
assert path == existing_file
|
||||
|
||||
|
||||
def test_resolve_existing_path_case_keeps_new_components(tmp_path: Path):
|
||||
path = resolve_existing_path_case(tmp_path, Path("FooBar") / "New Album")
|
||||
|
||||
assert path == tmp_path / "FooBar" / "New Album"
|
||||
|
||||
|
||||
def test_resolve_existing_path_case_rejects_absolute_path(tmp_path: Path):
|
||||
with pytest.raises(ValueError, match="relative_path"):
|
||||
resolve_existing_path_case(tmp_path, tmp_path / "FooBar")
|
||||
+5
-2
@@ -13,6 +13,8 @@ log = logging.getLogger("tiddl")
|
||||
app = typer.Typer(name="tiddl", no_args_is_help=True, rich_markup_mode="rich")
|
||||
register_commands(app)
|
||||
|
||||
VERSION = "v3.4.4a1"
|
||||
|
||||
|
||||
@app.callback()
|
||||
def callback(
|
||||
@@ -30,13 +32,14 @@ def callback(
|
||||
),
|
||||
] = CONFIG.debug,
|
||||
):
|
||||
"""
|
||||
tiddl - download tidal tracks \u266b
|
||||
f"""
|
||||
tiddl {VERSION} - download tidal tracks \u266b
|
||||
|
||||
[link=https://github.com/oskvr37/tiddl]github (https://github.com/oskvr37/tiddl)[/link]
|
||||
[link=https://buymeacoffee.com/oskvr][yellow]buy me a coffee (https://buymeacoffee.com/oskvr)[/link]
|
||||
"""
|
||||
|
||||
log.debug(f"{VERSION=}")
|
||||
log.debug(f"{ctx.params=}")
|
||||
|
||||
is_ffmpeg_installed = ifs()
|
||||
|
||||
@@ -38,7 +38,7 @@ def login(
|
||||
|
||||
if not NO_BROWSER:
|
||||
typer.launch(uri)
|
||||
|
||||
|
||||
console.print(f"Go to '{uri}' and complete authentication!")
|
||||
|
||||
auth_end_at = time() + device_auth.expiresIn
|
||||
@@ -79,16 +79,41 @@ def login(
|
||||
|
||||
|
||||
@auth_command.command(help="Logout and remove token from app.")
|
||||
def logout():
|
||||
loaded_auth_data = load_auth_data()
|
||||
def logout(
|
||||
force: Annotated[
|
||||
bool,
|
||||
typer.Option(
|
||||
"--force",
|
||||
"-f",
|
||||
help="Clears local auth data even if the server request fails.",
|
||||
),
|
||||
] = False,
|
||||
):
|
||||
auth_data = load_auth_data()
|
||||
|
||||
if loaded_auth_data.token:
|
||||
auth_api = AuthAPI()
|
||||
auth_api.logout_token(loaded_auth_data.token)
|
||||
# If there's no token, we are effectively already logged out locally
|
||||
if not auth_data.token:
|
||||
console.print("[yellow]No active session found.")
|
||||
return
|
||||
|
||||
try:
|
||||
api = AuthAPI()
|
||||
api.logout_token(auth_data.token)
|
||||
success = True
|
||||
except Exception as error:
|
||||
console.print(f"[bold red]Logout request failed: {error}")
|
||||
success = False
|
||||
|
||||
if not (success or force):
|
||||
console.print("[bold yellow]Local session retained. Use --force to override.")
|
||||
return
|
||||
|
||||
save_auth_data(AuthData())
|
||||
|
||||
console.print("[bold green]Logged out!")
|
||||
if success:
|
||||
console.print("[bold green]Logged out successfully!")
|
||||
elif force:
|
||||
console.print("[bold green]Token removed!")
|
||||
|
||||
|
||||
@auth_command.command(help="Refreshes your token in app.")
|
||||
|
||||
@@ -20,6 +20,7 @@ from tiddl.cli.config import (
|
||||
ARTIST_SINGLES_FILTER_LITERAL,
|
||||
VALID_M3U_RESOURCE_LITERAL,
|
||||
VIDEOS_FILTER_LITERAL,
|
||||
ATMOS_FILTER_LITERAL,
|
||||
)
|
||||
from tiddl.cli.utils.resource import TidalResource
|
||||
from tiddl.cli.ctx import Context
|
||||
@@ -126,6 +127,14 @@ def download_callback(
|
||||
help="Raise an error on resource download failure. Use for debugging",
|
||||
),
|
||||
] = False,
|
||||
DOLBY_ATMOS_FILTER: Annotated[
|
||||
ATMOS_FILTER_LITERAL,
|
||||
typer.Option(
|
||||
"--dolby-atmos",
|
||||
"-da",
|
||||
help="Dolby Atmos filter, 'none' to exclude, 'allow' to include, 'only' to download only Dolby Atmos, if available.",
|
||||
),
|
||||
] = CONFIG.download.atmos_filter,
|
||||
):
|
||||
"""
|
||||
Download Tidal resources.
|
||||
@@ -135,6 +144,20 @@ def download_callback(
|
||||
|
||||
log.debug(f"{ctx.params=}")
|
||||
|
||||
def write_lrc_file(track: Track, lyrics: str, file_path: Path):
|
||||
if not CONFIG.download.write_lrc_file or not lyrics.strip():
|
||||
return
|
||||
|
||||
lrc_file_path = file_path.with_suffix(".lrc")
|
||||
|
||||
try:
|
||||
with open(lrc_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(lyrics)
|
||||
except Exception as e:
|
||||
log.error(
|
||||
f"Failed to write LRC file for track {track.title} (ID: {track.id}): {e}"
|
||||
)
|
||||
|
||||
def save_m3u(
|
||||
resource_type: VALID_M3U_RESOURCE_LITERAL,
|
||||
filename: str,
|
||||
@@ -193,6 +216,8 @@ def download_callback(
|
||||
skip_existing=not SKIP_EXISTING,
|
||||
download_path=DOWNLOAD_PATH,
|
||||
scan_path=SCAN_PATH,
|
||||
match_existing_path_case=CONFIG.download.match_existing_path_case,
|
||||
dolby_atmos_filter=DOLBY_ATMOS_FILTER,
|
||||
)
|
||||
|
||||
class Metadata:
|
||||
@@ -237,7 +262,7 @@ def download_callback(
|
||||
if isinstance(item, Track):
|
||||
lyrics_subtitles = ""
|
||||
|
||||
if CONFIG.metadata.lyrics:
|
||||
if CONFIG.metadata.lyrics or CONFIG.download.write_lrc_file:
|
||||
try:
|
||||
lyrics_subtitles = ctx.obj.api.get_track_lyrics(
|
||||
item.id
|
||||
@@ -255,6 +280,8 @@ def download_callback(
|
||||
if track_metadata.cover and track_metadata.cover.data is None:
|
||||
track_metadata.cover.fetch_data()
|
||||
|
||||
write_lrc_file(item, lyrics_subtitles, download_path)
|
||||
|
||||
add_track_metadata(
|
||||
path=download_path,
|
||||
track=item,
|
||||
@@ -331,7 +358,9 @@ def download_callback(
|
||||
track_metadata=Metadata(
|
||||
cover=cover,
|
||||
date=str(album.releaseDate),
|
||||
artist=album.artist.name if album.artist else "",
|
||||
artist=(
|
||||
album.artist.name if album.artist else ""
|
||||
),
|
||||
credits=album_item.credits,
|
||||
album_review=album_review,
|
||||
),
|
||||
@@ -340,9 +369,11 @@ def download_callback(
|
||||
except ApiError as e:
|
||||
item = album_item.item
|
||||
track_info = f"Track: {getattr(item, 'title', 'Unknown')} (ID: {item.id})"
|
||||
if hasattr(item, 'album') and item.album:
|
||||
if hasattr(item, "album") and item.album:
|
||||
track_info += f", Album ID: {item.album.id}"
|
||||
ctx.obj.console.print(f"[red]API Error:[/] {e} ({track_info})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]API Error:[/] {e} ({track_info})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
except Exception as e:
|
||||
@@ -386,6 +417,12 @@ def download_callback(
|
||||
track = ctx.obj.api.get_track(resource.id)
|
||||
album = ctx.obj.api.get_album(track.album.id)
|
||||
|
||||
cover: Cover | None = None
|
||||
save_cover = ("track" in CONFIG.cover.allowed) and CONFIG.cover.save
|
||||
|
||||
if album.cover and (CONFIG.metadata.cover or save_cover):
|
||||
cover = Cover(album.cover, size=CONFIG.cover.size)
|
||||
|
||||
await handle_item(
|
||||
item=track,
|
||||
file_path=format_template(
|
||||
@@ -394,6 +431,12 @@ def download_callback(
|
||||
album=album,
|
||||
quality=get_item_quality(track),
|
||||
),
|
||||
track_metadata=Metadata(
|
||||
cover=cover,
|
||||
date=str(album.releaseDate),
|
||||
artist=album.artist.name if album.artist else "",
|
||||
# credits are missing
|
||||
),
|
||||
)
|
||||
|
||||
if (
|
||||
@@ -414,7 +457,11 @@ def download_callback(
|
||||
video = ctx.obj.api.get_video(resource.id)
|
||||
template = TEMPLATE or CONFIG.templates.video
|
||||
|
||||
if "{album" in template and video.album and video.album.id is not None:
|
||||
if (
|
||||
"{album" in template
|
||||
and video.album
|
||||
and video.album.id is not None
|
||||
):
|
||||
album = ctx.obj.api.get_album(video.album.id)
|
||||
else:
|
||||
album = None
|
||||
@@ -462,13 +509,17 @@ def download_callback(
|
||||
except ApiError as e:
|
||||
item = mix_item.item
|
||||
track_info = f"Track: {getattr(item, 'title', 'Unknown')} (ID: {item.id})"
|
||||
ctx.obj.console.print(f"[red]API Error:[/] {e} ({track_info})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]API Error:[/] {e} ({track_info})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
except Exception as e:
|
||||
item = mix_item.item
|
||||
track_info = f"Track: {getattr(item, 'title', 'Unknown')} (ID: {item.id})"
|
||||
ctx.obj.console.print(f"[red]Error:[/] {e} ({track_info})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]Error:[/] {e} ({track_info})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
|
||||
@@ -499,11 +550,15 @@ def download_callback(
|
||||
try:
|
||||
await download_album(album)
|
||||
except ApiError as e:
|
||||
ctx.obj.console.print(f"[red]API Error:[/] {e} (Album: {album.title}, ID: {album.id})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]API Error:[/] {e} (Album: {album.title}, ID: {album.id})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
except Exception as e:
|
||||
ctx.obj.console.print(f"[red]Error:[/] {e} (Album: {album.title}, ID: {album.id})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]Error:[/] {e} (Album: {album.title}, ID: {album.id})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
|
||||
@@ -553,11 +608,15 @@ def download_callback(
|
||||
)
|
||||
)
|
||||
except ApiError as e:
|
||||
ctx.obj.console.print(f"[red]API Error:[/] {e} (Video: {video.title}, ID: {video.id})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]API Error:[/] {e} (Video: {video.title}, ID: {video.id})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
except Exception as e:
|
||||
ctx.obj.console.print(f"[red]Error:[/] {e} (Video: {video.title}, ID: {video.id})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]Error:[/] {e} (Video: {video.title}, ID: {video.id})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
|
||||
@@ -610,7 +669,9 @@ def download_callback(
|
||||
album=album,
|
||||
playlist=playlist,
|
||||
playlist_index=playlist_index,
|
||||
quality=get_item_quality(playlist_item.item),
|
||||
quality=get_item_quality(
|
||||
playlist_item.item
|
||||
),
|
||||
),
|
||||
track_metadata=Metadata(),
|
||||
)
|
||||
@@ -618,15 +679,19 @@ def download_callback(
|
||||
except ApiError as e:
|
||||
item = playlist_item.item
|
||||
track_info = f"Track: {getattr(item, 'title', 'Unknown')} (ID: {item.id})"
|
||||
if hasattr(item, 'album') and item.album:
|
||||
if hasattr(item, "album") and item.album:
|
||||
track_info += f", Album ID: {item.album.id}"
|
||||
ctx.obj.console.print(f"[red]API Error:[/] {e} ({track_info})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]API Error:[/] {e} ({track_info})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
except Exception as e:
|
||||
item = playlist_item.item
|
||||
track_info = f"Track: {getattr(item, 'title', 'Unknown')} (ID: {item.id})"
|
||||
ctx.obj.console.print(f"[red]Error:[/] {e} ({track_info})")
|
||||
ctx.obj.console.print(
|
||||
f"[red]Error:[/] {e} ({track_info})"
|
||||
)
|
||||
if RAISE_ERRORS:
|
||||
raise
|
||||
|
||||
@@ -652,7 +717,7 @@ def download_callback(
|
||||
and playlist.squareImage
|
||||
):
|
||||
Cover(
|
||||
playlist.squareImage, size=max(CONFIG.cover.size, 1080)
|
||||
playlist.squareImage, size=min(CONFIG.cover.size, 1080)
|
||||
).save_to_directory(
|
||||
path=DOWNLOAD_PATH
|
||||
/ format_template(
|
||||
|
||||
@@ -7,8 +7,9 @@ from tempfile import NamedTemporaryFile
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
|
||||
from tiddl.cli.config import VIDEOS_FILTER_LITERAL
|
||||
from tiddl.cli.config import VIDEOS_FILTER_LITERAL, ATMOS_FILTER_LITERAL
|
||||
from tiddl.cli.utils.download import get_existing_track_filename
|
||||
from tiddl.cli.utils.path import resolve_existing_path_case
|
||||
from tiddl.core.api import ApiError, TidalAPI
|
||||
from tiddl.core.api.models import StreamVideoQuality, Track, TrackQuality, Video
|
||||
from tiddl.core.utils import parse_track_stream, parse_video_stream
|
||||
@@ -50,6 +51,8 @@ class Downloader:
|
||||
skip_existing: bool
|
||||
download_path: Path
|
||||
scan_path: Path
|
||||
match_existing_path_case: bool
|
||||
dolby_atmos_filter: ATMOS_FILTER_LITERAL
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -62,6 +65,8 @@ class Downloader:
|
||||
skip_existing: bool,
|
||||
download_path: Path,
|
||||
scan_path: Path,
|
||||
match_existing_path_case: bool = False,
|
||||
dolby_atmos_filter: ATMOS_FILTER_LITERAL = "none",
|
||||
) -> None:
|
||||
self.api = tidal_api
|
||||
self.rich_output = rich_output
|
||||
@@ -72,6 +77,14 @@ class Downloader:
|
||||
self.skip_existing = skip_existing
|
||||
self.download_path = download_path
|
||||
self.scan_path = scan_path
|
||||
self.match_existing_path_case = match_existing_path_case
|
||||
self.dolby_atmos_filter = dolby_atmos_filter
|
||||
|
||||
def get_path(self, base_path: Path, relative_path: Path) -> Path:
|
||||
if self.match_existing_path_case:
|
||||
return resolve_existing_path_case(base_path, relative_path)
|
||||
|
||||
return base_path / relative_path
|
||||
|
||||
async def download(
|
||||
self, item: Track | Video, file_path: Path
|
||||
@@ -92,16 +105,16 @@ class Downloader:
|
||||
filename = get_existing_track_filename(
|
||||
item.audioQuality, self.track_quality, file_path
|
||||
)
|
||||
existing_file_path = self.get_path(self.scan_path, filename)
|
||||
vibrant_color = item.album.vibrantColor
|
||||
|
||||
elif isinstance(item, Video):
|
||||
filename = file_path.with_suffix(".mp4")
|
||||
existing_file_path = self.get_path(self.scan_path, filename)
|
||||
vibrant_color = item.vibrantColor
|
||||
|
||||
vibrant_color = vibrant_color or "gray"
|
||||
|
||||
existing_file_path = self.scan_path / filename
|
||||
|
||||
log.debug(f"{file_path=}, {filename=}, {existing_file_path=}")
|
||||
|
||||
result_message = "[green]Downloaded"
|
||||
@@ -134,6 +147,23 @@ class Downloader:
|
||||
stream = self.api.get_track_stream(
|
||||
track_id=item.id, quality=self.track_quality
|
||||
)
|
||||
|
||||
log.debug(
|
||||
f"{stream.trackId=}, {stream.audioQuality=}, {stream.audioMode=}"
|
||||
)
|
||||
|
||||
if (
|
||||
self.dolby_atmos_filter == "none"
|
||||
and stream.audioMode == "DOLBY_ATMOS"
|
||||
) or (
|
||||
self.dolby_atmos_filter == "only"
|
||||
and stream.audioMode == "STEREO"
|
||||
):
|
||||
self.rich_output.console.print(
|
||||
f"[blue]Skipping[/] [gray]{item.title}[/] [blue]due to Dolby Atmos filter[/] {self.dolby_atmos_filter}"
|
||||
)
|
||||
return None, False
|
||||
|
||||
except ApiError as e:
|
||||
log.error(f"{item.id=} {e=}")
|
||||
self.rich_output.console.print(
|
||||
@@ -142,15 +172,21 @@ class Downloader:
|
||||
return None, False
|
||||
|
||||
urls, _ = parse_track_stream(stream)
|
||||
download_path = self.download_path / filename
|
||||
download_path = self.get_path(self.download_path, filename)
|
||||
|
||||
quality = track_qualities_color[stream.audioQuality]
|
||||
quality_string = track_qualities_color[stream.audioQuality]
|
||||
|
||||
if stream.audioQuality in ["HI_RES_LOSSLESS", "LOSSLESS"]:
|
||||
quality = f"{quality} {stream.bitDepth}-bit, {(stream.sampleRate or 0) / 1000:.1f} kHz"
|
||||
|
||||
if stream.audioQuality == "HI_RES_LOSSLESS":
|
||||
if (
|
||||
stream.audioQuality in ["HI_RES_LOSSLESS", "LOSSLESS"]
|
||||
and stream.audioMode == "STEREO"
|
||||
):
|
||||
quality_string = f"{quality_string} {stream.bitDepth}-bit, {(stream.sampleRate or 0) / 1000:.1f} kHz"
|
||||
should_extract_flac = True
|
||||
else:
|
||||
download_path = download_path.with_suffix(".m4a")
|
||||
|
||||
if stream.audioMode == "DOLBY_ATMOS":
|
||||
quality_string = "[blue]Dolby Atmos[/]"
|
||||
|
||||
elif isinstance(item, Video):
|
||||
stream = self.api.get_video_stream(
|
||||
@@ -158,11 +194,13 @@ class Downloader:
|
||||
)
|
||||
|
||||
urls, ext = parse_video_stream(stream), ".ts"
|
||||
download_path = (self.download_path / filename).with_suffix(ext)
|
||||
quality = video_qualities_color[stream.videoQuality]
|
||||
download_path = self.get_path(self.download_path, filename).with_suffix(
|
||||
ext
|
||||
)
|
||||
quality_string = video_qualities_color[stream.videoQuality]
|
||||
|
||||
task_id = self.rich_output.download_start(
|
||||
f"[{vibrant_color}]{item.title} {quality}"
|
||||
f"[{vibrant_color}]{item.title} {quality_string}"
|
||||
)
|
||||
|
||||
download_path.parent.mkdir(exist_ok=True, parents=True)
|
||||
@@ -187,6 +225,11 @@ class Downloader:
|
||||
|
||||
shutil.move(tmp.name, download_path)
|
||||
|
||||
try:
|
||||
download_path.chmod(0o644)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
try:
|
||||
if isinstance(item, Track) and should_extract_flac:
|
||||
download_path = extract_flac(download_path)
|
||||
|
||||
@@ -2,9 +2,10 @@ from typer import Typer
|
||||
|
||||
from .url import url_subcommand
|
||||
from .fav import fav_subcommand
|
||||
from .search import search_subcommand
|
||||
|
||||
|
||||
SUBCOMMANDS: list[Typer] = [url_subcommand, fav_subcommand]
|
||||
SUBCOMMANDS: list[Typer] = [url_subcommand, fav_subcommand, search_subcommand]
|
||||
|
||||
|
||||
def register_subcommands(app: Typer):
|
||||
|
||||
@@ -0,0 +1,144 @@
|
||||
import typer
|
||||
from typing_extensions import Annotated
|
||||
|
||||
from tiddl.cli.ctx import Context
|
||||
from tiddl.cli.utils.resource import TidalResource
|
||||
from tiddl.core.api.models.base import Search, SearchArtist
|
||||
from tiddl.core.api.models.resources import Track, Album, Playlist, Video
|
||||
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
|
||||
search_subcommand = typer.Typer()
|
||||
|
||||
|
||||
@search_subcommand.command(
|
||||
no_args_is_help=True,
|
||||
)
|
||||
def search(
|
||||
ctx: Context,
|
||||
query: Annotated[str, typer.Argument()],
|
||||
resource_types: Annotated[
|
||||
list[str],
|
||||
typer.Option(
|
||||
"-t",
|
||||
"--types",
|
||||
metavar="<resource>",
|
||||
help="Narrow resource types, usage: -t track -t album etc. Available resources: track, video, album, playlist, artist.",
|
||||
),
|
||||
] = ["track", "video", "album", "playlist", "artist"],
|
||||
number_top_results: Annotated[
|
||||
int,
|
||||
typer.Option(
|
||||
"--num-top",
|
||||
"-n",
|
||||
help="Number of top results to display per resource type.",
|
||||
),
|
||||
] = 3,
|
||||
pick_top_hit: Annotated[
|
||||
bool,
|
||||
typer.Option(
|
||||
"--top",
|
||||
"-T",
|
||||
help="Automatically pick the top hit if it exists and matches the specified resource types.",
|
||||
),
|
||||
] = False,
|
||||
):
|
||||
"""
|
||||
Search Tidal for tracks, videos, albums, playlists, artists, and mixes.
|
||||
|
||||
By default, it searches for all resource types. You can specify which resource types to search for using the `--type` option.
|
||||
"""
|
||||
|
||||
results: Search = ctx.obj.api.get_search(query=query)
|
||||
table = _prepare_table(query)
|
||||
|
||||
results_to_display = []
|
||||
if results.topHit is not None:
|
||||
top_hit = results.topHit
|
||||
top_hit_type = top_hit.type.rstrip("S").lower() # "ARTISTS" -> "artist"
|
||||
if top_hit_type in resource_types:
|
||||
if pick_top_hit:
|
||||
ctx.obj.resources.append(
|
||||
TidalResource.from_string(
|
||||
f"{top_hit_type}/{_display_id(top_hit.value)}"
|
||||
)
|
||||
)
|
||||
ctx.obj.console.print(
|
||||
f"[green]Automatically added top hit: {top_hit.type.title()} '{_display_name(top_hit.value)}'"
|
||||
)
|
||||
return
|
||||
else:
|
||||
results_to_display.append(
|
||||
(
|
||||
top_hit_type.title(),
|
||||
_display_name(top_hit.value),
|
||||
_display_id(top_hit.value),
|
||||
)
|
||||
)
|
||||
|
||||
type_to_items = {
|
||||
"artist": results.artists.items,
|
||||
"album": results.albums.items,
|
||||
"playlist": results.playlists.items,
|
||||
"track": results.tracks.items,
|
||||
"video": results.videos.items,
|
||||
}
|
||||
|
||||
for resource_type, items in type_to_items.items():
|
||||
if resource_type in resource_types:
|
||||
results_to_display.extend(
|
||||
(resource_type.title(), _display_name(item), _display_id(item))
|
||||
for item in items[:number_top_results]
|
||||
)
|
||||
|
||||
for i, (resource_type, name, id) in enumerate(results_to_display, start=1):
|
||||
table.add_row(str(i), resource_type, name, id)
|
||||
|
||||
panel = Panel(table, title="Search Results", highlight=True, expand=True)
|
||||
ctx.obj.console.print(panel)
|
||||
selection = ctx.obj.console.input(
|
||||
"[bold green]Enter the number of the resource to add to your list (comma-separated for multiple, q/empty = quit): "
|
||||
)
|
||||
selected_numbers = [s.strip() for s in selection.split(",")]
|
||||
|
||||
for num in selected_numbers:
|
||||
if num.lower() == "q":
|
||||
return
|
||||
|
||||
if not num.isdigit() or int(num) < 1 or int(num) > len(results_to_display):
|
||||
ctx.obj.console.print(f"[red]Invalid selection: {num}")
|
||||
continue
|
||||
|
||||
selected_resource = results_to_display[int(num) - 1]
|
||||
resource_type, name, id = selected_resource
|
||||
ctx.obj.resources.append(
|
||||
TidalResource.from_string(f"{resource_type.lower()}/{id}")
|
||||
)
|
||||
ctx.obj.console.print(f"[green]Added {resource_type} '{name}' to your list")
|
||||
|
||||
|
||||
def _display_name(item) -> str:
|
||||
if isinstance(item, SearchArtist):
|
||||
return item.name
|
||||
elif isinstance(item, Video):
|
||||
return f"{item.artist or item.artists[0].name or ""} - {item.title}"
|
||||
elif isinstance(item, (Track, Album)):
|
||||
return f"{item.artist or item.artists[0].name or ""} - {item.title} [blue][{', '.join(item.audioModes)}][/]"
|
||||
elif isinstance(item, (Playlist)):
|
||||
return item.title
|
||||
else:
|
||||
raise ValueError("Unknown item type")
|
||||
|
||||
|
||||
def _display_id(item) -> str:
|
||||
return item.uuid if isinstance(item, Playlist) else str(item.id)
|
||||
|
||||
|
||||
def _prepare_table(query: str) -> Table:
|
||||
table = Table(title=f"{query}", expand=True)
|
||||
table.add_column("#", style="yellow", ratio=1)
|
||||
table.add_column("Type", style="cyan", ratio=1)
|
||||
table.add_column("Title", style="green", ratio=8)
|
||||
table.add_column("ID", style="magenta", ratio=2)
|
||||
return table
|
||||
@@ -14,6 +14,7 @@ ARTIST_SINGLES_FILTER_LITERAL = Literal["none", "only", "include"]
|
||||
VALID_M3U_RESOURCE_LITERAL = Literal["album", "playlist", "mix"]
|
||||
VALID_RESOURCE_COVER_SAVE_LITERAL = Literal["track", "album", "playlist"]
|
||||
VIDEOS_FILTER_LITERAL = Literal["none", "only", "allow"]
|
||||
ATMOS_FILTER_LITERAL = Literal["none", "only", "allow"]
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
@@ -55,6 +56,9 @@ class Config(BaseModel):
|
||||
videos_filter: VIDEOS_FILTER_LITERAL = "none"
|
||||
update_mtime: bool = False
|
||||
rewrite_metadata: bool = False
|
||||
write_lrc_file: bool = False
|
||||
match_existing_path_case: bool = False
|
||||
atmos_filter: ATMOS_FILTER_LITERAL = "none"
|
||||
|
||||
def model_post_init(self, __context):
|
||||
# set scan path to download path when download path is non default
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def resolve_existing_path_case(base_path: Path, relative_path: Path) -> Path:
|
||||
"""
|
||||
Return base_path / relative_path, reusing existing path component casing.
|
||||
"""
|
||||
|
||||
if relative_path.is_absolute():
|
||||
raise ValueError("relative_path must not be absolute")
|
||||
|
||||
resolved_path = base_path
|
||||
|
||||
for part in relative_path.parts:
|
||||
if part in ("", "."):
|
||||
continue
|
||||
|
||||
existing_part = find_existing_child_case(resolved_path, part)
|
||||
resolved_path = resolved_path / (existing_part or part)
|
||||
|
||||
return resolved_path
|
||||
|
||||
|
||||
def find_existing_child_case(parent: Path, name: str) -> str | None:
|
||||
if not parent.is_dir():
|
||||
return None
|
||||
|
||||
casefolded_name = name.casefold()
|
||||
fallback: str | None = None
|
||||
|
||||
for child in parent.iterdir():
|
||||
if child.name == name:
|
||||
return child.name
|
||||
|
||||
if fallback is None and child.name.casefold() == casefolded_name:
|
||||
fallback = child.name
|
||||
|
||||
return fallback
|
||||
@@ -4,7 +4,6 @@ from pydantic import BaseModel
|
||||
|
||||
from .resources import (
|
||||
Album,
|
||||
Artist,
|
||||
Playlist,
|
||||
StreamVideoQuality,
|
||||
Track,
|
||||
@@ -125,7 +124,7 @@ class Favorites(BaseModel):
|
||||
class TrackStream(BaseModel):
|
||||
trackId: int
|
||||
assetPresentation: Literal["FULL"]
|
||||
audioMode: Literal["STEREO"]
|
||||
audioMode: Literal["STEREO", "DOLBY_ATMOS"]
|
||||
audioQuality: TrackQuality
|
||||
manifestMimeType: Literal["application/dash+xml", "application/vnd.tidal.bts"]
|
||||
manifestHash: str
|
||||
@@ -149,10 +148,21 @@ class VideoStream(BaseModel):
|
||||
manifest: str
|
||||
|
||||
|
||||
# It seemed like the search API doesn't return `artist.type`, so this is used instead of resources.Artist for search results to avoid validation errors.
|
||||
# FIXME: This can be discarded if we are okay with making the `type` field optional in resources.Artist, but I don't think it's my decision to make lol
|
||||
class SearchArtist(BaseModel): # search-specific, fewer required fields
|
||||
id: int
|
||||
name: str
|
||||
type: Optional[Literal["MAIN", "FEATURED"]] = None
|
||||
url: Optional[str] = None
|
||||
picture: Optional[str] = None
|
||||
popularity: Optional[int] = None
|
||||
|
||||
class Search(BaseModel):
|
||||
|
||||
|
||||
class Artists(Items):
|
||||
items: List[Artist]
|
||||
items: List[SearchArtist] # ← uses the inner model, not resources.Artist
|
||||
|
||||
class Albums(Items):
|
||||
items: List[Album]
|
||||
@@ -167,7 +177,7 @@ class Search(BaseModel):
|
||||
items: List[Video]
|
||||
|
||||
class TopHit(BaseModel):
|
||||
value: Union[Artist, Track, Playlist, Album]
|
||||
value: Union[SearchArtist, Track, Playlist, Album]
|
||||
type: Literal["ARTISTS", "TRACKS", "PLAYLISTS", "ALBUMS"]
|
||||
|
||||
artists: Artists
|
||||
|
||||
@@ -125,7 +125,7 @@ class Album(BaseModel):
|
||||
numberOfTracks: int
|
||||
numberOfVideos: int
|
||||
numberOfVolumes: int
|
||||
releaseDate: datetime
|
||||
releaseDate: datetime | None = None
|
||||
copyright: Optional[str] = None
|
||||
type: Literal["ALBUM", "SINGLE", "EP"]
|
||||
version: Optional[str] = None
|
||||
@@ -134,7 +134,7 @@ class Album(BaseModel):
|
||||
vibrantColor: Optional[str] = None
|
||||
videoCover: Optional[str] = None
|
||||
explicit: bool
|
||||
upc: str
|
||||
upc: Optional[str] = None
|
||||
popularity: int
|
||||
audioQuality: str
|
||||
audioModes: List[str]
|
||||
@@ -163,7 +163,7 @@ class Playlist(BaseModel):
|
||||
url: str
|
||||
image: Optional[str] = None
|
||||
popularity: int
|
||||
squareImage: str
|
||||
squareImage: Optional[str] = None
|
||||
promotedArtists: List[Album.Artist]
|
||||
lastItemAddedAt: Optional[str] = None
|
||||
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
import base64
|
||||
import logging
|
||||
from os import environ
|
||||
from requests import request
|
||||
from typing import Any, TypeAlias
|
||||
|
||||
from tiddl.core.auth.exceptions import AuthClientError
|
||||
|
||||
log = logging.getLogger("tiddl")
|
||||
|
||||
|
||||
def get_auth_credentials() -> tuple[str, str]:
|
||||
ENV_KEY = "TIDDL_AUTH"
|
||||
|
||||
client_id, client_secret = (
|
||||
base64.b64decode(
|
||||
"ZlgySnhkbW50WldLMGl4VDsxTm45QWZEQWp4cmdKRkpiS05XTGVBeUtHVkdtSU51WFBQTEhWWEF2eEFnPQ=="
|
||||
"NE4zbjZRMXg5NUxMNUs3cDtvS09YZkpXMzcxY1g2eGFaMFB5aGdHTkJkTkxsQlpkNEFLS1lvdWdNamlrPQ=="
|
||||
)
|
||||
.decode()
|
||||
.split(";")
|
||||
@@ -22,6 +25,8 @@ def get_auth_credentials() -> tuple[str, str]:
|
||||
if env_value:
|
||||
client_id, client_secret = env_value.split(";")
|
||||
|
||||
log.debug(f"{client_id=}, {bool(env_value)=}")
|
||||
|
||||
return client_id, client_secret
|
||||
|
||||
|
||||
|
||||
@@ -23,9 +23,9 @@ class AuthResponse(BaseModel):
|
||||
acceptedEULA: bool
|
||||
created: int | str
|
||||
updated: int | str
|
||||
facebookUid: int
|
||||
appleUid: Optional[str]
|
||||
googleUid: Optional[str]
|
||||
facebookUid: Optional[int] = None
|
||||
appleUid: Optional[str] = None
|
||||
googleUid: Optional[str] = None
|
||||
accountLinkCreated: bool
|
||||
emailVerified: bool
|
||||
newUser: bool
|
||||
|
||||
@@ -50,6 +50,10 @@ class Cover:
|
||||
if not self.data:
|
||||
self.data = self.fetch_data()
|
||||
|
||||
if not self.data:
|
||||
log.debug(f"cover data is empty ({file})")
|
||||
return
|
||||
|
||||
file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import logging
|
||||
import unicodedata
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
@@ -9,6 +12,9 @@ from mutagen.mp4 import MP4 as MutagenMP4, MP4Cover
|
||||
from tiddl.core.api.models import AlbumItemsCredits, Track
|
||||
|
||||
|
||||
log = logging.getLogger("tiddl")
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Metadata:
|
||||
title: str
|
||||
@@ -30,6 +36,8 @@ class Metadata:
|
||||
|
||||
|
||||
def add_flac_metadata(track_path: Path, metadata: Metadata) -> None:
|
||||
log.debug(f"{track_path=}")
|
||||
|
||||
mutagen = MutagenFLAC(track_path)
|
||||
|
||||
if metadata.cover_data:
|
||||
@@ -125,6 +133,36 @@ def sort_credits_contributors(
|
||||
)
|
||||
|
||||
|
||||
def normalize_credits_keys(
|
||||
entries: list[AlbumItemsCredits.ItemWithCredits.CreditsEntry],
|
||||
) -> None:
|
||||
valid_entries: list[AlbumItemsCredits.ItemWithCredits.CreditsEntry] = []
|
||||
|
||||
for entry in entries:
|
||||
try:
|
||||
raw_key = entry.type.upper()
|
||||
|
||||
safe_key = (
|
||||
# NFKD splits accented chars (É → E + combining accent),
|
||||
unicodedata.normalize("NFKD", raw_key)
|
||||
.encode("ascii", "ignore")
|
||||
.decode("ascii")
|
||||
.replace("=", "")
|
||||
.strip()
|
||||
)
|
||||
|
||||
entry.type = safe_key
|
||||
|
||||
if safe_key:
|
||||
valid_entries.append(entry)
|
||||
|
||||
except Exception as e:
|
||||
log.debug(f"Skipping invalid credit tag '{entry.type}': {e}")
|
||||
|
||||
# replace the contents of the original list
|
||||
entries[:] = valid_entries
|
||||
|
||||
|
||||
def add_track_metadata(
|
||||
path: Path,
|
||||
track: Track,
|
||||
@@ -143,6 +181,7 @@ def add_track_metadata(
|
||||
credits_contributors = []
|
||||
|
||||
sort_credits_contributors(credits_contributors)
|
||||
normalize_credits_keys(credits_contributors)
|
||||
|
||||
metadata = Metadata(
|
||||
title=f"{track.title} ({track.version})" if track.version else track.title,
|
||||
|
||||
@@ -2,9 +2,25 @@ import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def run(cmd: list[str]):
|
||||
"""Run process without printing to terminal"""
|
||||
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
class FFmpegError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def run(cmd: list[str]) -> subprocess.CompletedProcess:
|
||||
"""Run a process; raise `FFmpegError` on non-zero exit with stderr."""
|
||||
# Force UTF-8 encoding to prevent UnicodeDecodeError on Windows
|
||||
r = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
errors="replace" # Added as a safety net
|
||||
)
|
||||
if r.returncode != 0:
|
||||
raise FFmpegError(
|
||||
f"{cmd[0]} failed (rc={r.returncode}): {r.stderr.strip()}"
|
||||
)
|
||||
return r
|
||||
|
||||
|
||||
def is_ffmpeg_installed() -> bool:
|
||||
@@ -13,10 +29,25 @@ def is_ffmpeg_installed() -> bool:
|
||||
try:
|
||||
run(["ffmpeg", "-version"])
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
except (FileNotFoundError, FFmpegError):
|
||||
return False
|
||||
|
||||
|
||||
def _probe_audio_codec(source: Path) -> str:
|
||||
"""Return first audio stream's codec_name, or "" if ffprobe is unavailable."""
|
||||
try:
|
||||
r = run([
|
||||
"ffprobe", "-v", "error",
|
||||
"-select_streams", "a:0",
|
||||
"-show_entries", "stream=codec_name",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
str(source),
|
||||
])
|
||||
return r.stdout.strip()
|
||||
except (FileNotFoundError, FFmpegError):
|
||||
return ""
|
||||
|
||||
|
||||
def convert_to_mp4(source: Path) -> Path:
|
||||
output_path = source.with_suffix(".mp4")
|
||||
|
||||
@@ -29,13 +60,26 @@ def convert_to_mp4(source: Path) -> Path:
|
||||
|
||||
def extract_flac(source: Path) -> Path:
|
||||
"""
|
||||
Extracts flac audio from mp4 container
|
||||
Extract FLAC audio from an MP4 container.
|
||||
|
||||
Tidal can serve AAC-in-MP4 for tracks without a lossless master, so the
|
||||
input may not actually contain FLAC.
|
||||
"""
|
||||
|
||||
codec = _probe_audio_codec(source)
|
||||
if codec and codec != "flac":
|
||||
target = source.with_suffix(".m4a")
|
||||
if target != source:
|
||||
source.replace(target)
|
||||
return target
|
||||
|
||||
target = source.with_suffix(".flac")
|
||||
tmp = source.with_suffix(".tmp.flac")
|
||||
|
||||
run(["ffmpeg", "-y", "-i", str(source), "-c", "copy", str(tmp)])
|
||||
|
||||
tmp.replace(source.with_suffix(".flac"))
|
||||
tmp.replace(target)
|
||||
if source != target and source.exists():
|
||||
source.unlink()
|
||||
|
||||
return source.with_suffix(".flac")
|
||||
return target
|
||||
|
||||
@@ -165,7 +165,7 @@ def generate_template_data(
|
||||
artists=", ".join(
|
||||
a.name for a in (album.artists or []) if a.type == "MAIN"
|
||||
),
|
||||
date=album.releaseDate,
|
||||
date=album.releaseDate or datetime.min,
|
||||
explicit=Explicit(getattr(album, "explicit", None)),
|
||||
master=UserFormat(
|
||||
"HIRES_LOSSLESS" in album.mediaMetadata.tags and quality == "MAX"
|
||||
|
||||
@@ -6,6 +6,8 @@ from xml.etree.ElementTree import fromstring
|
||||
|
||||
from tiddl.core.api.models import TrackStream, VideoStream
|
||||
|
||||
DOLBY_CODECS = ["eac3", "ac4"]
|
||||
|
||||
|
||||
def parse_manifest_XML(xml_content: str):
|
||||
"""
|
||||
@@ -80,7 +82,7 @@ def parse_track_stream(track_stream: TrackStream) -> tuple[list[str], str]:
|
||||
file_extension = ".flac"
|
||||
if track_stream.audioQuality == "HI_RES_LOSSLESS":
|
||||
file_extension = ".m4a"
|
||||
elif codecs.startswith("mp4"):
|
||||
elif codecs.startswith("mp4") or codecs in DOLBY_CODECS:
|
||||
file_extension = ".m4a"
|
||||
else:
|
||||
raise ValueError(f"Unknown codecs `{codecs}` (trackId {track_stream.trackId}")
|
||||
|
||||
Reference in New Issue
Block a user