mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 04:05:04 +03:00
5a633fdd82
* fix: get rid of title view jank on latest beta * Count the number of installs for each plugin (#557) * Bump aiohttp from 3.8.4 to 3.8.5 in /backend (#558) * fix: include Decky version in request for index.js This avoids the If-Modified-Since logic in aiohttp and ensures Steam doesn't cache old JS, even if the timestamps are normalized. * fix: clean up shellcheck warnings in act runner script * fix: gitignore settings/ * fix: ensure state directories exist when running without the installer * feat: determine root directory correctly when running from in-tree * fix: fix typo in CI script * refactor: build a proper Python package with poetry * refactor: move decky_plugin under the poetry structure There's no need to special case it anymore, just treat it like any other Python module. * sandboxed_plugin: better fix, attempt 2 --------- Co-authored-by: AAGaming <aagaming@riseup.net> Co-authored-by: Party Wumpus <48649272+PartyWumpus@users.noreply.github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
374 lines
14 KiB
Python
374 lines
14 KiB
Python
from __future__ import annotations
|
|
from os import stat_result
|
|
import uuid
|
|
from json.decoder import JSONDecodeError
|
|
from os.path import splitext
|
|
import re
|
|
from traceback import format_exc
|
|
from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
|
|
|
|
from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
|
|
from aiohttp import ClientSession, web
|
|
from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict
|
|
|
|
from logging import getLogger
|
|
from pathlib import Path
|
|
|
|
from .browser import PluginInstallRequest, PluginInstallType
|
|
if TYPE_CHECKING:
|
|
from .main import PluginManager
|
|
from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
|
|
from .localplatform.localplatform import ON_WINDOWS
|
|
from . import helpers
|
|
from .localplatform.localplatform import service_stop, service_start, get_home_path, get_username
|
|
|
|
class FilePickerObj(TypedDict):
|
|
file: Path
|
|
filest: stat_result
|
|
is_dir: bool
|
|
|
|
class Utilities:
|
|
def __init__(self, context: PluginManager) -> None:
|
|
self.context = context
|
|
self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
|
|
"ping": self.ping,
|
|
"http_request": self.http_request,
|
|
"install_plugin": self.install_plugin,
|
|
"install_plugins": self.install_plugins,
|
|
"cancel_plugin_install": self.cancel_plugin_install,
|
|
"confirm_plugin_install": self.confirm_plugin_install,
|
|
"uninstall_plugin": self.uninstall_plugin,
|
|
"execute_in_tab": self.execute_in_tab,
|
|
"inject_css_into_tab": self.inject_css_into_tab,
|
|
"remove_css_from_tab": self.remove_css_from_tab,
|
|
"allow_remote_debugging": self.allow_remote_debugging,
|
|
"disallow_remote_debugging": self.disallow_remote_debugging,
|
|
"set_setting": self.set_setting,
|
|
"get_setting": self.get_setting,
|
|
"filepicker_ls": self.filepicker_ls,
|
|
"disable_rdt": self.disable_rdt,
|
|
"enable_rdt": self.enable_rdt,
|
|
"get_tab_id": self.get_tab_id,
|
|
"get_user_info": self.get_user_info,
|
|
}
|
|
|
|
self.logger = getLogger("Utilities")
|
|
|
|
self.rdt_proxy_server = None
|
|
self.rdt_script_id = None
|
|
self.rdt_proxy_task = None
|
|
|
|
if context:
|
|
context.web_app.add_routes([
|
|
web.post("/methods/{method_name}", self._handle_server_method_call)
|
|
])
|
|
|
|
async def _handle_server_method_call(self, request: web.Request):
|
|
method_name = request.match_info["method_name"]
|
|
try:
|
|
args = await request.json()
|
|
except JSONDecodeError:
|
|
args = {}
|
|
res = {}
|
|
try:
|
|
r = await self.util_methods[method_name](**args)
|
|
res["result"] = r
|
|
res["success"] = True
|
|
except Exception as e:
|
|
res["result"] = str(e)
|
|
res["success"] = False
|
|
return web.json_response(res)
|
|
|
|
async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
|
|
return await self.context.plugin_browser.request_plugin_install(
|
|
artifact=artifact,
|
|
name=name,
|
|
version=version,
|
|
hash=hash,
|
|
install_type=install_type
|
|
)
|
|
|
|
async def install_plugins(self, requests: List[PluginInstallRequest]):
|
|
return await self.context.plugin_browser.request_multiple_plugin_installs(
|
|
requests=requests
|
|
)
|
|
|
|
async def confirm_plugin_install(self, request_id: str):
|
|
return await self.context.plugin_browser.confirm_plugin_install(request_id)
|
|
|
|
async def cancel_plugin_install(self, request_id: str):
|
|
return self.context.plugin_browser.cancel_plugin_install(request_id)
|
|
|
|
async def uninstall_plugin(self, name: str):
|
|
return await self.context.plugin_browser.uninstall_plugin(name)
|
|
|
|
async def http_request(self, method: str="", url: str="", **kwargs: Any):
|
|
async with ClientSession() as web:
|
|
res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
|
|
text = await res.text()
|
|
return {
|
|
"status": res.status,
|
|
"headers": dict(res.headers),
|
|
"body": text
|
|
}
|
|
|
|
async def ping(self, **kwargs: Any):
|
|
return "pong"
|
|
|
|
async def execute_in_tab(self, tab: str, run_async: bool, code: str):
|
|
try:
|
|
result = await inject_to_tab(tab, code, run_async)
|
|
assert result
|
|
if "exceptionDetails" in result["result"]:
|
|
return {
|
|
"success": False,
|
|
"result": result["result"]
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"result": result["result"]["result"].get("value")
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"result": e
|
|
}
|
|
|
|
async def inject_css_into_tab(self, tab: str, style: str):
|
|
try:
|
|
css_id = str(uuid.uuid4())
|
|
|
|
result = await inject_to_tab(tab,
|
|
f"""
|
|
(function() {{
|
|
const style = document.createElement('style');
|
|
style.id = "{css_id}";
|
|
document.head.append(style);
|
|
style.textContent = `{style}`;
|
|
}})()
|
|
""", False)
|
|
|
|
if result and "exceptionDetails" in result["result"]:
|
|
return {
|
|
"success": False,
|
|
"result": result["result"]
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"result": css_id
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"result": e
|
|
}
|
|
|
|
async def remove_css_from_tab(self, tab: str, css_id: str):
|
|
try:
|
|
result = await inject_to_tab(tab,
|
|
f"""
|
|
(function() {{
|
|
let style = document.getElementById("{css_id}");
|
|
|
|
if (style.nodeName.toLowerCase() == 'style')
|
|
style.parentNode.removeChild(style);
|
|
}})()
|
|
""", False)
|
|
|
|
if result and "exceptionDetails" in result["result"]:
|
|
return {
|
|
"success": False,
|
|
"result": result
|
|
}
|
|
|
|
return {
|
|
"success": True
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"result": e
|
|
}
|
|
|
|
async def get_setting(self, key: str, default: Any):
|
|
return self.context.settings.getSetting(key, default)
|
|
|
|
async def set_setting(self, key: str, value: Any):
|
|
return self.context.settings.setSetting(key, value)
|
|
|
|
async def allow_remote_debugging(self):
|
|
await service_start(helpers.REMOTE_DEBUGGER_UNIT)
|
|
return True
|
|
|
|
async def disallow_remote_debugging(self):
|
|
await service_stop(helpers.REMOTE_DEBUGGER_UNIT)
|
|
return True
|
|
|
|
async def filepicker_ls(self,
|
|
path : str | None = None,
|
|
include_files: bool = True,
|
|
include_folders: bool = True,
|
|
include_ext: list[str] = [],
|
|
include_hidden: bool = False,
|
|
order_by: str = "name_asc",
|
|
filter_for: str | None = None,
|
|
page: int = 1,
|
|
max: int = 1000):
|
|
|
|
if path == None:
|
|
path = get_home_path()
|
|
|
|
path_obj = Path(path).resolve()
|
|
|
|
files: List[FilePickerObj] = []
|
|
folders: List[FilePickerObj] = []
|
|
|
|
#Resolving all files/folders in the requested directory
|
|
for file in path_obj.iterdir():
|
|
if file.exists():
|
|
filest = file.stat()
|
|
is_hidden = file.name.startswith('.')
|
|
if ON_WINDOWS and not is_hidden:
|
|
is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
|
|
if include_folders and file.is_dir():
|
|
if (is_hidden and include_hidden) or not is_hidden:
|
|
folders.append({"file": file, "filest": filest, "is_dir": True})
|
|
elif include_files:
|
|
# Handle requested extensions if present
|
|
if len(include_ext) == 0 or 'all_files' in include_ext \
|
|
or splitext(file.name)[1].lstrip('.') in include_ext:
|
|
if (is_hidden and include_hidden) or not is_hidden:
|
|
files.append({"file": file, "filest": filest, "is_dir": False})
|
|
# Filter logic
|
|
if filter_for is not None:
|
|
try:
|
|
if re.compile(filter_for):
|
|
files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
|
|
except re.error:
|
|
files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
|
|
|
|
# Ordering logic
|
|
ord_arg = order_by.split("_")
|
|
ord = ord_arg[0]
|
|
rev = True if ord_arg[1] == "asc" else False
|
|
match ord:
|
|
case 'name':
|
|
files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
|
|
folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
|
|
case 'modified':
|
|
files.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
|
|
folders.sort(key=lambda x: x['filest'].st_mtime, reverse = not rev)
|
|
case 'created':
|
|
files.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
|
|
folders.sort(key=lambda x: x['filest'].st_ctime, reverse = not rev)
|
|
case 'size':
|
|
files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
|
|
# Folders has no file size, order by name instead
|
|
folders.sort(key=lambda x: x['file'].name.casefold())
|
|
case _:
|
|
files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
|
|
folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
|
|
|
|
#Constructing the final file list, folders first
|
|
all = [{
|
|
"isdir": x['is_dir'],
|
|
"name": str(x['file'].name),
|
|
"realpath": str(x['file']),
|
|
"size": x['filest'].st_size,
|
|
"modified": x['filest'].st_mtime,
|
|
"created": x['filest'].st_ctime,
|
|
} for x in folders + files ]
|
|
|
|
return {
|
|
"realpath": str(path),
|
|
"files": all[(page-1)*max:(page)*max],
|
|
"total": len(all),
|
|
}
|
|
|
|
|
|
# Based on https://stackoverflow.com/a/46422554/13174603
|
|
def start_rdt_proxy(self, ip: str, port: int):
|
|
async def pipe(reader: StreamReader, writer: StreamWriter):
|
|
try:
|
|
while not reader.at_eof():
|
|
writer.write(await reader.read(2048))
|
|
finally:
|
|
writer.close()
|
|
async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
|
|
try:
|
|
remote_reader, remote_writer = await open_connection(
|
|
ip, port)
|
|
pipe1 = pipe(local_reader, remote_writer)
|
|
pipe2 = pipe(remote_reader, local_writer)
|
|
await gather(pipe1, pipe2)
|
|
finally:
|
|
local_writer.close()
|
|
|
|
self.rdt_proxy_server = start_server(handle_client, "127.0.0.1", port)
|
|
self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server)
|
|
|
|
def stop_rdt_proxy(self):
|
|
if self.rdt_proxy_server != None:
|
|
self.rdt_proxy_server.close()
|
|
if self.rdt_proxy_task:
|
|
self.rdt_proxy_task.cancel()
|
|
|
|
async def _enable_rdt(self):
|
|
# TODO un-hardcode port
|
|
try:
|
|
self.stop_rdt_proxy()
|
|
ip = self.context.settings.getSetting("developer.rdt.ip", None)
|
|
|
|
if ip != None:
|
|
self.logger.info("Connecting to React DevTools at " + ip)
|
|
async with ClientSession() as web:
|
|
res = await web.request("GET", "http://" + ip + ":8097", ssl=helpers.get_ssl_context())
|
|
script = """
|
|
if (!window.deckyHasConnectedRDT) {
|
|
window.deckyHasConnectedRDT = true;
|
|
// This fixes the overlay when hovering over an element in RDT
|
|
Object.defineProperty(window, '__REACT_DEVTOOLS_TARGET_WINDOW__', {
|
|
enumerable: true,
|
|
configurable: true,
|
|
get: function() {
|
|
return (GamepadNavTree?.m_context?.m_controller || FocusNavController)?.m_ActiveContext?.ActiveWindow || window;
|
|
}
|
|
});
|
|
""" + await res.text() + "\n}"
|
|
if res.status != 200:
|
|
self.logger.error("Failed to connect to React DevTools at " + ip)
|
|
return False
|
|
self.start_rdt_proxy(ip, 8097)
|
|
self.logger.info("Connected to React DevTools, loading script")
|
|
tab = await get_gamepadui_tab()
|
|
# RDT needs to load before React itself to work.
|
|
await close_old_tabs()
|
|
result = await tab.reload_and_evaluate(script)
|
|
self.logger.info(result)
|
|
|
|
except Exception:
|
|
self.logger.error("Failed to connect to React DevTools")
|
|
self.logger.error(format_exc())
|
|
|
|
async def enable_rdt(self):
|
|
self.context.loop.create_task(self._enable_rdt())
|
|
|
|
async def disable_rdt(self):
|
|
self.logger.info("Disabling React DevTools")
|
|
tab = await get_gamepadui_tab()
|
|
self.rdt_script_id = None
|
|
await close_old_tabs()
|
|
await tab.evaluate_js("location.reload();", False, True, False)
|
|
self.logger.info("React DevTools disabled")
|
|
|
|
async def get_user_info(self) -> Dict[str, str]:
|
|
return {
|
|
"username": get_username(),
|
|
"path": get_home_path()
|
|
}
|
|
|
|
async def get_tab_id(self, name: str):
|
|
return (await get_tab(name)).id
|