Files
decky-loader/backend/decky_loader/browser.py
T
jbofill 9f586a1b97
Builder Win / Build PluginLoader for Win (push) Has been cancelled
Builder / Build PluginLoader (push) Has been cancelled
Push Updated Plugin Stub to Template / copy-stub (push) Has been cancelled
Lint / Run linters (push) Has been cancelled
Type Check / Run type checkers (push) Has been cancelled
Feat: Disable plugins (#850)
* implement base frontend changes necessary for plugin disabling

* implement frontend disable functions / modal

* plugin disable boilerplate / untested

* Feat disable plugins (#810)

* implement base frontend changes necessary for plugin disabling

* implement frontend disable functions / modal

---------

Co-authored-by: Jesse Bofill <jesse_bofill@yahoo.com>

* fix mistakes

* add frontend

* working plugin disable, not tested extensively

* fix uninstalled hidden plugins remaining in list

* hide irrelevant plugin settings menu options when the plugin is disabled

* fix hidden plugin issues

* reset disabled plugin on uninstall

* fix plugin load on reenable

* move disable settings uninstall cleanup

* add English translations for enable / disable elements

* fix bug where wrong loadType can get passed to importPlugin

* show correct number of hidden plugins if plugin is both hidden and disabled

* fix: get fresh list of plugin updates when changed in settings plugin list

* fix: fix invalid semver plugin version from preventing latest updates

* retain x position when changing focus in list items  that have multiple horizontal focusables

* correction to plugin version checking validation

* make sure disabled plugins get checked for updates

* show number of disabled plugins at bottom of plugin view

* add notice to update modals that disabled plugins will be enabled upon installation

* run formatter

* Update backend/decky_loader/locales/en-US.json

Co-authored-by: EMERALD <hudson.samuels@gmail.com>

* chore: correct filename typo

* chore: change disabled icon

* chore: revert accidental defsettings changes

* format

* add timeout to frontend importPlugin

if a request hangs, this prevents it from blocking other plugin loads.
backend dispatch_plugin, which calls this for an individual plugin load (as opposed to batch), is set to 15s.
other callers of importPlugin are not using timeout, same as before.

* fix plugin update checking loop

---------

Co-authored-by: marios <marios8543@gmail.com>
Co-authored-by: EMERALD <hudson.samuels@gmail.com>
2025-12-30 13:29:08 -06:00

356 lines
16 KiB
Python

# Full imports
import json
# import pprint
# from pprint import pformat
# Partial imports
from aiohttp import ClientSession
from asyncio import sleep
from hashlib import sha256
from io import BytesIO
from logging import getLogger
from os import R_OK, W_OK, path, listdir, access, mkdir
from re import sub
from shutil import rmtree
from time import time
from zipfile import ZipFile
from enum import IntEnum
from typing import Dict, List, TypedDict
# Local modules
from .localplatform.localplatform import chown, chmod, get_chown_plugin_path
from .loader import Loader, Plugins
from .helpers import get_ssl_context, download_remote_binary_to_path
from .enums import UserType
from .settings import SettingsManager
logger = getLogger("Browser")
class PluginInstallType(IntEnum):
    """Integer codes naming the kind of plugin installation being requested.

    The value is forwarded unchanged to the frontend together with the
    install prompt, so the numeric assignments below must stay stable.
    """

    INSTALL = 0
    REINSTALL = 1
    UPDATE = 2
    DOWNGRADE = 3
    OVERWRITE = 4
class PluginInstallRequest(TypedDict):
    """Dictionary shape of one entry given to ``request_multiple_plugin_installs``."""

    # Plugin name as shown in the install prompt.
    name: str
    # Location of the plugin ZIP (passed on to the installer as ``artifact``).
    artifact: str
    # Version string of the plugin to install.
    version: str
    # Expected hash of the ZIP (passed on to the installer as ``hash``).
    hash: str
    # Kind of installation being requested.
    install_type: PluginInstallType
class PluginInstallContext:
    """Arguments of one pending installation, kept until it is confirmed or cancelled."""

    def __init__(self, artifact: str, name: str, version: str, hash: str) -> None:
        """Store the four installer arguments verbatim as instance attributes."""
        # Targets are assigned left to right, so attribute order matches the signature.
        self.artifact, self.name, self.version, self.hash = artifact, name, version, hash
class PluginBrowser:
    """Installs and uninstalls plugins located under a single plugin directory."""

    def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None:
        """Keep references to the collaborators used by the install/uninstall methods.

        Args:
            plugin_path: Directory that contains one sub-folder per installed plugin.
            plugins: Mapping of loaded plugins, keyed by plugin name.
            loader: Loader providing the websocket (``ws``), the file ``watcher``
                and ``import_plugin``.
            settings: Settings store read and written through ``getSetting`` /
                ``setSetting``.
        """
        self.plugin_path = plugin_path
        self.plugins = plugins
        self.loader = loader
        self.settings = settings
        # Pending install prompts keyed by request id; a value is either a single
        # context or a list of contexts for a batch request.
        self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {}
def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str):
    """Verify the archive's SHA-256 digest and extract it into ``self.plugin_path``.

    Args:
        zip: In-memory ZIP archive of the plugin.
        name: Plugin name. Not used by this method; kept so existing callers
            keep working.
        hash: Expected hexadecimal SHA-256 digest of the archive. A falsy value
            (e.g. an empty string) skips verification.

    Returns:
        ``False`` when a hash was given and does not match (nothing is
        extracted), ``True`` once the archive has been extracted.
    """
    zip_hash = sha256(zip.getbuffer()).hexdigest()
    if hash and zip_hash != hash:
        return False
    # The context manager closes the archive even when extraction raises.
    with ZipFile(zip) as zip_file:
        zip_file.extractall(self.plugin_path)
    return True
async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str):
    """Download every entry listed under ``remote_binary`` in the plugin's package.json.

    Args:
        pluginBasePath: Directory of the plugin. ``package.json`` is read from
            it and downloads are written into its ``bin`` sub-directory.

    Returns:
        ``True`` when package.json lists no remote binaries or every listed
        binary was downloaded. ``False`` when package.json is not readable, a
        download fails, or any other exception occurs; the exception is logged
        at debug level and not propagated.
    """
    try:
        manifest_path = path.join(pluginBasePath, 'package.json')
        bin_dir = path.join(pluginBasePath, 'bin')
        logger.debug(f"Checking package.json at {manifest_path}")

        if not access(manifest_path, R_OK):
            return False

        with open(manifest_path, "r", encoding="utf-8") as manifest_file:
            manifest = json.load(manifest_file)

            has_binaries = "remote_binary" in manifest and len(manifest["remote_binary"]) > 0
            if not has_binaries:
                logger.info("No Remote Binaries to Download")
                return True

            # create bin directory if needed.
            chmod(pluginBasePath, 777)
            if access(pluginBasePath, W_OK):
                if not path.exists(bin_dir):
                    logger.debug(f"Creating bin directory at {bin_dir}")
                    mkdir(bin_dir)
                if not access(bin_dir, W_OK):
                    chmod(bin_dir, 777)

            for entry in manifest["remote_binary"]:
                # Required Fields. If any Remote Binary is missing these fail the install.
                bin_name = entry["name"]
                bin_url = entry["url"]
                bin_hash = entry["sha256hash"]
                logger.info(f"Attempting to download {bin_name} from {bin_url}")
                downloaded = await download_remote_binary_to_path(bin_url, bin_hash, path.join(bin_dir, bin_name))
                if not downloaded:
                    # Raised only to reach the shared failure handling below.
                    raise Exception(f"Error Downloading Remote Binary {bin_name}@{bin_url} with hash {bin_hash} to {path.join(bin_dir, bin_name)}")
            return True
    except Exception as e:
        logger.debug(str(e))
        return False
"""Return the filename (only) for the specified plugin"""
def find_plugin_folder(self, name: str) -> str | None:
for folder in listdir(self.plugin_path):
try:
with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
plugin = json.load(f)
if plugin['name'] == name:
return folder
except:
logger.debug(f"skipping {folder}")
def set_plugin_dir_permissions(self, plugin_dir: str) -> bool:
    """Apply ownership and mode to a plugin directory and its ``plugin.json``.

    The plugin is treated as a root plugin when its readable ``plugin.json``
    has ``"root"`` among its ``flags``; that decides which user type is passed
    to the first ``chown`` call.

    Args:
        plugin_dir: Directory of the plugin.

    Returns:
        ``True`` when chown is disabled via ``get_chown_plugin_path()``;
        otherwise the short-circuited result of the chown/chmod chain.
    """
    manifest_path = path.join(plugin_dir, 'plugin.json')
    logger.debug(f"Checking plugin.json at {manifest_path}")

    root_plugin = False
    if access(manifest_path, R_OK):
        with open(manifest_path, "r", encoding="utf-8") as manifest_file:
            manifest = json.load(manifest_file)
            root_plugin = "flags" in manifest and "root" in manifest["flags"]

    logger.debug("root_plugin %d, dir %s", root_plugin, plugin_dir)

    if not get_chown_plugin_path():
        logger.debug("chown disabled by environment")
        return True

    tree_owner = UserType.EFFECTIVE_USER if root_plugin else UserType.HOST_USER
    # Each step only runs when the previous one reported success.
    return (
        chown(plugin_dir, tree_owner, True)
        and chown(plugin_dir, UserType.EFFECTIVE_USER, False)
        and chmod(plugin_dir, 755)
        and chown(manifest_path, UserType.EFFECTIVE_USER, False)
        and chmod(manifest_path, 755)
    )
async def uninstall_plugin(self, name: str):
    """Unload a plugin, drop its settings and delete its folder.

    The loader's file watcher (when present) is disabled for the duration of
    the call and is always re-enabled afterwards, including when the plugin
    folder cannot be found.

    Args:
        name: Name of the plugin to uninstall.

    Raises:
        AssertionError: No folder under ``self.plugin_path`` belongs to ``name``.
    """
    if self.loader.watcher:
        self.loader.watcher.disabled = True
    try:
        plugin_folder = self.find_plugin_folder(name)
        if plugin_folder is None:
            # Explicit raise instead of `assert` so the check survives `python -O`.
            # Raised inside the outer try so the watcher is re-enabled below.
            raise AssertionError(f"No plugin folder found for {name}")
        plugin_dir = path.join(self.plugin_path, plugin_folder)
        try:
            logger.info("uninstalling " + name)
            logger.info(" at dir " + plugin_dir)
            logger.debug("calling frontend unload for %s" % str(name))
            await self.loader.ws.emit("loader/unload_plugin", name)
            if name in self.plugins:
                logger.debug("Plugin %s was found", name)
                await self.plugins[name].stop(uninstall=True)
                logger.debug("Plugin %s was stopped", name)
                del self.plugins[name]
                logger.debug("Plugin %s was removed from the dictionary", name)
            # Settings are cleaned up whether or not the plugin was loaded, so a
            # plugin that is installed but not running leaves no entries behind.
            self.cleanup_plugin_settings(name)
            logger.debug("removing files %s" % str(name))
            rmtree(plugin_dir)
        except FileNotFoundError:
            logger.warning(f"Plugin {name} not installed, skipping uninstallation")
        except Exception as e:
            logger.error(f"Plugin {name} in {plugin_dir} was not uninstalled")
            logger.error(f"Error at {str(e)}", exc_info=e)
    finally:
        if self.loader.watcher:
            self.loader.watcher.disabled = False
async def _install(self, artifact: str, name: str, version: str, hash: str):
await self.loader.ws.emit("loader/plugin_download_start", name)
await self.loader.ws.emit("loader/plugin_download_info", 5, "Store.download_progress_info.start")
# Will be set later in code
res_zip = None
# Check if plugin was already installed before this
isInstalled = False
try:
pluginFolderPath = self.find_plugin_folder(name)
if pluginFolderPath:
isInstalled = True
except:
logger.error(f"Failed to determine if {name} is already installed, continuing anyway.")
# Preserve plugin order before removing plugin (uninstall alters the order and removes the plugin from the list)
current_plugin_order = self.settings.getSetting("pluginOrder")[:]
if self.loader.watcher:
self.loader.watcher.disabled = True
# Check if the file is a local file or a URL
if artifact.startswith("file://"):
logger.info(f"Installing {name} from local ZIP file (Version: {version})")
await self.loader.ws.emit("loader/plugin_download_info", 10, "Store.download_progress_info.open_zip")
res_zip = BytesIO(open(artifact[7:], "rb").read())
else:
logger.info(f"Installing {name} from URL (Version: {version})")
await self.loader.ws.emit("loader/plugin_download_info", 10, "Store.download_progress_info.download_zip")
async with ClientSession() as client:
logger.debug(f"Fetching {artifact}")
res = await client.get(artifact, ssl=get_ssl_context())
#TODO track progress of this download in chunks like with decky updates
#TODO but squish with min 15 and max 75
if res.status == 200:
logger.debug("Got 200. Reading...")
data = await res.read()
logger.debug(f"Read {len(data)} bytes")
res_zip = BytesIO(data)
else:
logger.fatal(f"Could not fetch from URL. {await res.text()}")
await self.loader.ws.emit("loader/plugin_download_info", 70, "Store.download_progress_info.increment_count")
storeUrl = ""
match self.settings.getSetting("store", 0):
case 0: storeUrl = "https://plugins.deckbrew.xyz/plugins" # default
case 1: storeUrl = "https://testing.deckbrew.xyz/plugins" # testing
case 2: storeUrl = self.settings.getSetting("store-url", "https://plugins.deckbrew.xyz/plugins") # custom
case _: storeUrl = "https://plugins.deckbrew.xyz/plugins"
logger.info(f"Incrementing installs for {name} from URL {storeUrl} (version {version})")
async with ClientSession() as client:
res = await client.post(storeUrl+f"/{name}/versions/{version}/increment?isUpdate={isInstalled}", ssl=get_ssl_context())
if res.status != 200:
logger.error(f"Server did not accept install count increment request. code: {res.status}")
await self.loader.ws.emit("loader/plugin_download_info", 75, "Store.download_progress_info.parse_zip")
if res_zip and version == "dev":
with ZipFile(res_zip) as plugin_zip:
plugin_json_list = [file for file in plugin_zip.namelist() if file.endswith("/plugin.json") and file.count("/") == 1]
if len(plugin_json_list) == 0:
logger.fatal("No plugin.json found in plugin ZIP")
return
elif len(plugin_json_list) > 1:
logger.fatal("Multiple plugin.json found in plugin ZIP")
return
else:
plugin_json_file = plugin_json_list[0]
name = sub(r"/.+$", "", plugin_json_file)
try:
with plugin_zip.open(plugin_json_file) as f:
plugin_json_data = json.loads(f.read().decode('utf-8'))
plugin_name_from_plugin_json = plugin_json_data.get('name')
if plugin_name_from_plugin_json and plugin_name_from_plugin_json.strip():
logger.info(f"Extracted plugin name from {plugin_json_file}: {plugin_name_from_plugin_json}")
name = plugin_name_from_plugin_json
else:
logger.warning(f"Nonexistent or invalid 'name' key value in {plugin_json_file}. Falling back to extracting from path.")
except Exception as e:
logger.error(f"Failed to read or parse {plugin_json_file}: {str(e)}. Falling back to extracting from path.")
# Check to make sure we got the file
if res_zip is None:
logger.fatal(f"Could not fetch {artifact}")
return
# If plugin is installed, uninstall it
if isInstalled:
await self.loader.ws.emit("loader/plugin_download_info", 80, "Store.download_progress_info.uninstalling_previous")
try:
logger.debug("Uninstalling existing plugin...")
await self.uninstall_plugin(name)
except:
logger.error(f"Plugin {name} could not be uninstalled.")
await self.loader.ws.emit("loader/plugin_download_info", 90, "Store.download_progress_info.installing_plugin")
# Install the plugin
logger.debug("Unzipping...")
ret = self._unzip_to_plugin_dir(res_zip, name, hash)
if ret:
plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder)
await self.loader.ws.emit("loader/plugin_download_info", 95, "Store.download_progress_info.download_remote")
ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
chown_ret = self.set_plugin_dir_permissions(plugin_dir)
if ret:
logger.info(f"Installed {name} (Version: {version})")
if name in self.loader.plugins:
await self.loader.plugins[name].stop()
self.loader.plugins.pop(name, None)
await sleep(1)
if not isInstalled:
current_plugin_order = self.settings.getSetting("pluginOrder")
current_plugin_order.append(name)
self.settings.setSetting("pluginOrder", current_plugin_order)
logger.debug("Plugin %s was added to the pluginOrder setting", name)
await self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_folder)
elif not chown_ret:
logger.error("Could not chown plugin")
return
else:
logger.error("Could not download remote binaries")
return
else:
logger.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")
if self.loader.watcher:
self.loader.watcher.disabled = False
await self.loader.ws.emit("loader/plugin_download_finish", name)
async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType):
    """Register a pending installation and ask the frontend to prompt for it.

    The request is stored in ``self.install_requests`` under an id derived from
    the current time and is installed only once that id is confirmed.

    Args:
        artifact: Location of the plugin ZIP.
        name: Plugin name.
        version: Plugin version.
        hash: Expected hash of the ZIP.
        install_type: Kind of installation, forwarded to the frontend.
    """
    request_id = str(time())
    pending = PluginInstallContext(artifact, name, version, hash)
    self.install_requests[request_id] = pending
    await self.loader.ws.emit(
        "loader/add_plugin_install_prompt",
        name,
        version,
        request_id,
        hash,
        install_type,
    )
async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]):
    """Register a batch of pending installations under one request id.

    Args:
        requests: Install requests; each one is converted to a
            ``PluginInstallContext`` and the original list is forwarded to the
            frontend with the prompt.
    """
    request_id = str(time())
    pending_batch = [
        PluginInstallContext(entry['artifact'], entry['name'], entry['version'], entry['hash'])
        for entry in requests
    ]
    self.install_requests[request_id] = pending_batch
    await self.loader.ws.emit("loader/add_multiple_plugins_install_prompt", request_id, requests)
async def confirm_plugin_install(self, request_id: str):
    """Run the installation(s) registered under ``request_id``.

    The entry is removed from ``self.install_requests`` before installing.
    Batch requests are installed one after another, in the order requested.

    Args:
        request_id: Id handed out when the install was requested.

    Raises:
        KeyError: ``request_id`` is unknown or was already confirmed/cancelled.
    """
    pending = self.install_requests.pop(request_id)
    # Normalise to a list so single and batch requests share one code path.
    batch = pending if isinstance(pending, list) else [pending]
    for req in batch:
        await self._install(req.artifact, req.name, req.version, req.hash)
def cancel_plugin_install(self, request_id: str):
    """Discard the pending installation registered under ``request_id``.

    Cancelling an id that is unknown, or that was already confirmed or
    cancelled, is a no-op.

    Args:
        request_id: Id handed out when the install was requested.
    """
    # Default of None makes a repeated or late cancel harmless instead of a KeyError.
    self.install_requests.pop(request_id, None)
def cleanup_plugin_settings(self, name: str):
    """Removes any settings related to a plugin. Probably called when a plugin is uninstalled.

    Every occurrence of the plugin name is removed from the ``frozenPlugins``,
    ``hiddenPlugins``, ``pluginOrder`` and ``disabled_plugins`` lists. A
    setting is written back only when it actually contained the name.

    Args:
        name (string): The name of the plugin
    """
    for setting_key in ("frozenPlugins", "hiddenPlugins", "pluginOrder", "disabled_plugins"):
        entries: List[str] = self.settings.getSetting(setting_key, [])
        if name in entries:
            # Slice assignment edits the list in place and drops duplicates too,
            # which a single `list.remove` would leave behind.
            entries[:] = [entry for entry in entries if entry != name]
            self.settings.setSetting(setting_key, entries)
    logger.debug("Removed any settings for plugin %s", name)