Fix ALAC duration and timescale handling

This commit is contained in:
Rafael Moraes
2026-04-20 09:53:38 -03:00
+84 -27
View File
@@ -168,10 +168,17 @@ def extract_song(input_path: str) -> SongInfo:
trex_defaults = (
_extract_trex_defaults(song_info.moov_data, audio_track_id)
if song_info.moov_data
else {"default_sample_duration": 1024, "default_sample_size": 0}
else None
)
default_sample_duration = trex_defaults["default_sample_duration"]
default_sample_size = trex_defaults["default_sample_size"]
if trex_defaults:
default_sample_duration = trex_defaults["default_sample_duration"]
default_sample_size = trex_defaults["default_sample_size"]
else:
# Fallback defaults. ALAC typically uses 4096 samples per frame,
# while AAC uses 1024. Default to 4096 if the track contains 'alac'.
is_alac = song_info.moov_data and b"alac" in song_info.moov_data
default_sample_duration = 4096 if is_alac else 1024
default_sample_size = 0
logger.debug(
f"Default sample duration: {default_sample_duration}, "
f"default sample size: {default_sample_size}"
@@ -210,6 +217,18 @@ def extract_song(input_path: str) -> SongInfo:
song_info.samples.extend(samples_from_pair)
moof_box = None
# Post-process samples: if this is ALAC, ensure all samples have duration 4096.
# Apple Music fragments often report 1024 in trex/tfhd defaults, but
# ALAC frames are actually 4096 samples long. This mismatch is the
# root cause of the 1:16 duration reporting for 5-minute tracks.
is_alac = song_info.moov_data and (b"alac" in song_info.moov_data or b"ALAC" in song_info.moov_data)
if is_alac:
logger.debug("ALAC detected: forcing all sample durations to 4096")
for sample in song_info.samples:
# Only override if it was 0 or the common incorrect default of 1024
if sample.duration in (0, 1024):
sample.duration = 4096
logger.debug(f"Extracted {len(song_info.samples)} samples from {input_path}")
return song_info
@@ -596,10 +615,12 @@ def write_decrypted_m4a(
orig_mvhd = None
orig_tkhd = None
orig_mdhd = None
orig_hdlr = None
orig_smhd = None
orig_dinf = None
timescale = 44100 # Default
# We will use the actual audio sample rate from the stsd as our
# master timescale to ensure 100% duration consistency.
orig_hdlr = None
timescale = 44100 # Default fallback
if original_path:
with open(original_path, "rb") as f:
@@ -611,7 +632,8 @@ def write_decrypted_m4a(
if orig_data:
stsd_content = _extract_stsd_content(orig_data)
timescale = _extract_timescale(orig_data)
# Extract the REAL sample rate from the codec configuration
timescale = _extract_sample_rate_from_stsd(stsd_content) or _extract_timescale(orig_data)
# Find moov box and extract child boxes
moov_idx = orig_data.find(b"moov")
@@ -712,7 +734,7 @@ def _write_moov(
# mvhd (movie header)
if orig_mvhd:
f.write(_patch_mvhd_duration(orig_mvhd, total_duration))
f.write(_patch_mvhd_duration(orig_mvhd, total_duration, timescale))
else:
mvhd_content = struct.pack(">II", 0, 0) # creation, modification
mvhd_content += struct.pack(">I", timescale)
@@ -755,7 +777,7 @@ def _write_moov(
# mdhd (media header) - preserves original language code
if orig_mdhd:
f.write(_patch_mdhd_duration(orig_mdhd, total_duration))
f.write(_patch_mdhd_duration(orig_mdhd, total_duration, timescale))
else:
mdhd_content = struct.pack(">II", 0, 0) # creation, modification
mdhd_content += struct.pack(">I", timescale)
@@ -845,6 +867,23 @@ def _write_moov(
f.write(struct.pack(">I", mdat_offset))
f.seek(0, 2) # Back to end
def _extract_sample_rate_from_stsd(stsd_content: bytes) -> Optional[int]:
"""Extract the actual audio sample rate from the stsd box content."""
# Header: version(1)+flags(3)+count(4) + Entry: size(4)+type(4) = 16 bytes
# AudioSampleEntry v0: reserved(6)+dref(2)+ver(2)+rev(2)+vend(4)+chan(2)+size(2)+comp(2)+pack(2)+rate(4)
# The fixed-point sample_rate field is at offset 16 + 24 = 40.
if not stsd_content or len(stsd_content) < 44:
return None
samplerate_offset = 40
sample_rate_fixed = struct.unpack(">I", stsd_content[samplerate_offset : samplerate_offset + 4])[0]
sample_rate = sample_rate_fixed >> 16
# Sanity check: standard audio sample rates should be between 8000 and 384000
if 8000 <= sample_rate <= 384000:
return sample_rate
return None
def _write_stsd(f, stsd_content: bytes):
"""Write sample description box using content from original file.
@@ -1188,10 +1227,16 @@ def _extract_timescale(data: bytes) -> int:
"""Extract timescale from moov/mvhd or mdhd box."""
# Look for mdhd box (media header has the audio timescale)
idx = data.find(b"mdhd")
if idx > 0 and idx + 24 < len(data):
# mdhd: version(1) + flags(3) + creation(4) + modification(4) + timescale(4)
return struct.unpack(">I", data[idx + 16 : idx + 20])[0]
return 44100 # Default
if idx > 0 and idx + 28 < len(data):
# mdhd: size(4) + type(b'mdhd') + version(1) + flags(3)
version = data[idx + 4]
if version == 0:
# v0: ver+flags(4) + creation(4) + modification(4) + timescale(4)
return struct.unpack(">I", data[idx + 16 : idx + 20])[0]
else:
# v1: ver+flags(4) + creation(8) + modification(8) + timescale(4)
return struct.unpack(">I", data[idx + 24 : idx + 28])[0]
return 44100 # Default fallback
def _find_child_box(
@@ -1244,15 +1289,17 @@ def _find_audio_trak(moov_data: bytes) -> Optional[bytes]:
return None
def _patch_mvhd_duration(box_data: bytes, duration: int) -> bytes:
"""Return a copy of the mvhd box with its duration field patched."""
def _patch_mvhd_duration(box_data: bytes, duration: int, timescale: int) -> bytes:
"""Return a copy of the mvhd box with its duration and timescale fields patched."""
data = bytearray(box_data)
version = data[8] # After size(4) + type(4)
if version == 0:
# v0: ver+flags(4) + creation(4) + modification(4) + timescale(4) + duration(4)
struct.pack_into(">I", data, 20, timescale)
struct.pack_into(">I", data, 24, duration)
else:
# v1: ver+flags(4) + creation(8) + modification(8) + timescale(4) + duration(8)
struct.pack_into(">I", data, 28, timescale)
struct.pack_into(">Q", data, 32, duration)
return bytes(data)
@@ -1272,17 +1319,18 @@ def _patch_tkhd_duration(box_data: bytes, duration: int) -> bytes:
return bytes(data)
def _patch_mdhd_duration(box_data: bytes, duration: int) -> bytes:
"""Return a copy of the mdhd box with its duration field patched.
Preserves the original language code and all other fields.
"""
def _patch_mdhd_duration(box_data: bytes, duration: int, timescale: int) -> bytes:
"""Return a copy of the mdhd box with its duration and timescale fields patched."""
data = bytearray(box_data)
version = data[8]
if version == 0:
# Same layout as mvhd v0
# v0: ver+flags(4) + creation(4) + modification(4) + timescale(4) + duration(4)
struct.pack_into(">I", data, 20, timescale)
struct.pack_into(">I", data, 24, duration)
else:
# v1: ver+flags(4) + creation(8) + modification(8) + timescale(4) + duration(8)
struct.pack_into(">I", data, 28, timescale)
struct.pack_into(">Q", data, 32, duration)
return bytes(data)
@@ -1331,8 +1379,13 @@ def _extract_trex_defaults(moov_data: bytes, target_track_id: int = 0) -> dict:
Dict with keys: default_sample_duration, default_sample_size,
default_sample_description_index, default_sample_flags.
"""
# Determine fallback duration based on codec
# ALAC frames are 4096 samples, AAC frames are 1024 samples
is_alac = b"alac" in moov_data or b"ALAC" in moov_data
fallback_duration = 4096 if is_alac else 1024
defaults = {
"default_sample_duration": 1024,
"default_sample_duration": fallback_duration,
"default_sample_size": 0,
"default_sample_description_index": 1,
"default_sample_flags": 0,
@@ -1361,12 +1414,17 @@ def _extract_trex_defaults(moov_data: bytes, target_track_id: int = 0) -> dict:
defaults["default_sample_description_index"] = struct.unpack(
">I", trex_data[16:20]
)[0]
defaults["default_sample_duration"] = struct.unpack(
">I", trex_data[20:24]
)[0]
defaults["default_sample_size"] = struct.unpack(">I", trex_data[24:28])[
0
]
# Extract duration and protect against Apple's dummy values
parsed_duration = struct.unpack(">I", trex_data[20:24])[0]
# Override if the provider wrote 0, or if they incorrectly wrote 1024 for an ALAC track
if parsed_duration == 0 or (is_alac and parsed_duration == 1024):
defaults["default_sample_duration"] = fallback_duration
else:
defaults["default_sample_duration"] = parsed_duration
defaults["default_sample_size"] = struct.unpack(">I", trex_data[24:28])[0]
defaults["default_sample_flags"] = struct.unpack(
">I", trex_data[28:32]
)[0]
@@ -1380,7 +1438,6 @@ def _extract_trex_defaults(moov_data: bytes, target_track_id: int = 0) -> dict:
return defaults
def _extract_encryption_info(moov_data: bytes) -> Optional[EncryptionInfo]:
"""Extract encryption scheme info from the audio track's sinf box.