mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 04:05:04 +03:00
f5eba51c52
sadly requires the `setproctitle` pypi module because python doesn't have a builtin to do this :/
189 lines
8.0 KiB
Python
189 lines
8.0 KiB
Python
from os import path, environ
|
|
from signal import SIG_IGN, SIGINT, SIGTERM, signal
|
|
from importlib.util import module_from_spec, spec_from_file_location
|
|
from json import dumps, loads
|
|
from logging import getLogger
|
|
from sys import exit, path as syspath, modules as sysmodules
|
|
from traceback import format_exc
|
|
from asyncio import (get_event_loop, new_event_loop,
|
|
set_event_loop, sleep)
|
|
from setproctitle import setproctitle, setthreadtitle
|
|
|
|
from .messages import SocketResponseDict, SocketMessageType
|
|
from ..localplatform.localsocket import LocalSocket
|
|
from ..localplatform.localplatform import setgid, setuid, get_username, get_home_path
|
|
from ..enums import UserType
|
|
from .. import helpers
|
|
|
|
from typing import List, TypeVar, Any
|
|
|
|
DataType = TypeVar("DataType")
|
|
|
|
class SandboxedPlugin:
|
|
def __init__(self,
|
|
name: str,
|
|
passive: bool,
|
|
flags: List[str],
|
|
file: str,
|
|
plugin_directory: str,
|
|
plugin_path: str,
|
|
version: str|None,
|
|
author: str,
|
|
api_version: int) -> None:
|
|
self.name = name
|
|
self.passive = passive
|
|
self.flags = flags
|
|
self.file = file
|
|
self.plugin_path = plugin_path
|
|
self.plugin_directory = plugin_directory
|
|
self.version = version
|
|
self.author = author
|
|
self.api_version = api_version
|
|
|
|
self.log = getLogger("sandboxed_plugin")
|
|
|
|
def initialize(self, socket: LocalSocket):
|
|
self._socket = socket
|
|
|
|
try:
|
|
signal(SIGINT, SIG_IGN)
|
|
signal(SIGTERM, SIG_IGN)
|
|
|
|
setproctitle(f"{self.name} ({self.file})")
|
|
setthreadtitle(self.name)
|
|
|
|
set_event_loop(new_event_loop())
|
|
if self.passive:
|
|
return
|
|
setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
# export a bunch of environment variables to help plugin developers
|
|
environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
environ["USER"] = "root" if "root" in self.flags else get_username()
|
|
environ["DECKY_VERSION"] = helpers.get_loader_version()
|
|
environ["DECKY_USER"] = get_username()
|
|
environ["DECKY_USER_HOME"] = helpers.get_home_path()
|
|
environ["DECKY_HOME"] = helpers.get_homebrew_path()
|
|
environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
|
|
environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory)
|
|
environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory)
|
|
environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
|
|
environ["DECKY_PLUGIN_NAME"] = self.name
|
|
if self.version:
|
|
environ["DECKY_PLUGIN_VERSION"] = self.version
|
|
environ["DECKY_PLUGIN_AUTHOR"] = self.author
|
|
|
|
# append the plugin's `py_modules` to the recognized python paths
|
|
syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
|
|
|
|
#TODO: FIX IN A LESS CURSED WAY
|
|
keys = [key for key in sysmodules if key.startswith("decky_loader.")]
|
|
for key in keys:
|
|
sysmodules[key.replace("decky_loader.", "")] = sysmodules[key]
|
|
|
|
from .imports import decky
|
|
async def emit(event: str, *args: Any) -> None:
|
|
await self._socket.write_single_line_server(dumps({
|
|
"type": SocketMessageType.EVENT,
|
|
"event": event,
|
|
"args": args
|
|
}))
|
|
# copy the docstring over so we don't have to duplicate it
|
|
emit.__doc__ = decky.emit.__doc__
|
|
decky.emit = emit
|
|
sysmodules["decky"] = decky
|
|
# provided for compatibility
|
|
sysmodules["decky_plugin"] = decky
|
|
|
|
spec = spec_from_file_location("_", self.file)
|
|
assert spec is not None
|
|
module = module_from_spec(spec)
|
|
assert spec.loader is not None
|
|
spec.loader.exec_module(module)
|
|
# TODO fix self weirdness once plugin.json versioning is done. need this before WS release!
|
|
if self.api_version > 0:
|
|
self.Plugin = module.Plugin()
|
|
else:
|
|
self.Plugin = module.Plugin
|
|
|
|
if hasattr(self.Plugin, "_migration"):
|
|
if self.api_version > 0:
|
|
get_event_loop().run_until_complete(self.Plugin._migration())
|
|
else:
|
|
get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
|
|
if hasattr(self.Plugin, "_main"):
|
|
if self.api_version > 0:
|
|
get_event_loop().create_task(self.Plugin._main())
|
|
else:
|
|
get_event_loop().create_task(self.Plugin._main(self.Plugin))
|
|
get_event_loop().create_task(socket.setup_server())
|
|
except:
|
|
self.log.error("Failed to start " + self.name + "!\n" + format_exc())
|
|
exit(0)
|
|
get_event_loop().run_forever()
|
|
|
|
async def _unload(self):
|
|
try:
|
|
self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n")
|
|
if hasattr(self.Plugin, "_unload"):
|
|
if self.api_version > 0:
|
|
await self.Plugin._unload()
|
|
else:
|
|
await self.Plugin._unload(self.Plugin)
|
|
self.log.info("Unloaded " + self.name + "\n")
|
|
else:
|
|
self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n")
|
|
except:
|
|
self.log.error("Failed to unload " + self.name + "!\n" + format_exc())
|
|
pass
|
|
|
|
async def _uninstall(self):
|
|
try:
|
|
self.log.info("Attempting to uninstall with plugin " + self.name + "'s \"_uninstall\" function.\n")
|
|
if hasattr(self.Plugin, "_uninstall"):
|
|
if self.api_version > 0:
|
|
await self.Plugin._uninstall()
|
|
else:
|
|
await self.Plugin._uninstall(self.Plugin)
|
|
self.log.info("Uninstalled " + self.name + "\n")
|
|
else:
|
|
self.log.info("Could not find \"_uninstall\" in " + self.name + "'s main.py" + "\n")
|
|
except:
|
|
self.log.error("Failed to uninstall " + self.name + "!\n" + format_exc())
|
|
pass
|
|
|
|
async def on_new_message(self, message : str) -> str|None:
|
|
data = loads(message)
|
|
|
|
if "stop" in data:
|
|
self.log.info(f"Calling Loader unload function for {self.name}.")
|
|
await self._unload()
|
|
|
|
if data.get('uninstall'):
|
|
self.log.info("Calling Loader uninstall function.")
|
|
await self._uninstall()
|
|
|
|
get_event_loop().stop()
|
|
while get_event_loop().is_running():
|
|
await sleep(0.1)
|
|
get_event_loop().close()
|
|
exit(0)
|
|
|
|
d: SocketResponseDict = {"type": SocketMessageType.RESPONSE, "res": None, "success": True, "id": data["id"]}
|
|
try:
|
|
if data.get("legacy"):
|
|
if self.api_version > 0:
|
|
raise Exception("Legacy methods may not be used on api_version > 0")
|
|
# Legacy kwargs
|
|
d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
|
|
else:
|
|
if self.api_version < 1 :
|
|
raise Exception("api_version 1 or newer is required to call methods with index-based arguments")
|
|
# New args
|
|
d["res"] = await getattr(self.Plugin, data["method"])(*data["args"])
|
|
except Exception as e:
|
|
d["res"] = str(e)
|
|
d["success"] = False
|
|
finally:
|
|
return dumps(d, ensure_ascii=False)
|