Spaces:
Runtime error
Runtime error
| """AI-written plugin engine with sandboxed execution.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Dict | |
| from jenaai.core.module import BaseModule | |
| from jenaai.utils.sandbox import static_scan | |
class Module(BaseModule):
    """Load plugin source files from ``plugins_path`` and execute them.

    A plugin is a ``<name>.py`` file below the plugins directory. It is
    executed with a ``payload`` variable in scope and reports its output by
    assigning a top-level variable called ``result``.
    """

    def __init__(self, event_bus, config):
        """Initialise the base module and remember the plugins directory.

        Args:
            event_bus: Forwarded to ``BaseModule.__init__``.
            config: Forwarded to ``BaseModule.__init__``; its optional
                ``plugins_path`` entry selects the plugin directory
                (default ``"plugins"``).
        """
        super().__init__(event_bus, config)
        self.plugins_path = Path(config.get("plugins_path", "plugins"))

    async def start(self) -> None:
        """Publish a ``system.log`` event announcing the engine is ready."""
        # NOTE(review): ``self.event_bus`` and ``self.metadata`` are presumably
        # set up by ``BaseModule`` -- they are not assigned in this class.
        await self.event_bus.publish(
            "system.log",
            {"message": "Plugin engine ready", "module": self.metadata.name},
        )

    async def run_plugin(self, name: str, payload: dict) -> dict:
        """Run the plugin called ``name`` and return its ``result`` variable.

        Args:
            name: Plugin name without the ``.py`` suffix, relative to
                ``self.plugins_path``.
            payload: Object exposed to the plugin as the ``payload`` variable.

        Returns:
            ``{"ok": True, "result": ...}`` on success, where the result is
            the plugin's ``result`` variable (``None`` if it never assigned
            one) or its ``str()`` form when it is not JSON-serialisable.
            ``{"ok": False, "error": ...}`` when the name is invalid, the
            file is missing, the static scan rejects the source, or the
            plugin fails to compile or raises while running.
        """
        # Refuse absolute paths and ".." components so a caller-supplied name
        # cannot make us read and execute a file outside the plugins directory.
        relative = Path(f"{name}.py")
        if relative.anchor or ".." in relative.parts:
            return {"ok": False, "error": "Invalid plugin name"}

        plugin_file = self.plugins_path / relative
        # is_file() rather than exists(): a directory named "<name>.py" is not
        # a plugin and would make read_text() raise.
        if not plugin_file.is_file():
            return {"ok": False, "error": "Plugin not found"}

        # Python source defaults to UTF-8; do not depend on the locale encoding.
        source = plugin_file.read_text(encoding="utf-8")
        scan = static_scan(source)
        if not scan.ok:
            return {"ok": False, "error": scan.errors}

        # One dict serves as both globals and locals. With two separate dicts
        # the code runs as if inside a class body, so functions defined by the
        # plugin cannot see ``payload``, imports, or sibling helpers.
        namespace: Dict[str, object] = {"payload": payload}
        try:
            # NOTE(security): exec() runs the plugin inside this process with
            # full interpreter privileges. The static scan above is the only
            # barrier and is not real isolation -- review before loading
            # plugins from untrusted sources.
            exec(compile(source, plugin_file.name, "exec"), namespace)
        except Exception as exc:
            # A broken plugin is reported like every other failure of this
            # method instead of crashing the caller.
            return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}

        result = namespace.get("result")
        try:
            json.dumps(result)
        except (TypeError, ValueError):
            # TypeError: unsupported type; ValueError: circular reference.
            result = str(result)
        return {"ok": True, "result": result}