mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 12:15:09 +03:00
8987076c5f
This reverts commit 39f4f2870b , because functions (seemingly) don't run after the event loop closes, so the unload function is never actually run.
169 lines
7.5 KiB
Python
169 lines
7.5 KiB
Python
from os import path, environ
|
|
from signal import SIGINT, signal
|
|
from importlib.util import module_from_spec, spec_from_file_location
|
|
from json import dumps, loads
|
|
from logging import getLogger
|
|
from sys import exit, path as syspath, modules as sysmodules
|
|
from traceback import format_exc
|
|
from asyncio import (get_event_loop, new_event_loop,
|
|
set_event_loop, sleep)
|
|
|
|
from .messages import SocketResponseDict, SocketMessageType
|
|
from ..localplatform.localsocket import LocalSocket
|
|
from ..localplatform.localplatform import setgid, setuid, get_username, get_home_path
|
|
from ..customtypes import UserType
|
|
from .. import helpers
|
|
|
|
from typing import List, TypeVar, Type
|
|
|
|
DataType = TypeVar("DataType")
|
|
|
|
class SandboxedPlugin:
|
|
def __init__(self,
|
|
name: str,
|
|
passive: bool,
|
|
flags: List[str],
|
|
file: str,
|
|
plugin_directory: str,
|
|
plugin_path: str,
|
|
version: str|None,
|
|
author: str,
|
|
api_version: int) -> None:
|
|
self.name = name
|
|
self.passive = passive
|
|
self.flags = flags
|
|
self.file = file
|
|
self.plugin_path = plugin_path
|
|
self.plugin_directory = plugin_directory
|
|
self.version = version
|
|
self.author = author
|
|
self.api_version = api_version
|
|
|
|
self.log = getLogger("plugin")
|
|
|
|
def initialize(self, socket: LocalSocket):
|
|
self._socket = socket
|
|
|
|
try:
|
|
signal(SIGINT, lambda s, f: exit(0))
|
|
|
|
set_event_loop(new_event_loop())
|
|
if self.passive:
|
|
return
|
|
setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
# export a bunch of environment variables to help plugin developers
|
|
environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
|
|
environ["USER"] = "root" if "root" in self.flags else get_username()
|
|
environ["DECKY_VERSION"] = helpers.get_loader_version()
|
|
environ["DECKY_USER"] = get_username()
|
|
environ["DECKY_USER_HOME"] = helpers.get_home_path()
|
|
environ["DECKY_HOME"] = helpers.get_homebrew_path()
|
|
environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
|
|
helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "settings"))
|
|
helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"])
|
|
environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory)
|
|
helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "data"))
|
|
helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"])
|
|
environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory)
|
|
helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "logs"))
|
|
helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
|
|
environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
|
|
environ["DECKY_PLUGIN_NAME"] = self.name
|
|
if self.version:
|
|
environ["DECKY_PLUGIN_VERSION"] = self.version
|
|
environ["DECKY_PLUGIN_AUTHOR"] = self.author
|
|
|
|
# append the plugin's `py_modules` to the recognized python paths
|
|
syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
|
|
|
|
#TODO: FIX IN A LESS CURSED WAY
|
|
keys = [key for key in sysmodules if key.startswith("decky_loader.")]
|
|
for key in keys:
|
|
sysmodules[key.replace("decky_loader.", "")] = sysmodules[key]
|
|
|
|
from .imports import decky
|
|
async def emit_message(event: str, data: DataType | None = None, data_type: Type[DataType] | None = None) -> None:
|
|
await self._socket.write_single_line_server(dumps({
|
|
"type": SocketMessageType.EVENT,
|
|
"event": event,
|
|
"data": data
|
|
}))
|
|
# copy the docstring over so we don't have to duplicate it
|
|
emit_message.__doc__ = decky.emit_message.__doc__
|
|
decky.emit_message = emit_message
|
|
sysmodules["decky"] = decky
|
|
# provided for compatibility
|
|
sysmodules["decky_plugin"] = decky
|
|
|
|
spec = spec_from_file_location("_", self.file)
|
|
assert spec is not None
|
|
module = module_from_spec(spec)
|
|
assert spec.loader is not None
|
|
spec.loader.exec_module(module)
|
|
# TODO fix self weirdness once plugin.json versioning is done. need this before WS release!
|
|
if self.api_version > 0:
|
|
self.Plugin = module.Plugin()
|
|
else:
|
|
self.Plugin = module.Plugin
|
|
|
|
if hasattr(self.Plugin, "_migration"):
|
|
if self.api_version > 0:
|
|
get_event_loop().run_until_complete(self.Plugin._migration())
|
|
else:
|
|
get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
|
|
if hasattr(self.Plugin, "_main"):
|
|
if self.api_version > 0:
|
|
get_event_loop().create_task(self.Plugin._main())
|
|
else:
|
|
get_event_loop().create_task(self.Plugin._main(self.Plugin))
|
|
get_event_loop().create_task(socket.setup_server())
|
|
get_event_loop().run_forever()
|
|
except:
|
|
self.log.error("Failed to start " + self.name + "!\n" + format_exc())
|
|
exit(0)
|
|
|
|
async def _unload(self):
|
|
try:
|
|
self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n")
|
|
if hasattr(self.Plugin, "_unload"):
|
|
if self.api_version > 0:
|
|
await self.Plugin._unload()
|
|
else:
|
|
await self.Plugin._unload(self.Plugin)
|
|
self.log.info("Unloaded " + self.name + "\n")
|
|
else:
|
|
self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n")
|
|
except:
|
|
self.log.error("Failed to unload " + self.name + "!\n" + format_exc())
|
|
exit(0)
|
|
|
|
async def on_new_message(self, message : str) -> str | None:
|
|
data = loads(message)
|
|
|
|
if "stop" in data:
|
|
self.log.info("Calling Loader unload function.")
|
|
await self._unload()
|
|
get_event_loop().stop()
|
|
while get_event_loop().is_running():
|
|
await sleep(0)
|
|
get_event_loop().close()
|
|
raise Exception("Closing message listener")
|
|
|
|
d: SocketResponseDict = {"type": SocketMessageType.RESPONSE, "res": None, "success": True, "id": data["id"]}
|
|
try:
|
|
if data["legacy"]:
|
|
if self.api_version > 0:
|
|
raise Exception("Legacy methods may not be used on api_version > 0")
|
|
# Legacy kwargs
|
|
d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
|
|
else:
|
|
if self.api_version < 1 :
|
|
raise Exception("api_version 1 or newer is required to call methods with index-based arguments")
|
|
# New args
|
|
d["res"] = await getattr(self.Plugin, data["method"])(*data["args"])
|
|
except Exception as e:
|
|
d["res"] = str(e)
|
|
d["success"] = False
|
|
finally:
|
|
return dumps(d, ensure_ascii=False) |