Refactor download methods to use generators

This commit is contained in:
Rafael Moraes
2025-09-14 12:34:01 -03:00
parent 268d9a71fc
commit 5b884743d8
4 changed files with 73 additions and 50 deletions
+9 -6
View File
@@ -613,23 +613,26 @@ def main(
continue
if media_metadata["type"] in {"songs", "library-songs"}:
downloader_song.download(
for _ in downloader_song.download(
media_metadata=media_metadata,
playlist_attributes=download_queue.playlist_attributes,
playlist_track=download_index,
)
):
pass
if media_metadata["type"] in {"music-videos", "library-music-videos"}:
downloader_music_video.download(
for _ in downloader_music_video.download(
media_metadata=media_metadata,
playlist_attributes=download_queue.playlist_attributes,
playlist_track=download_index,
)
):
pass
if media_metadata["type"] == "uploaded-videos":
downloader_post.download(
for _ in downloader_post.download(
media_metadata=media_metadata,
)
):
pass
except KeyboardInterrupt:
exit(0)
except (
+24 -15
View File
@@ -430,24 +430,22 @@ class DownloaderMusicVideo:
self.downloader.get_cover_file_extension(cover_format)
)
import typing
def download(
self,
media_id: str = None,
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> DownloadInfo:
try:
download_info = self._download(
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
self.downloader._final_processing(download_info)
finally:
self.downloader.cleanup_temp_path()
return download_info
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
def _download(
self,
@@ -455,8 +453,9 @@ class DownloaderMusicVideo:
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> DownloadInfo:
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if playlist_track is None and playlist_attributes:
raise ValueError(
@@ -479,7 +478,11 @@ class DownloaderMusicVideo:
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
self.downloader.check_database_and_raise(media_id)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Music Video metadata")
@@ -487,6 +490,7 @@ class DownloaderMusicVideo:
download_info.media_metadata = media_metadata
if not self.downloader.is_media_streamable(media_metadata):
yield download_info
raise MediaNotStreamableException()
alt_media_id = self.get_music_video_id_alt(media_metadata) or media_id
@@ -514,8 +518,11 @@ class DownloaderMusicVideo:
webplayback = self.downloader.apple_music_api.get_webplayback(media_id)
logger.debug(f"[{colored_media_id}] Getting stream info")
stream_info = self.get_stream_info_from_webplayback(webplayback)
if not stream_info:
yield download_info
raise MediaFormatNotAvailableException()
download_info.stream_info = stream_info
final_path = self.downloader.get_final_path(
@@ -536,6 +543,7 @@ class DownloaderMusicVideo:
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting decryption key")
@@ -589,6 +597,7 @@ class DownloaderMusicVideo:
)
logger.debug(
f"[{colored_media_id}]"
"Decrypting video/audio to "
f'{decrypted_path_video}"/"{decrypted_path_audio}" '
f'and remuxing to "{staged_path}"'
@@ -603,4 +612,4 @@ class DownloaderMusicVideo:
)
download_info.staged_path = staged_path
return download_info
yield download_info
+17 -13
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import typing
from pathlib import Path
import colorama
@@ -87,23 +88,20 @@ class DownloaderPost:
self,
media_id: str = None,
media_metadata: dict = None,
) -> DownloadInfo:
try:
download_info = self._download(
media_id,
media_metadata,
)
self.downloader._final_processing(download_info)
finally:
self.downloader.cleanup_temp_path()
return download_info
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
)
def _download(
self,
media_id: str = None,
media_metadata: dict = None,
) -> DownloadInfo:
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if not media_id and not media_metadata:
raise ValueError("Either media_id or media_metadata must be provided")
@@ -113,7 +111,11 @@ class DownloaderPost:
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
self.downloader.check_database_and_raise(media_id)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Post Video metadata")
@@ -121,6 +123,7 @@ class DownloaderPost:
download_info.media_metadata = media_metadata
if not self.downloader.is_media_streamable(media_metadata):
yield download_info
raise MediaNotStreamableException()
tags = self.get_tags(media_metadata)
@@ -133,6 +136,7 @@ class DownloaderPost:
download_info.final_path = final_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
cover_url = self.downloader.get_cover_url(media_metadata)
@@ -161,4 +165,4 @@ class DownloaderPost:
)
download_info.staged_path = staged_path
return download_info
yield download_info
+23 -16
View File
@@ -6,6 +6,7 @@ import json
import logging
import re
import subprocess
import typing
from pathlib import Path
from xml.dom import minidom
from xml.etree import ElementTree
@@ -593,18 +594,14 @@ class DownloaderSong:
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> DownloadInfo:
try:
download_info = self._download(
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
self.downloader._final_processing(download_info)
finally:
self.downloader.cleanup_temp_path()
return download_info
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
def _download(
self,
@@ -612,8 +609,9 @@ class DownloaderSong:
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> DownloadInfo:
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if playlist_track is None and playlist_attributes:
raise ValueError(
@@ -636,7 +634,11 @@ class DownloaderSong:
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
self.downloader.check_database_and_raise(media_id)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Song metadata")
@@ -672,7 +674,8 @@ class DownloaderSong:
logger.info(
f"[{colored_media_id}] Downloading synced lyrics only, skipping song download"
)
return download_info
yield download_info
return
cover_url = self.downloader.get_cover_url(media_metadata)
cover_format = self.downloader.get_cover_format(cover_url)
@@ -685,6 +688,7 @@ class DownloaderSong:
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting stream info")
@@ -699,8 +703,11 @@ class DownloaderSong:
download_info.decryption_key = decryption_key
else:
stream_info = self.get_stream_info(media_metadata)
if not stream_info or not stream_info.audio_track.widevine_pssh:
yield download_info
raise MediaFormatNotAvailableException()
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key(
stream_info,
@@ -745,4 +752,4 @@ class DownloaderSong:
)
download_info.staged_path = staged_path
return download_info
yield download_info