Compare commits

...

35 Commits

Author SHA1 Message Date
R. M ae4c2abfe4 Changed version 2023-06-11 01:09:24 -03:00
R. M 37554e8f49 Added missing line 2023-06-11 01:09:12 -03:00
R. M aee494f464 Changed version 2023-06-11 00:17:05 -03:00
R. M eddd6f9053 Update __init__.py 2023-06-11 00:15:47 -03:00
R. M 24d9fdf9ee Increase video cover size 2023-06-10 12:20:35 -03:00
R. M a0fef9944e Changed URL argument behavior 2023-06-10 12:20:00 -03:00
R. M e37038e67b Remove skip cleanup 2023-06-10 12:15:53 -03:00
R. M 52c59f9a17 Update gamdl.py 2023-06-10 12:09:37 -03:00
R. M 2f80b9dc65 Update __main__.py 2023-06-10 12:09:35 -03:00
R. M caed322fd0 Update __init__.py 2023-06-10 12:09:33 -03:00
R. M 76396f3fed Update README.md 2023-05-20 14:21:10 -03:00
R. M b64cc06641 Version bump 2023-05-20 14:15:22 -03:00
R. M 4e2c54934a LRC only mode 2023-05-20 14:13:26 -03:00
R. M e76c79d9b4 Remove Print Video M3U8 URL 2023-05-20 13:14:32 -03:00
R. M 6dd730c368 Increased cover size 2023-05-20 13:12:14 -03:00
R. M 2a2403c130 Update gamdl.py 2023-05-20 13:10:11 -03:00
R. M fd47acab4f Remove HE-AAC 2023-05-20 13:09:50 -03:00
R. M 66d8211a16 Update get_sanizated_string 2023-05-20 13:05:26 -03:00
R. M 158f0e9f27 Update gamdl.py 2023-05-20 13:04:59 -03:00
R. M 0d9b225fdc Version bump 2023-04-19 22:47:02 -03:00
R. M 254147096a Fix index_js_uri 2023-04-19 22:46:31 -03:00
R. M 3a10069c76 heaac 2023-04-05 23:43:46 -03:00
R. M 9e07aee4e6 Update __init__.py 2023-04-04 16:13:30 -03:00
R. M 2c18a285a0 Update gamdl.py 2023-04-04 16:13:21 -03:00
R. M c854af5b2c Changed version 2023-04-04 16:08:22 -03:00
R. M f10a4a731b Gapless tag 2023-04-04 16:07:47 -03:00
R. M 527dd9935a Update __init__.py 2023-03-28 01:30:37 -03:00
R. M 06d5c10725 Update gamdl.py 2023-03-28 01:30:20 -03:00
R. M 58a8e3944d Update gamdl.py 2023-03-28 01:15:23 -03:00
R. M 4c7e563d4c Remove useless if 2023-03-28 01:13:02 -03:00
R. M f05dace5c1 Changed version 2023-03-28 01:10:11 -03:00
R. M eb81728475 Remove unused methods 2023-03-28 01:08:41 -03:00
R. M 96c90e1716 Better synced lyrics time format 2023-03-28 01:08:09 -03:00
R. M 7459d95df0 Added missing SD quality 2023-03-27 08:40:10 -03:00
R. M b2521e2933 Changed video get stream url method 2023-03-27 08:39:09 -03:00
4 changed files with 460 additions and 372 deletions
+26 -20
View File
@@ -42,50 +42,56 @@ Some new features that I added:
## Usage
```
usage: gamdl [-h] [-u [URLS_TXT]] [-w WVD_LOCATION] [-f FINAL_PATH] [-t TEMP_PATH] [-c COOKIES_LOCATION] [-m] [-p]
[-o] [-n] [-s] [-e] [-i] [-v]
[url ...]
usage: gamdl [-h] [-u [URLS_TXT]] [-w WVD_LOCATION] [-f FINAL_PATH]
[-t TEMP_PATH] [-c COOKIES_LOCATION] [-m] [-p] [-o] [-n]
[-l] [-s] [-e] [-v]
[url ...]
Download Apple Music songs/music videos/albums/playlists
positional arguments:
url Apple Music song/music video/album/playlist URL(s) (default: None)
url Apple Music song/music video/album/playlist URL(s)
(default: None)
options:
-h, --help show this help message and exit
-u [URLS_TXT], --urls-txt [URLS_TXT]
Read URLs from a text file (default: None)
-w WVD_LOCATION, --wvd-location WVD_LOCATION
.wvd file location (default: *.wvd)
.wvd file location (ignored if using -l/--lrc-only)
(default: ./*.wvd)
-f FINAL_PATH, --final-path FINAL_PATH
Final Path (default: Apple Music)
Final Path (default: ./Apple Music)
-t TEMP_PATH, --temp-path TEMP_PATH
Temp Path (default: temp)
Temp Path (default: ./temp)
-c COOKIES_LOCATION, --cookies-location COOKIES_LOCATION
Cookies location (default: cookies.txt)
Cookies location (default: ./cookies.txt)
-m, --disable-music-video-skip
Disable music video skip on playlists/albums (default: False)
Disable music video skip on playlists/albums (default:
False)
-p, --prefer-hevc Prefer HEVC over AVC (default: False)
-o, --overwrite Overwrite existing files (default: False)
-n, --no-lrc Don't create .lrc file (default: False)
-n, --no-lrc Don't create .lrc file (ignored if using -l/--lrc-
only) (default: False)
-l, --lrc-only Skip downloading songs and only create .lrc files
(default: False)
-s, --skip-cleanup Skip cleanup (default: False)
-e, --print-exceptions
Print execeptions (default: False)
-i, --print-video-m3u8-url
Print Video M3U8 URL (default: False)
-v, --version show program's version number and exit
```
## Songs/Music Videos quality
* Songs:
* 256kbps AAC
* AAC 256kbps
* Music Videos (varies depending on the video):
* 4K HEVC 20mbps / AAC 256kbps
* 4K HEVC 12mbps / AAC 256kbps
* 1080p AVC 10mbps / AAC 256kbps
* 1080p AVC 6.5bps / AAC 256kbps
* 720p AVC 4mbps / AAC 256kbps
* 480p AVC 1.5mbps / AAC 256kbps
* 360p AVC 1mbps / AAC 256kbps
* 4K HEVC 20mbps, AAC 256kbps
* 4K HEVC 12mbps, AAC 256kbps
* 1080p AVC 10mbps, AAC 256kbps
* 1080p AVC 6.5bps, AAC 256kbps
* 720p AVC 4mbps, AAC 256kbps
* 576p AVC 2mbps, AAC 256kbps
* 480p AVC 1.5mbps, AAC 256kbps
* 360p AVC 1mbps, AAC 256kbps
Some videos may include EIA-608 closed captions.
+118 -97
View File
@@ -1,109 +1,104 @@
import shutil
import argparse
import shutil
import traceback
from .gamdl import Gamdl
__version__ = '1.3'
__version__ = "1.9.3"
def main():
if not shutil.which('mp4decrypt'):
raise Exception('mp4decrypt is not on PATH')
if not shutil.which('MP4Box'):
raise Exception('MP4Box is not on PATH')
for tool in ("MP4Box", "mp4decrypt"):
if not shutil.which(tool):
raise Exception(f"{tool} is not on PATH")
parser = argparse.ArgumentParser(
description = 'Download Apple Music songs/music videos/albums/playlists',
formatter_class = argparse.ArgumentDefaultsHelpFormatter
description="Download Apple Music songs/music videos/albums/playlists",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
'url',
help = 'Apple Music song/music video/album/playlist URL(s)',
nargs = '*'
"url",
help="Apple Music song/music video/album/playlist URL(s)",
nargs="+",
)
parser.add_argument(
'-u',
'--urls-txt',
help = 'Read URLs from a text file',
nargs = '?'
"-u",
"--urls-txt",
help="Read URLs from a text file",
action="store_true",
)
parser.add_argument(
'-w',
'--wvd-location',
default = '*.wvd',
help = '.wvd file location'
"-w",
"--wvd-location",
default="./*.wvd",
help=".wvd file location (ignored if using -l/--lrc-only)",
)
parser.add_argument(
'-f',
'--final-path',
default = 'Apple Music',
help = 'Final Path'
"-f",
"--final-path",
default="./Apple Music",
help="Final Path",
)
parser.add_argument(
'-t',
'--temp-path',
default = 'temp',
help = 'Temp Path'
"-t",
"--temp-path",
default="./temp",
help="Temp Path",
)
parser.add_argument(
'-c',
'--cookies-location',
default = 'cookies.txt',
help = 'Cookies location'
"-c",
"--cookies-location",
default="./cookies.txt",
help="Cookies location",
)
parser.add_argument(
'-m',
'--disable-music-video-skip',
action = 'store_true',
help = 'Disable music video skip on playlists/albums'
"-m",
"--disable-music-video-skip",
action="store_true",
help="Disable music video skip on playlists/albums",
)
parser.add_argument(
'-p',
'--prefer-hevc',
action = 'store_true',
help = 'Prefer HEVC over AVC'
"-p",
"--prefer-hevc",
action="store_true",
help="Prefer HEVC over AVC",
)
parser.add_argument(
'-o',
'--overwrite',
action = 'store_true',
help = 'Overwrite existing files'
"-o",
"--overwrite",
action="store_true",
help="Overwrite existing files",
)
parser.add_argument(
'-n',
'--no-lrc',
action = 'store_true',
help = "Don't create .lrc file"
"-n",
"--no-lrc",
action="store_true",
help="Don't create .lrc file (ignored if using -l/--lrc-only)",
)
parser.add_argument(
'-s',
'--skip-cleanup',
action = 'store_true',
help = 'Skip cleanup'
"-l",
"--lrc-only",
action="store_true",
help="Skip downloading songs and only create .lrc files",
)
parser.add_argument(
'-e',
'--print-exceptions',
action = 'store_true',
help = 'Print execeptions'
"-e",
"--print-exceptions",
action="store_true",
help="Print execeptions",
)
parser.add_argument(
'-i',
'--print-video-m3u8-url',
action = 'store_true',
help = 'Print Video M3U8 URL'
)
parser.add_argument(
'-v',
'--version',
action = 'version',
version = f'%(prog)s {__version__}'
"-v",
"--version",
action="version",
version=f"%(prog)s {__version__}",
)
args = parser.parse_args()
if not args.url and not args.urls_txt:
parser.error('you must specify an url or a text file using -u/--urls-txt')
if args.urls_txt:
with open(args.urls_txt, 'r', encoding = 'utf8') as f:
args.url = f.read().splitlines()
_url = []
for url_txt in args.url:
with open(url_txt, "r", encoding="utf8") as f:
_url.extend(f.read().splitlines())
args.url = _url
dl = Gamdl(
args.wvd_location,
args.cookies_location,
@@ -111,9 +106,8 @@ def main():
args.prefer_hevc,
args.temp_path,
args.final_path,
args.no_lrc,
args.lrc_only,
args.overwrite,
args.skip_cleanup
)
error_count = 0
download_queue = []
@@ -124,43 +118,66 @@ def main():
exit(1)
except:
error_count += 1
print(f'Failed to check URL {i + 1}/{len(args.url)}')
print(f"Failed to check URL {i + 1}/{len(args.url)}")
if args.print_exceptions:
traceback.print_exc()
for i, url in enumerate(download_queue):
for j, track in enumerate(url):
print(f'Downloading "{track["attributes"]["name"]}" (track {j + 1}/{len(url)} from URL {i + 1}/{len(download_queue)})')
track_id = track['id']
print(
f'Downloading "{track["attributes"]["name"]}" (track {j + 1}/{len(url)} from URL {i + 1}/{len(download_queue)})'
)
track_id = track["id"]
try:
webplayback = dl.get_webplayback(track_id)
if track['type'] == 'music-videos':
if args.print_video_m3u8_url:
print(webplayback['hls-playlist-url'])
tags = dl.get_tags_music_video(track['attributes']['url'].split('/')[-1].split('?')[0])
final_location = dl.get_final_location('.m4v', tags)
if dl.check_exists(final_location) and not args.overwrite:
if track["type"] == "music-videos":
tags = dl.get_tags_music_video(
track["attributes"]["url"].split("/")[-1].split("?")[0]
)
final_location = dl.get_final_location(".m4v", tags)
if final_location.exists() and not args.overwrite:
continue
playlist = dl.get_playlist_music_video(webplayback)
stream_url_audio = dl.get_stream_url_music_video_audio(playlist)
decryption_keys_audio = dl.get_decryption_keys_music_video(stream_url_audio, track_id)
stream_url_video, stream_url_audio = dl.get_stream_url_music_video(
webplayback
)
decryption_keys_audio = dl.get_decryption_keys_music_video(
stream_url_audio, track_id
)
encrypted_location_audio = dl.get_encrypted_location_audio(track_id)
dl.download(encrypted_location_audio, stream_url_audio)
decrypted_location_audio = dl.get_decrypted_location_audio(track_id)
dl.decrypt(encrypted_location_audio, decrypted_location_audio, decryption_keys_audio)
stream_url_video = dl.get_stream_url_music_video_video(playlist)
decryption_keys_video = dl.get_decryption_keys_music_video(stream_url_video, track_id)
dl.decrypt(
encrypted_location_audio,
decrypted_location_audio,
decryption_keys_audio,
)
decryption_keys_video = dl.get_decryption_keys_music_video(
stream_url_video, track_id
)
encrypted_location_video = dl.get_encrypted_location_video(track_id)
dl.download(encrypted_location_video, stream_url_video)
decrypted_location_video = dl.get_decrypted_location_video(track_id)
dl.decrypt(encrypted_location_video, decrypted_location_video, decryption_keys_video)
fixed_location = dl.get_fixed_location(track_id, '.m4v')
dl.fixup_music_video(decrypted_location_audio, decrypted_location_video, fixed_location)
dl.make_final(final_location, fixed_location, tags)
dl.decrypt(
encrypted_location_video,
decrypted_location_video,
decryption_keys_video,
)
fixed_location = dl.get_fixed_location(track_id, ".m4v")
dl.fixup_music_video(
decrypted_location_audio,
decrypted_location_video,
fixed_location,
)
final_location.parent.mkdir(parents=True, exist_ok=True)
dl.move_final(final_location, fixed_location, tags)
else:
unsynced_lyrics, synced_lyrics = dl.get_lyrics(track_id)
tags = dl.get_tags_song(webplayback, unsynced_lyrics)
final_location = dl.get_final_location('.m4a', tags)
if dl.check_exists(final_location) and not args.overwrite:
final_location = dl.get_final_location(".m4a", tags)
if args.lrc_only:
final_location.parent.mkdir(parents=True, exist_ok=True)
dl.make_lrc(final_location, synced_lyrics)
continue
if final_location.exists() and not args.overwrite:
continue
stream_url = dl.get_stream_url_song(webplayback)
decryption_keys = dl.get_decryption_keys_song(stream_url, track_id)
@@ -168,16 +185,20 @@ def main():
dl.download(encrypted_location, stream_url)
decrypted_location = dl.get_decrypted_location_audio(track_id)
dl.decrypt(encrypted_location, decrypted_location, decryption_keys)
fixed_location = dl.get_fixed_location(track_id, '.m4a')
fixed_location = dl.get_fixed_location(track_id, ".m4a")
dl.fixup_song(decrypted_location, fixed_location)
dl.make_final(final_location, fixed_location, tags)
dl.make_lrc(final_location, synced_lyrics)
final_location.parent.mkdir(parents=True, exist_ok=True)
dl.move_final(final_location, fixed_location, tags)
if not args.no_lrc:
dl.make_lrc(final_location, synced_lyrics)
except KeyboardInterrupt:
exit(1)
except:
error_count += 1
print(f'Failed to download "{track["attributes"]["name"]}" (track {j + 1}/{len(url)} from URL {i + 1}/{len(download_queue)})')
print(
f'Failed to download "{track["attributes"]["name"]}" (track {j + 1}/{len(url)} from URL {i + 1}/{len(download_queue)})'
)
if args.print_exceptions:
traceback.print_exc()
dl.cleanup()
print(f'Done ({error_count} error(s))')
print(f"Done ({error_count} error(s))")
+2 -2
View File
@@ -1,4 +1,4 @@
import gamdl
from . import main
if __name__ == "__main__":
gamdl.main()
main()
+314 -253
View File
@@ -1,404 +1,465 @@
from pathlib import Path
import glob
from http.cookiejar import MozillaCookieJar
import re
import base64
import datetime
from xml.etree import ElementTree
import functools
import subprocess
import glob
import re
import shutil
import gamdl.storefront_ids
from pywidevine import Cdm, Device, PSSH, WidevinePsshData
import requests
import subprocess
from http.cookiejar import MozillaCookieJar
from pathlib import Path
from xml.etree import ElementTree
import m3u8
from yt_dlp import YoutubeDL
import requests
from mutagen.mp4 import MP4, MP4Cover
from pywidevine import PSSH, Cdm, Device, WidevinePsshData
from yt_dlp import YoutubeDL
import gamdl.storefront_ids
class Gamdl:
def __init__(self, wvd_location, cookies_location, disable_music_video_skip, prefer_hevc, temp_path, final_path, no_lrc, overwrite, skip_cleanup):
def __init__(
self,
wvd_location,
cookies_location,
disable_music_video_skip,
prefer_hevc,
temp_path,
final_path,
lrc_only,
overwrite,
):
self.disable_music_video_skip = disable_music_video_skip
self.prefer_hevc = prefer_hevc
self.temp_path = Path(temp_path)
self.final_path = Path(final_path)
self.no_lrc = no_lrc
self.overwrite = overwrite
self.skip_cleanup = skip_cleanup
wvd_location = glob.glob(wvd_location)
if not wvd_location:
raise Exception('.wvd file not found')
self.cdm = Cdm.from_device(Device.load(Path(wvd_location[0])))
self.cdm_session = self.cdm.open()
cookies = MozillaCookieJar(Path(cookies_location))
cookies.load(ignore_discard = True, ignore_expires = True)
self.lrc_only = lrc_only
if not self.lrc_only:
wvd_location = glob.glob(wvd_location)
if not wvd_location:
raise Exception(".wvd file not found")
self.cdm = Cdm.from_device(Device.load(wvd_location[0]))
self.cdm_session = self.cdm.open()
cookies = MozillaCookieJar(cookies_location)
cookies.load(ignore_discard=True, ignore_expires=True)
self.session = requests.Session()
self.session.cookies.update(cookies)
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'content-type': 'application/json',
'Media-User-Token': self.session.cookies.get_dict()['media-user-token'],
'x-apple-renewal': 'true',
'DNT': '1',
'Connection': 'keep-alive',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'origin': 'https://beta.music.apple.com'
})
web_page = self.session.get('https://beta.music.apple.com').text
index_js_uri = re.search('(?<=index\.)(.*?)(?=\.js")', web_page).group(1)
index_js_page = self.session.get(f'https://beta.music.apple.com/assets/index.{index_js_uri}.js').text
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"content-type": "application/json",
"Media-User-Token": self.session.cookies.get_dict()["media-user-token"],
"x-apple-renewal": "true",
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"origin": "https://beta.music.apple.com",
}
)
web_page = self.session.get("https://beta.music.apple.com").text
index_js_uri = re.search(r"/assets/index-legacy-[^/]+\.js", web_page).group(0)
index_js_page = self.session.get(
f"https://beta.music.apple.com{index_js_uri}"
).text
token = re.search('(?=eyJh)(.*?)(?=")', index_js_page).group(1)
self.session.headers.update({"authorization": f'Bearer {token}'})
self.country = self.session.cookies.get_dict()['itua']
self.session.headers.update({"authorization": f"Bearer {token}"})
self.country = self.session.cookies.get_dict()["itua"]
self.storefront = getattr(gamdl.storefront_ids, self.country.upper())
def get_download_queue(self, url):
download_queue = []
product_id = url.split('/')[-1].split('i=')[-1].split('&')[0].split('?')[0]
response = self.session.get(f'https://amp-api.music.apple.com/v1/catalog/{self.country}/?ids[songs]={product_id}&ids[albums]={product_id}&ids[playlists]={product_id}&ids[music-videos]={product_id}').json()['data'][0]
if response['type'] in ('songs', 'music-videos') and 'playParams' in response['attributes']:
product_id = url.split("/")[-1].split("i=")[-1].split("&")[0].split("?")[0]
response = self.session.get(
f"https://amp-api.music.apple.com/v1/catalog/{self.country}/?ids[songs]={product_id}&ids[albums]={product_id}&ids[playlists]={product_id}&ids[music-videos]={product_id}"
).json()["data"][0]
if response["type"] == "songs" and "playParams" in response["attributes"]:
download_queue.append(response)
if response['type'] == 'albums' or response['type'] == 'playlists':
for track in response['relationships']['tracks']['data']:
if 'playParams' in track['attributes']:
if track['type'] == 'music-videos' and self.disable_music_video_skip:
if (
response["type"] == "music-videos"
and "playParams" in response["attributes"]
and not self.lrc_only
):
download_queue.append(response)
if response["type"] == "albums" or response["type"] == "playlists":
for track in response["relationships"]["tracks"]["data"]:
if "playParams" in track["attributes"]:
if (
track["type"] == "music-videos"
and self.disable_music_video_skip
and not self.lrc_only
):
download_queue.append(track)
if track['type'] == 'songs':
if track["type"] == "songs":
download_queue.append(track)
if not download_queue:
raise Exception('Criteria not met')
raise Exception("Criteria not met")
return download_queue
def get_webplayback(self, track_id):
response = self.session.post(
'https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback',
json = {
'salableAdamId': track_id,
'language': 'en-US'
}
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback",
json={
"salableAdamId": track_id,
"language": "en-US",
},
).json()["songList"][0]
return response
def get_playlist_music_video(self, webplayback):
return m3u8.load(webplayback['hls-playlist-url'])
def get_stream_url_song(self, webplayback):
return next(i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256")['URL']
return next(i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256")[
"URL"
]
def get_stream_url_music_video_audio(self, playlist):
return [i for i in playlist.media if i.type == "AUDIO"][-1].uri
def get_stream_url_music_video_video(self, playlist):
def get_stream_url_music_video(self, webplayback):
with YoutubeDL(
{
"allow_unplayable_formats": True,
"quiet": True,
"no_warnings": True,
}
) as ydl:
playlist = ydl.extract_info(webplayback["hls-playlist-url"], download=False)
if self.prefer_hevc:
return playlist.playlists[-1].uri
stream_url_video = playlist["formats"][-1]["url"]
else:
return [i for i in playlist.playlists if 'avc' in i.stream_info.codecs][-1].uri
def check_exists(self, final_location):
return Path(final_location).exists()
stream_url_video = [
i["url"]
for i in playlist["formats"]
if i["vcodec"] is not None and "avc1" in i["vcodec"]
][-1]
stream_url_audio = next(
i["url"]
for i in playlist["formats"]
if "audio-stereo-256" in i["format_id"]
)
return stream_url_video, stream_url_audio
def get_encrypted_location_video(self, track_id):
return self.temp_path / f'{track_id}_encrypted_video.mp4'
return self.temp_path / f"{track_id}_encrypted_video.mp4"
def get_encrypted_location_audio(self, track_id):
return self.temp_path / f'{track_id}_encrypted_audio.mp4'
return self.temp_path / f"{track_id}_encrypted_audio.mp4"
def get_decrypted_location_video(self, track_id):
return self.temp_path / f'{track_id}_decrypted_video.mp4'
return self.temp_path / f"{track_id}_decrypted_video.mp4"
def get_decrypted_location_audio(self, track_id):
return self.temp_path / f'{track_id}_decrypted_audio.mp4'
return self.temp_path / f"{track_id}_decrypted_audio.mp4"
def get_fixed_location(self, track_id, file_extension):
return self.temp_path / f'{track_id}_fixed{file_extension}'
return self.temp_path / f"{track_id}_fixed{file_extension}"
def download(self, encrypted_location, stream_url):
with YoutubeDL({
'quiet': True,
'no_warnings': True,
'outtmpl': str(encrypted_location),
'allow_unplayable_formats': True,
'fixup': 'never',
'overwrites': self.overwrite,
'external_downloader': 'aria2c'
}) as ydl:
with YoutubeDL(
{
"quiet": True,
"no_warnings": True,
"outtmpl": str(encrypted_location),
"allow_unplayable_formats": True,
"fixup": "never",
"overwrites": self.overwrite,
"external_downloader": "aria2c",
}
) as ydl:
ydl.download(stream_url)
def get_license_b64(self, challenge, track_uri, track_id):
return self.session.post(
'https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense',
json = {
'challenge': challenge,
'key-system': 'com.widevine.alpha',
'uri': track_uri,
'adamId': track_id,
'isLibrary': False,
'user-initiated': True
}
).json()['license']
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/acquireWebPlaybackLicense",
json={
"challenge": challenge,
"key-system": "com.widevine.alpha",
"uri": track_uri,
"adamId": track_id,
"isLibrary": False,
"user-initiated": True,
},
).json()["license"]
def get_decryption_keys_music_video(self, stream_url, track_id):
playlist = m3u8.load(stream_url)
track_uri = next(i for i in playlist.keys if i.keyformat == "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed").uri
pssh = PSSH(track_uri.split(',')[1])
challenge = base64.b64encode(self.cdm.get_license_challenge(self.cdm_session, pssh)).decode('utf-8')
track_uri = next(
i
for i in playlist.keys
if i.keyformat == "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"
).uri
pssh = PSSH(track_uri.split(",")[1])
challenge = base64.b64encode(
self.cdm.get_license_challenge(self.cdm_session, pssh)
).decode()
license_b64 = self.get_license_b64(challenge, track_uri, track_id)
self.cdm.parse_license(self.cdm_session, license_b64)
return f'1:{next(i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT").key.hex()}'
def get_decryption_keys_song(self, stream_url, track_id):
track_uri = m3u8.load(stream_url).keys[0].uri
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = 1
widevine_pssh_data.key_ids.append(base64.b64decode(track_uri.split(",")[1]))
pssh = PSSH(base64.b64encode(widevine_pssh_data.SerializeToString()).decode('utf-8'))
challenge = base64.b64encode(self.cdm.get_license_challenge(self.cdm_session, pssh)).decode('utf-8')
pssh = PSSH(base64.b64encode(widevine_pssh_data.SerializeToString()).decode())
challenge = base64.b64encode(
self.cdm.get_license_challenge(self.cdm_session, pssh)
).decode()
license_b64 = self.get_license_b64(challenge, track_uri, track_id)
self.cdm.parse_license(self.cdm_session, license_b64)
return f'1:{next(i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT").key.hex()}'
def decrypt(self, encrypted_location, decrypted_location, decryption_keys):
subprocess.run(
[
'mp4decrypt',
"mp4decrypt",
encrypted_location,
'--key',
"--key",
decryption_keys,
decrypted_location
decrypted_location,
],
check = True
check=True,
)
def get_synced_lyrics_formated_time(self, unformatted_time):
if 's' in unformatted_time:
unformatted_time = unformatted_time.replace('s', '')
if '.' not in unformatted_time:
unformatted_time += '.0'
s = int(unformatted_time.split('.')[-2].split(':')[-1]) * 1000
try:
m = int(unformatted_time.split('.')[-2].split(':')[-2]) * 60000
except:
m = 0
ms = f'{int(unformatted_time.split(".")[-1]):03d}'
if int(ms[2]) >= 5:
ms = int(f'{int(ms[:2]) + 1}') * 10
else:
ms = int(ms)
formated_time = datetime.datetime.fromtimestamp((s + m + ms)/1000.0)
return formated_time.strftime('%M:%S.%f')[:-4]
unformatted_time = (
unformatted_time.replace("m", "").replace("s", "").replace(":", ".")
)
unformatted_time = unformatted_time.split(".")
m, s, ms = 0, 0, 0
ms = int(unformatted_time[-1])
if len(unformatted_time) >= 2:
s = int(unformatted_time[-2]) * 1000
if len(unformatted_time) >= 3:
m = int(unformatted_time[-3]) * 60000
unformatted_time = datetime.datetime.fromtimestamp((ms + s + m) / 1000.0)
ms_new = f"{int(str(unformatted_time.microsecond)[:3]):03d}"
if int(ms_new[2]) >= 5:
ms = int(f"{int(ms_new[:2]) + 1}") * 10
unformatted_time += datetime.timedelta(
milliseconds=ms
) - datetime.timedelta(microseconds=unformatted_time.microsecond)
return unformatted_time.strftime("%M:%S.%f")[:-4]
def get_lyrics(self, track_id):
try:
lyrics_ttml = ElementTree.fromstring(self.session.get(f'https://amp-api.music.apple.com/v1/catalog/{self.country}/songs/{track_id}/lyrics').json()['data'][0]['attributes']['ttml'])
lyrics_ttml = ElementTree.fromstring(
self.session.get(
f"https://amp-api.music.apple.com/v1/catalog/{self.country}/songs/{track_id}/lyrics"
).json()["data"][0]["attributes"]["ttml"]
)
except:
return None, None
unsynced_lyrics = ''
synced_lyrics = ''
for div in lyrics_ttml.iter('{http://www.w3.org/ns/ttml}div'):
for p in div.iter('{http://www.w3.org/ns/ttml}p'):
if p.attrib.get('begin'):
unsynced_lyrics = ""
synced_lyrics = ""
for div in lyrics_ttml.iter("{http://www.w3.org/ns/ttml}div"):
for p in div.iter("{http://www.w3.org/ns/ttml}p"):
if p.attrib.get("begin"):
synced_lyrics += f'[{self.get_synced_lyrics_formated_time(p.attrib.get("begin"))}]{p.text}\n'
if p.text is not None:
unsynced_lyrics += p.text + '\n'
unsynced_lyrics += '\n'
unsynced_lyrics += p.text + "\n"
unsynced_lyrics += "\n"
return unsynced_lyrics[:-2], synced_lyrics
@functools.lru_cache()
def get_cover(self, url):
return requests.get(url).content
def get_tags_song(self, webplayback, unsynced_lyrics):
metadata = next(i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256")['metadata']
cover_url = next(i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256")['artworkURL']
metadata = next(
i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256"
)["metadata"]
cover_url = next(
i for i in webplayback["assets"] if i["flavor"] == "28:ctrp256"
)["artworkURL"].replace("600x600bb", "1200x1200bb")
tags = {
'\xa9nam': [metadata['itemName']],
'\xa9gen': [metadata['genre']],
'aART': [metadata['playlistArtistName']],
'\xa9alb': [metadata['playlistName']],
'soar': [metadata['sort-artist']],
'soal': [metadata['sort-album']],
'sonm': [metadata['sort-name']],
'\xa9ART': [metadata['artistName']],
'geID': [metadata['genreId']],
'atID': [int(metadata['artistId'])],
'plID': [int(metadata['playlistId'])],
'cnID': [int(metadata['itemId'])],
'sfID': [metadata['s']],
'rtng': [metadata['explicit']],
'pgap': metadata['gapless'],
'cpil': metadata['compilation'],
'disk': [(metadata['discNumber'], metadata['discCount'])],
'trkn': [(metadata['trackNumber'], metadata['trackCount'])],
'covr': [MP4Cover(self.get_cover(cover_url), MP4Cover.FORMAT_JPEG)],
'stik': [1]
"\xa9nam": [metadata["itemName"]],
"\xa9gen": [metadata["genre"]],
"aART": [metadata["playlistArtistName"]],
"\xa9alb": [metadata["playlistName"]],
"soar": [metadata["sort-artist"]],
"soal": [metadata["sort-album"]],
"sonm": [metadata["sort-name"]],
"\xa9ART": [metadata["artistName"]],
"geID": [metadata["genreId"]],
"atID": [int(metadata["artistId"])],
"plID": [int(metadata["playlistId"])],
"cnID": [int(metadata["itemId"])],
"sfID": [metadata["s"]],
"rtng": [metadata["explicit"]],
"pgap": metadata["gapless"],
"cpil": metadata["compilation"],
"disk": [(metadata["discNumber"], metadata["discCount"])],
"trkn": [(metadata["trackNumber"], metadata["trackCount"])],
"covr": [MP4Cover(self.get_cover(cover_url))],
"stik": [1],
}
if 'copyright' in metadata:
tags['cprt'] = [metadata['copyright']]
if 'releaseDate' in metadata:
tags['\xa9day'] = [metadata['releaseDate']]
if 'comments' in metadata:
tags['\xa9cmt'] = [metadata['comments']]
if 'xid' in metadata:
tags['xid '] = [metadata['xid']]
if 'composerId' in metadata:
tags['cmID'] = [int(metadata['composerId'])]
tags['\xa9wrt'] = [metadata['composerName']]
tags['soco'] = [metadata['sort-composer']]
if "copyright" in metadata:
tags["cprt"] = [metadata["copyright"]]
if "releaseDate" in metadata:
tags["\xa9day"] = [metadata["releaseDate"]]
if "comments" in metadata:
tags["\xa9cmt"] = [metadata["comments"]]
if "xid" in metadata:
tags["xid "] = [metadata["xid"]]
if "composerId" in metadata:
tags["cmID"] = [int(metadata["composerId"])]
tags["\xa9wrt"] = [metadata["composerName"]]
tags["soco"] = [metadata["sort-composer"]]
if unsynced_lyrics:
tags['\xa9lyr'] = [unsynced_lyrics]
tags["\xa9lyr"] = [unsynced_lyrics]
return tags
def get_tags_music_video(self, track_id):
metadata = requests.get(f'https://itunes.apple.com/lookup?id={track_id}&entity=album&country={self.country}&lang=en_US').json()['results']
extra_metadata = requests.get(f'https://music.apple.com/music-video/{metadata[0]["trackId"]}', headers = {'X-Apple-Store-Front': f'{self.storefront} t:music31'}).json()['storePlatformData']['product-dv']['results'][str(metadata[0]['trackId'])]
metadata = requests.get(
f"https://itunes.apple.com/lookup?id={track_id}&entity=album&country={self.country}&lang=en_US"
).json()["results"]
extra_metadata = requests.get(
f'https://music.apple.com/music-video/{metadata[0]["trackId"]}',
headers={"X-Apple-Store-Front": f"{self.storefront} t:music31"},
).json()["storePlatformData"]["product-dv"]["results"][
str(metadata[0]["trackId"])
]
tags = {
'\xa9ART': [metadata[0]["artistName"]],
'\xa9nam': [metadata[0]["trackCensoredName"]],
'\xa9day': [metadata[0]["releaseDate"]],
'\xa9gen': [metadata[0]['primaryGenreName']],
'stik': [6],
'atID': [metadata[0]['artistId']],
'cnID': [metadata[0]["trackId"]],
'geID': [int(extra_metadata['genres'][0]['genreId'])],
'sfID': [int(self.storefront.split('-')[0])],
'covr': [MP4Cover(self.get_cover(metadata[0]["artworkUrl30"].replace('30x30bb.jpg', '600x600bb.jpg')), MP4Cover.FORMAT_JPEG)]
"\xa9ART": [metadata[0]["artistName"]],
"\xa9nam": [metadata[0]["trackCensoredName"]],
"\xa9day": [metadata[0]["releaseDate"]],
"\xa9gen": [metadata[0]["primaryGenreName"]],
"stik": [6],
"atID": [metadata[0]["artistId"]],
"cnID": [metadata[0]["trackId"]],
"geID": [int(extra_metadata["genres"][0]["genreId"])],
"sfID": [int(self.storefront.split("-")[0])],
"covr": [
MP4Cover(
self.get_cover(
metadata[0]["artworkUrl30"].replace(
"30x30bb.jpg", "1920x1080bb.jpg"
)
)
)
],
}
if 'copyright' in extra_metadata:
tags['cprt'] = [extra_metadata['copyright']]
if metadata[0]['trackExplicitness'] == 'notExplicit':
tags['rtng'] = [0]
elif metadata[0]['trackExplicitness'] == 'explicit':
tags['rtng'] = [1]
if "copyright" in extra_metadata:
tags["cprt"] = [extra_metadata["copyright"]]
if metadata[0]["trackExplicitness"] == "notExplicit":
tags["rtng"] = [0]
elif metadata[0]["trackExplicitness"] == "explicit":
tags["rtng"] = [1]
else:
tags['rtng'] = [2]
tags["rtng"] = [2]
if len(metadata) > 1:
tags['\xa9alb'] = [metadata[1]["collectionCensoredName"]]
tags['aART'] = [metadata[1]["artistName"]]
tags['plID'] = [metadata[1]["collectionId"]]
tags['disk'] = [(metadata[0]["discNumber"], metadata[0]["discCount"])]
tags['trkn'] = [(metadata[0]["trackNumber"], metadata[0]["trackCount"])]
tags["\xa9alb"] = [metadata[1]["collectionCensoredName"]]
tags["aART"] = [metadata[1]["artistName"]]
tags["plID"] = [metadata[1]["collectionId"]]
tags["disk"] = [(metadata[0]["discNumber"], metadata[0]["discCount"])]
tags["trkn"] = [(metadata[0]["trackNumber"], metadata[0]["trackCount"])]
return tags
def get_sanizated_string(self, dirty_string, is_folder):
for character in ['\\', '/', ':', '*', '?', '"', '<', '>', '|', ';']:
dirty_string = dirty_string.replace(character, '_')
dirty_string = re.sub(r'[\\/:\*\?"<>\|;]', "_", dirty_string)
if is_folder:
dirty_string = dirty_string[:40]
if dirty_string[-1:] == '.':
dirty_string = dirty_string[:-1] + '_'
if dirty_string.endswith(".") == ".":
dirty_string = dirty_string[:-1] + "_"
else:
dirty_string = dirty_string[:36]
return dirty_string.strip()
def get_final_location_overwrite_prevented_music_video(self, final_location):
count = 1
while True:
if final_location.with_name(f'{final_location.stem} {count}.m4v').exists():
if final_location.with_name(f"{final_location.stem} {count}.m4v").exists():
count += 1
else:
return final_location.with_name(f'{final_location.stem} {count}.m4v')
return final_location.with_name(f"{final_location.stem} {count}.m4v")
def get_final_location(self, file_extension, tags):
final_location = self.final_path
if 'plID' in tags:
if tags['disk'][0][1] > 1:
file_name = self.get_sanizated_string(f'{tags["disk"][0][0]}-{tags["trkn"][0][0]:02d} {tags["©nam"][0]}', False)
if "plID" in tags:
if tags["disk"][0][1] > 1:
file_name = self.get_sanizated_string(
f'{tags["disk"][0][0]}-{tags["trkn"][0][0]:02d} {tags["©nam"][0]}',
False,
)
else:
file_name = self.get_sanizated_string(f'{tags["trkn"][0][0]:02d} {tags["©nam"][0]}', False)
if 'cpil' in tags and tags['cpil']:
final_location /= f'Compilations/{self.get_sanizated_string(tags["©alb"][0], True)}'
file_name = self.get_sanizated_string(
f'{tags["trkn"][0][0]:02d} {tags["©nam"][0]}', False
)
if "cpil" in tags and tags["cpil"]:
final_location /= (
f'Compilations/{self.get_sanizated_string(tags["©alb"][0], True)}'
)
else:
final_location /= f'{self.get_sanizated_string(tags["aART"][0], True)}/{self.get_sanizated_string(tags["©alb"][0], True)}'
else:
file_name = self.get_sanizated_string(tags["©nam"][0], False)
final_location /= f'{self.get_sanizated_string(tags["©ART"][0], True)}/Unknown Album/'
final_location /= f'{file_name}{file_extension}'
final_location /= (
f'{self.get_sanizated_string(tags["©ART"][0], True)}/Unknown Album/'
)
final_location /= f"{file_name}{file_extension}"
try:
if file_extension == '.m4v' and final_location.exists() and MP4(final_location)['cnID'][0] != tags['cnID'][0]:
final_location = self.get_final_location_overwrite_prevented_music_video(final_location)
if (
file_extension == ".m4v"
and final_location.exists()
and MP4(final_location)["cnID"][0] != tags["cnID"][0]
):
final_location = (
self.get_final_location_overwrite_prevented_music_video(
final_location
)
)
except:
pass
return final_location
def fixup_music_video(self, decrypted_location_audio, decrypted_location_video, fixed_location):
def fixup_music_video(
self, decrypted_location_audio, decrypted_location_video, fixed_location
):
subprocess.run(
[
'MP4Box',
'-quiet',
'-add',
"MP4Box",
"-quiet",
"-add",
decrypted_location_audio,
'-add',
"-add",
decrypted_location_video,
'-itags',
'artist=placeholder',
'-new',
fixed_location
"-itags",
"artist=placeholder",
"-new",
fixed_location,
],
check = True
check=True,
)
def fixup_song(self, decrypted_location, fixed_location):
subprocess.run(
[
'MP4Box',
'-quiet',
'-add',
"MP4Box",
"-quiet",
"-add",
decrypted_location,
'-itags',
'artist=placeholder',
'-new',
fixed_location
"-itags",
"artist=placeholder",
"-new",
fixed_location,
],
check = True
check=True,
)
def make_lrc(self, final_location, synced_lyrics):
if synced_lyrics and not self.no_lrc:
with open(final_location.with_suffix('.lrc'), 'w', encoding = 'utf8') as f:
if synced_lyrics:
with open(final_location.with_suffix(".lrc"), "w", encoding="utf8") as f:
f.write(synced_lyrics)
def make_final(self, final_location, fixed_location, tags):
final_location.parent.mkdir(parents = True, exist_ok = True)
shutil.copy(fixed_location, final_location)
def move_final(self, final_location, fixed_location, tags):
shutil.move(fixed_location, final_location)
file = MP4(final_location)
file.update(tags)
file.save()
def cleanup(self):
if self.temp_path.exists() and not self.skip_cleanup:
if self.temp_path.exists():
shutil.rmtree(self.temp_path)