Add use_single_content_key and use_cenc options

This commit is contained in:
Rafael Moraes
2026-05-20 16:57:15 -03:00
parent 3dd829b38c
commit da7346f704
+63 -23
View File
@@ -1013,6 +1013,8 @@ async def decrypt_samples(
samples: List[SampleInfo],
encryption_info: EncryptionInfo,
encryption_info_per_desc: Optional[dict] = None,
*,
use_single_content_key: bool = False,
progress_callback=None,
) -> bytes:
"""
@@ -1030,10 +1032,13 @@ async def decrypt_samples(
Args:
wrapper_ip: wrapper-v2 base URL, e.g. ``http://127.0.0.1:80`` or legacy ``host:port``
use_single_content_key: When ``False`` (default), description 0 uses the built-in
prefetch key locally and description 1+ uses ``fairplay_key`` via the wrapper.
When ``True``, every sample uses ``fairplay_key`` through the wrapper only.
progress_callback: Optional callback(current_sample, total_samples, bytes_processed, speed)
"""
base_url = _wrapper_v2_base_url(wrapper_ip)
keys = [PREFETCH_KEY, fairplay_key]
keys = [fairplay_key] if use_single_content_key else [PREFETCH_KEY, fairplay_key]
decrypted_data = bytearray()
last_desc_index: int = 255
total_samples = len(samples)
@@ -1068,12 +1073,16 @@ async def decrypt_samples(
for i, sample in enumerate(samples):
if last_desc_index != sample.desc_index:
await flush_crypto_batch(client)
key_uri = keys[min(sample.desc_index, len(keys) - 1)]
segment_adam = "0" if key_uri == PREFETCH_KEY else track_id
segment_uri = key_uri
if use_single_content_key:
segment_adam = track_id
segment_uri = fairplay_key
else:
key_uri = keys[min(sample.desc_index, len(keys) - 1)]
segment_adam = "0" if key_uri == PREFETCH_KEY else track_id
segment_uri = key_uri
last_desc_index = sample.desc_index
if segment_adam == "0":
if not use_single_content_key and segment_adam == "0":
await flush_crypto_batch(client)
enc_info = (
encryption_info_per_desc.get(sample.desc_index)
@@ -1580,32 +1589,55 @@ def mux_decrypted_mp4_tracks(
logger.debug(f"Muxed decrypted AV file to {output_path}")
def _encryption_info_for_hex_decrypt(
track_info: SongInfo,
*,
use_cenc: bool,
) -> EncryptionInfo:
"""Resolve encryption scheme for hex decrypt (moov metadata or defaults)."""
base = track_info.encryption_info or EncryptionInfo(scheme_type="cbcs")
if not use_cenc:
return base
return EncryptionInfo(
scheme_type="cenc",
crypt_byte_block=0,
skip_byte_block=0,
per_sample_iv_size=base.per_sample_iv_size,
constant_iv=base.constant_iv,
kid=base.kid,
)
async def _decrypt_track_hex(
input_path: str,
decryption_key: str,
handler_type: bytes,
use_prefetch_key: bool = False,
use_track_key_for_all_descriptions: bool = False,
*,
use_cenc: bool = False,
use_single_content_key: bool = False,
file_backed: bool = False,
) -> DecryptedTrack:
"""Decrypt one audio/video/text track with a raw AES key."""
"""Decrypt one audio/video/text track with a raw AES key.
``use_single_content_key``:
``False`` (default for catalog audio): description 0 uses the built-in
prefetch AES key, description 1+ uses ``decryption_key`` (Apple CBCS layout).
``True`` (web AAC, muxed MV audio): every sample description uses
``decryption_key``.
"""
track_info = await asyncio.to_thread(extract_song, input_path, handler_type)
track_key = bytes.fromhex(decryption_key)
if use_track_key_for_all_descriptions:
if use_single_content_key:
keys = {sample.desc_index: track_key for sample in track_info.samples}
elif handler_type == b"soun" and use_prefetch_key:
keys = {0: track_key}
elif handler_type == b"soun":
keys = {0: DEFAULT_SONG_DECRYPTION_KEY, 1: track_key}
else:
keys = {sample.desc_index: track_key for sample in track_info.samples}
enc_info = track_info.encryption_info or EncryptionInfo(
scheme_type="cenc" if use_prefetch_key else "cbcs"
)
enc_info = _encryption_info_for_hex_decrypt(track_info, use_cenc=use_cenc)
enc_info_per_desc = None
if track_info.moov_data and not use_prefetch_key:
if track_info.moov_data:
enc_info_per_desc = await asyncio.to_thread(
_extract_encryption_info_per_stsd,
track_info.moov_data,
@@ -1659,15 +1691,17 @@ async def decrypt_file_hex(
input_audio_path: str,
decryption_key_video: str | None = None,
input_video_path: str | None = None,
use_prefetch_key: bool = False,
*,
use_cenc: bool = False,
use_single_content_key: bool = False,
) -> DecryptedMedia:
"""Decrypt audio and optional video with raw AES hex keys."""
audio = await _decrypt_track_hex(
input_audio_path,
decryption_key_audio,
b"soun",
use_prefetch_key,
use_track_key_for_all_descriptions=input_video_path is not None,
use_cenc=use_cenc,
use_single_content_key=use_single_content_key or input_video_path is not None,
file_backed=input_video_path is not None,
)
if input_video_path is None:
@@ -3198,9 +3232,11 @@ async def _decrypt_track_wrapper(
fairplay_key: str,
input_path: str,
handler_type: bytes = b"soun",
*,
use_single_content_key: bool = False,
progress_callback=None,
) -> DecryptedTrack:
"""Decrypt one track through wrapper-v2."""
"""Decrypt one track through wrapper-v2 (CBCS via FairPlay SKD)."""
song_info = await asyncio.to_thread(extract_song, input_path, handler_type)
enc_info = song_info.encryption_info or EncryptionInfo(scheme_type="cbcs")
enc_info_per_desc = None
@@ -3218,7 +3254,8 @@ async def _decrypt_track_wrapper(
song_info.samples,
enc_info,
enc_info_per_desc,
progress_callback,
use_single_content_key=use_single_content_key,
progress_callback=progress_callback,
)
return DecryptedTrack(input_path, song_info, decrypted_data)
@@ -3231,9 +3268,10 @@ async def decrypt_wrapper(
fairplay_key_video: str | None = None,
*,
fairplay_key_audio: str | None = None,
use_single_content_key: bool = False,
progress_callback=None,
) -> DecryptedMedia:
"""Decrypt audio and optional video through wrapper-v2."""
"""Decrypt audio and optional video through wrapper-v2 (CBCS)."""
if fairplay_key_audio is None:
if input_video_path is None and fairplay_key_video is not None:
fairplay_key_audio = fairplay_key_video
@@ -3246,7 +3284,8 @@ async def decrypt_wrapper(
fairplay_key_audio,
input_audio_path,
b"soun",
progress_callback,
use_single_content_key=use_single_content_key,
progress_callback=progress_callback,
)
if input_video_path is None:
return DecryptedMedia(audio=audio)
@@ -3261,7 +3300,8 @@ async def decrypt_wrapper(
fairplay_key_video,
input_video_path,
b"vide",
progress_callback,
use_single_content_key=use_single_content_key,
progress_callback=progress_callback,
)
)
caption_tracks = [