Refactor path sanitization and formatting logic

This commit is contained in:
Rafael Moraes
2026-01-15 22:11:14 -03:00
parent 52324d519c
commit e1c8cb51ad
2 changed files with 112 additions and 45 deletions
+93 -42
View File
@@ -10,7 +10,7 @@ from yt_dlp import YoutubeDL
from ..interface.enums import CoverFormat
from ..interface.types import MediaTags, PlaylistTags
from ..utils import async_subprocess
from ..utils import CustomStringFormatter, async_subprocess
from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
@@ -137,63 +137,116 @@ class AppleMusicBaseDownloader:
/ (f"{media_id}_{file_tag}" + file_extension)
)
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(
def sanitize_string(
self,
dirty_string: str,
file_ext: str = None,
) -> str:
sanitized_string = re.sub(
ILLEGAL_CHARS_RE,
ILLEGAL_CHAR_REPLACEMENT,
dirty_string,
)
if is_folder:
dirty_string = dirty_string[: self.truncate]
if dirty_string.endswith("."):
dirty_string = dirty_string[:-1] + ILLEGAL_CHAR_REPLACEMENT
).strip()
if file_ext is None:
sanitized_string = sanitized_string[: self.truncate]
if sanitized_string.endswith("."):
sanitized_string = sanitized_string[:-1] + ILLEGAL_CHAR_REPLACEMENT
else:
if self.truncate is not None:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
sanitized_string = sanitized_string[: self.truncate - len(file_ext)]
sanitized_string += file_ext
return sanitized_string.strip()
def get_final_path(
self,
tags: MediaTags,
file_extension: str,
playlist_tags: PlaylistTags,
playlist_tags: PlaylistTags | None,
) -> str:
if tags.album is not None:
"""
Available tags:
album
album_artist
album_id
artist
artist_id
title
title_id
composer
composer_id
track
track_total
disc
disc_total
date
playlist_artist
playlist_id
playlist_title
playlist_track
media_type
"""
if tags.album:
template_folder = (
self.compilation_folder_template.split("/")
if tags.compilation
else self.album_folder_template.split("/")
)
template_file = (
self.multi_disc_file_template.split("/")
if tags.disc_total > 1
else self.single_disc_file_template.split("/")
)
else:
template_folder = self.no_album_folder_template.split("/")
template_file = self.no_album_file_template.split("/")
template_final = template_folder + template_file
tags_dict = tags.__dict__.copy()
if playlist_tags:
tags_dict.update(playlist_tags.__dict__)
return str(
Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_final[0:-1]
],
(
self.get_sanitized_string(
template_final[-1].format(**tags_dict), False
)
+ file_extension
),
)
template_file = (
self.multi_disc_file_template.split("/")
if isinstance(tags.disc_total, int) and tags.disc_total > 1
else self.single_disc_file_template.split("/")
)
template_parts = template_folder + template_file
formatted_parts = []
for i, part in enumerate(template_parts):
is_folder = i < len(template_parts) - 1
formatted_part = CustomStringFormatter().format(
part,
album=(tags.album, "Unknown Album"),
album_artist=(tags.album_artist, "Unknown Artist"),
album_id=(tags.album_id, "Unknown Album ID"),
artist=(tags.artist, "Unknown Artist"),
artist_id=(tags.artist_id, "Unknown Artist ID"),
composer=(tags.composer, "Unknown Composer"),
composer_id=(tags.composer_id, "Unknown Composer ID"),
date=(tags.date, "Unknown Date"),
disc=(tags.disc, ""),
disc_total=(tags.disc_total, ""),
media_type=(tags.media_type, "Unknown Media Type"),
playlist_artist=(
(playlist_tags.playlist_artist if playlist_tags else None),
"Unknown Playlist Artist",
),
playlist_id=(
(playlist_tags.playlist_id if playlist_tags else None),
"Unknown Playlist ID",
),
playlist_title=(
(playlist_tags.playlist_title if playlist_tags else None),
"Unknown Playlist Title",
),
playlist_track=(
(playlist_tags.playlist_track if playlist_tags else None),
"",
),
title=(tags.title, "Unknown Title"),
title_id=(tags.title_id, "Unknown Title ID"),
track=(tags.track, ""),
track_total=(tags.track_total, ""),
)
sanitized_formatted_part = self.sanitize_string(
formatted_part,
file_extension if not is_folder else None,
)
formatted_parts.append(sanitized_formatted_part)
return str(Path(self.output_path, *formatted_parts))
async def download_stream(self, stream_url: str, download_path: str):
if self.download_mode == DownloadMode.YTDLP:
@@ -346,13 +399,11 @@ class AppleMusicBaseDownloader:
Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
self.sanitize_string(i.format(**tags_dict), True)
for i in template_file[0:-1]
],
*[
self.get_sanitized_string(
template_file[-1].format(**tags_dict), False
)
self.sanitize_string(template_file[-1].format(**tags_dict), False)
+ ".m3u8"
],
)
+19 -3
View File
@@ -1,7 +1,8 @@
import json
import typing
import subprocess
import asyncio
import json
import string
import subprocess
import typing
import httpx
@@ -79,3 +80,18 @@ async def sequential_gather(
if interval > 0 and i < len(tasks) - 1:
await asyncio.sleep(interval)
return results
class CustomStringFormatter(string.Formatter):
def format_field(self, value: typing.Any, format_spec: str) -> str:
if isinstance(value, tuple) and len(value) == 2:
actual_value, fallback_value = value
if actual_value is None:
return fallback_value
try:
return super().format_field(actual_value, format_spec)
except Exception:
return fallback_value
return super().format_field(value, format_spec)