mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 12:15:09 +03:00
20094c5f75
Uses environment variables instead of hard coding the "deck" user/group. This adds support for systems other than the steam deck that are using the DeckUI. * Use Environment Variables * Use method to get USER from a systemd root process * Fix imports. Add get_user and get_user_group methods in helpers.py. Removed duplicated code * Add separate setters/getters for user vars. Ensure sleep prevents race condition of user setter in while loop
132 lines
5.3 KiB
Python
132 lines
5.3 KiB
Python
# Full imports
|
|
import json
|
|
|
|
# Partial imports
|
|
from aiohttp import ClientSession, web
|
|
from asyncio import get_event_loop
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from hashlib import sha256
|
|
from io import BytesIO
|
|
from logging import getLogger
|
|
from os import path, rename, listdir
|
|
from shutil import rmtree
|
|
from subprocess import call
|
|
from time import time
|
|
from zipfile import ZipFile
|
|
|
|
# Local modules
|
|
from helpers import get_ssl_context, get_user, get_user_group
|
|
from injector import get_tab, inject_to_tab
|
|
|
|
class PluginInstallContext:
    """Bundle of the four values describing one requested plugin install.

    Instances are plain attribute holders: the constructor stores its
    arguments unchanged as ``artifact``, ``name``, ``version`` and ``hash``.
    """

    def __init__(self, artifact, name, version, hash) -> None:
        # Stored verbatim; no validation or normalisation happens here.
        self.artifact, self.name, self.version, self.hash = (
            artifact,
            name,
            version,
            hash,
        )
|
|
|
|
class PluginBrowser:
    """Install and uninstall plugins under ``plugin_path``.

    Registers the ``/browser/install_plugin`` and
    ``/browser/uninstall_plugin`` POST routes on the server it is given.
    Install requests are parked in ``install_requests`` until they are
    confirmed or cancelled by request id.
    """

    def __init__(self, plugin_path, server_instance, plugins) -> None:
        """Store configuration and register the HTTP routes.

        Args:
            plugin_path: Directory that holds one sub-folder per plugin.
            server_instance: Object exposing ``add_routes`` (the web app).
            plugins: Mapping of plugin name to loaded plugin object.
        """
        self.log = getLogger("browser")
        self.plugin_path = plugin_path
        self.plugins = plugins
        # request id -> PluginInstallContext awaiting confirm/cancel.
        self.install_requests = {}

        server_instance.add_routes([
            web.post("/browser/install_plugin", self.install_plugin),
            web.post("/browser/uninstall_plugin", self.uninstall_plugin),
        ])

    def _unzip_to_plugin_dir(self, zip, name, hash):
        """Verify and extract a plugin archive into ``plugin_path``.

        Args:
            zip: ``BytesIO`` holding the downloaded archive.
            name: Plugin name (unused here; kept for the call signature).
            hash: Expected SHA-256 hex digest, or a falsy value to skip
                verification.

        Returns:
            ``True`` when the archive was extracted and chown/chmod both
            succeeded; ``False`` on digest mismatch or a non-zero exit code.
        """
        zip_hash = sha256(zip.getbuffer()).hexdigest()
        if hash and zip_hash != hash:
            return False

        # Context manager closes the archive even if extraction raises.
        with ZipFile(zip) as zip_file:
            zip_file.extractall(self.plugin_path)

        owner = get_user() + ":" + get_user_group()
        code_chown = call(["chown", "-R", owner, self.plugin_path])
        code_chmod = call(["chmod", "-R", "555", self.plugin_path])
        if code_chown != 0 or code_chmod != 0:
            # Was `logger.error`, an undefined name that raised NameError.
            self.log.error(f"chown/chmod exited with a non-zero exit code (chown: {code_chown}, chmod: {code_chmod})")
            return False
        return True

    def find_plugin_folder(self, name):
        """Return the folder whose ``plugin.json`` declares ``name``.

        Folders without a readable, well-formed ``plugin.json`` are skipped.

        Returns:
            The joined path of the matching folder, or ``None`` if no folder
            under ``plugin_path`` matches.
        """
        for folder in listdir(self.plugin_path):
            candidate = path.join(self.plugin_path, folder)
            try:
                with open(path.join(candidate, 'plugin.json'), 'r') as f:
                    plugin = json.load(f)

                if plugin['name'] == name:
                    return candidate
            except Exception:
                # Best-effort scan: unreadable or malformed entries are
                # skipped. `Exception` (not a bare except) lets
                # KeyboardInterrupt/SystemExit propagate.
                self.log.debug(f"skipping {folder}")
        return None

    async def uninstall_plugin(self, name):
        """Unload a plugin in the frontend, stop it and delete its folder.

        Args:
            name: Either the plugin name as ``str``, or a request object
                whose POST data carries a ``name`` field (HTTP route usage).

        Returns:
            A ``web.Response`` acknowledging the request, also when the
            plugin was not installed.
        """
        tab = await get_tab("SP")

        try:
            if not isinstance(name, str):
                data = await name.post()
                name = data.get("name", "undefined")
            self.log.info("uninstalling " + name)

            plugin_dir = self.find_plugin_folder(name)
            if plugin_dir is None:
                # Previously `" at dir " + None` raised TypeError here and
                # bypassed the "not installed" handling below.
                raise FileNotFoundError(name)
            self.log.info(" at dir " + plugin_dir)

            # json.dumps yields a correctly escaped JS string literal, so
            # quotes or backslashes in the name cannot break out of it.
            await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin({json.dumps(name)})")

            # .get: a plugin may exist on disk without being loaded.
            plugin = self.plugins.get(name)
            if plugin:
                plugin.stop()
                self.plugins.pop(name, None)
            rmtree(plugin_dir)
        except FileNotFoundError:
            self.log.warning(f"Plugin {name} not installed, skipping uninstallation")

        return web.Response(text="Requested plugin uninstall")

    async def _install(self, artifact, name, version, hash):
        """Download ``artifact`` and install it as plugin ``name``.

        Any existing installation of ``name`` is removed first. The archive
        is verified and extracted in a worker process; on success the
        frontend is asked to re-sync its plugin list.
        """
        try:
            await self.uninstall_plugin(name)
        except Exception:
            # `Exception` rather than a bare except so task cancellation
            # (asyncio.CancelledError) is not swallowed.
            self.log.error(f"Plugin {name} not installed, skipping uninstallation")

        self.log.info(f"Installing {name} (Version: {version})")
        async with ClientSession() as client:
            self.log.debug(f"Fetching {artifact}")
            # `async with` releases the response/connection deterministically.
            async with client.get(artifact, ssl=get_ssl_context()) as res:
                if res.status != 200:
                    self.log.fatal(f"Could not fetch from URL. {await res.text()}")
                    return
                self.log.debug("Got 200. Reading...")
                data = await res.read()

        self.log.debug(f"Read {len(data)} bytes")
        res_zip = BytesIO(data)
        with ProcessPoolExecutor() as executor:
            self.log.debug("Unzipping...")
            ret = await get_event_loop().run_in_executor(
                executor,
                self._unzip_to_plugin_dir,
                res_zip,
                name,
                hash
            )

        if ret:
            self.log.info(f"Installed {name} (Version: {version})")
            await inject_to_tab("SP", "window.syncDeckyPlugins()")
        else:
            # NOTE(review): _unzip_to_plugin_dir also returns False when
            # chown/chmod fail, so this message can be misleading.
            self.log.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")

    async def install_plugin(self, request):
        """HTTP handler: schedule an install prompt for the posted plugin.

        Reads ``artifact``, ``name``, ``version`` and ``hash`` from the POST
        data (with defaults) and returns immediately; the prompt is shown by
        a background task.
        """
        data = await request.post()
        get_event_loop().create_task(
            self.request_plugin_install(
                data.get("artifact", ""),
                data.get("name", "No name"),
                data.get("version", "dev"),
                data.get("hash", False),
            )
        )
        return web.Response(text="Requested plugin install")

    async def request_plugin_install(self, artifact, name, version, hash):
        """Record a pending install and ask the frontend to prompt the user.

        The request is stored in ``install_requests`` under an id derived
        from the current time, which is passed to the frontend prompt.
        """
        request_id = str(time())
        self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
        tab = await get_tab("SP")
        await tab.open_websocket()
        # Each argument is stringified (as the original f-string did) and
        # JSON-encoded so caller-supplied text cannot escape the JS literal.
        js_args = ", ".join(
            json.dumps(str(value)) for value in (name, version, request_id, hash)
        )
        await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt({js_args})")

    async def confirm_plugin_install(self, request_id):
        """Run the pending install registered under ``request_id``.

        Raises:
            KeyError: If no pending request has that id.
        """
        request = self.install_requests.pop(request_id)
        await self._install(request.artifact, request.name, request.version, request.hash)

    def cancel_plugin_install(self, request_id):
        """Discard the pending install registered under ``request_id``.

        Raises:
            KeyError: If no pending request has that id.
        """
        self.install_requests.pop(request_id)
|