Add optional m3u8 wrapper support

This commit is contained in:
Rafael Moraes
2026-04-24 11:18:01 -03:00
parent 9f60043375
commit b00163a71c
3 changed files with 34 additions and 4 deletions
+1 -1
View File
@@ -96,7 +96,7 @@ class AppleMusicSongDownloader:
staged_path=staged_path,
)
if self.base.use_wrapper and not legacy:
if self.base.interface.base.use_wrapper and not legacy:
await self._decrypt_amdecrypt(
encrypted_path,
staged_path,
+9 -1
View File
@@ -29,12 +29,16 @@ class AppleMusicBaseInterface:
itunes_api: ItunesApi,
cover_format: CoverFormat,
cover_size: int,
use_wrapper: bool,
wrapper_m3u8_ip: str,
cdm: Cdm,
) -> None:
self.apple_music_api = apple_music_api
self.itunes_api = itunes_api
self.cover_format = cover_format
self.cover_size = cover_size
self.use_wrapper = use_wrapper
self.wrapper_m3u8_ip = wrapper_m3u8_ip
self.cdm = cdm
@staticmethod
@@ -121,8 +125,10 @@ class AppleMusicBaseInterface:
apple_music_api: AppleMusicApi,
cover_format: CoverFormat = CoverFormat.JPG,
cover_size: int = 1200,
itunes_api: ItunesApi | None = None,
use_wrapper: bool = False,
wrapper_m3u8_ip: str = "127.0.0.1:20020",
wvd_path: str | None = None,
itunes_api: ItunesApi | None = None,
):
itunes_api = itunes_api or await ItunesApi.create(
storefront=apple_music_api.storefront,
@@ -135,6 +141,8 @@ class AppleMusicBaseInterface:
itunes_api=itunes_api,
cover_format=cover_format,
cover_size=cover_size,
use_wrapper=use_wrapper,
wrapper_m3u8_ip=wrapper_m3u8_ip,
cdm=cdm,
)
return base
+24 -2
View File
@@ -5,6 +5,7 @@ import json
import re
from typing import Callable
from xml.dom import minidom
import struct
from xml.etree import ElementTree
import m3u8
@@ -258,6 +259,25 @@ class AppleMusicSongInterface:
else:
return await self._get_stream_info(song_metadata, codec)
async def get_wrapper_m3u8(self, adam_id: str) -> str | None:
host, port = self.base.wrapper_m3u8_ip.split(":")
reader, writer = await asyncio.open_connection(host, port)
data = struct.pack("B", len(adam_id)) + adam_id.encode()
writer.write(data)
await writer.drain()
response = await reader.readuntil(b"\n")
m3u8_url = response.decode().strip()
writer.close()
await writer.wait_closed()
if m3u8_url:
return m3u8_url
return None
async def _get_stream_info(
self,
song_metadata: dict,
@@ -272,8 +292,10 @@ class AppleMusicSongInterface:
)
)["data"][0]
m3u8_master_url = song_metadata["attributes"]["extendedAssetUrls"].get(
"enhancedHls"
m3u8_master_url = (
self.get_wrapper_m3u8(self.base.parse_catalog_media_id(song_metadata))
if self.base.use_wrapper
else song_metadata["attributes"]["extendedAssetUrls"].get("enhancedHls")
)
if not m3u8_master_url:
return None