Refactor song decryption and staging

This commit is contained in:
Rafael Moraes
2026-02-25 14:08:35 -03:00
parent b3b5e6d1b2
commit afe42848d0
+23 -121
View File
@@ -4,8 +4,7 @@ from ..interface.enums import SongCodec, SyncedLyricsFormat
from ..interface.interface_song import AppleMusicSongInterface
from ..interface.types import DecryptionKeyAv
from ..utils import async_subprocess
from .amdecrypt import decrypt_file
from .constants import DEFAULT_SONG_DECRYPTION_KEY
from .amdecrypt import decrypt_file, decrypt_file_hex
from .downloader_base import AppleMusicBaseDownloader
from .enums import RemuxMode
from .types import DownloadItem
@@ -142,93 +141,6 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
return download_item
def fix_key_id(self, input_path: str):
count = 0
with open(input_path, "rb+") as file:
while data := file.read(4096):
pos = file.tell()
i = 0
while tenc := max(0, data.find(b"tenc", i)):
kid = tenc + 12
file.seek(max(0, pos - 4096) + kid, 0)
file.write(bytes.fromhex(f"{count:032}"))
count += 1
i = kid + 1
file.seek(pos, 0)
async def remux_mp4box(self, input_path: str, output_path: str):
await async_subprocess(
self.full_mp4box_path,
"-quiet",
"-add",
input_path,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
output_path,
silent=self.silent,
)
async def remux_ffmpeg(
self,
input_path: str,
output_path: str,
decryption_key: str = None,
):
if decryption_key:
key = [
"-decryption_key",
decryption_key,
]
else:
key = []
await async_subprocess(
self.full_ffmpeg_path,
"-loglevel",
"error",
"-y",
*key,
"-i",
input_path,
"-c",
"copy",
"-movflags",
"+faststart",
output_path,
silent=self.silent,
)
async def decrypt_mp4decrypt(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool,
):
if legacy:
keys = [
"--key",
f"1:{decryption_key}",
]
else:
self.fix_key_id(input_path)
keys = [
"--key",
"0" * 31 + "1" + f":{decryption_key}",
"--key",
"0" * 32 + f":{DEFAULT_SONG_DECRYPTION_KEY}",
]
await async_subprocess(
self.full_mp4decrypt_path,
*keys,
input_path,
output_path,
silent=self.silent,
)
async def decrypt_amdecrypt(
self,
input_path: str,
@@ -244,46 +156,43 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
output_path,
)
async def decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
decrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
codec: SongCodec,
media_id: str,
fairplay_key: str,
):
if codec.is_legacy() and self.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
)
elif codec.is_legacy() or not self.use_wrapper:
await self.decrypt_mp4decrypt(
encrypted_path,
decrypted_path,
decryption_key.audio_track.key,
codec.is_legacy(),
)
if self.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
decrypted_path,
staged_path,
)
else:
await self.remux_mp4box(
decrypted_path,
staged_path,
)
else:
if self.use_wrapper:
await self.decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self.decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy=codec.is_legacy(),
)
def get_lyrics_synced_path(self, final_path: str) -> str:
return str(Path(final_path).with_suffix("." + self.synced_lyrics_format.value))
@@ -321,15 +230,8 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
encrypted_path,
)
decrypted_path = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"decrypted",
".m4a",
)
await self.stage(
encrypted_path,
decrypted_path,
download_item.staged_path,
download_item.decryption_key,
self.codec,