plexus.cli.procedure.lua_dsl.primitives.procedure module
Procedure Primitive - Sub-procedure invocation and orchestration.
Provides: - Procedure.run(procedure_id, params) - Execute sub-procedure synchronously - Procedure.spawn(procedure_id, params) - Launch sub-procedure asynchronously - Procedure.wait(procedure_id) - Wait for sub-procedure completion - Procedure.status(procedure_id) - Check sub-procedure status - Procedure.cancel(procedure_id) - Cancel running sub-procedure
- class plexus.cli.procedure.lua_dsl.primitives.procedure.ProcedurePrimitive(client, account_id: str, parent_procedure_id: str, mcp_server, openai_api_key: str | None = None, lua_sandbox=None)
Bases:
objectHandles sub-procedure invocation and orchestration.
Enables workflows to: - Execute sub-procedures synchronously or asynchronously - Monitor sub-procedure status - Cancel running sub-procedures - Build hierarchical workflow compositions
Initialize Procedure primitive.
- Args:
client: PlexusDashboardClient instance account_id: Account ID for procedure operations parent_procedure_id: ID of the parent procedure mcp_server: MCP server for tool access openai_api_key: Optional OpenAI API key lua_sandbox: Optional LuaSandbox for Lua table construction
- __init__(client, account_id: str, parent_procedure_id: str, mcp_server, openai_api_key: str | None = None, lua_sandbox=None)
Initialize Procedure primitive.
- Args:
client: PlexusDashboardClient instance account_id: Account ID for procedure operations parent_procedure_id: ID of the parent procedure mcp_server: MCP server for tool access openai_api_key: Optional OpenAI API key lua_sandbox: Optional LuaSandbox for Lua table construction
- all_complete(handles: Any) bool
Check if all tasks have completed.
- Args:
handles: List of task IDs
- Returns:
True if all tasks complete, False if any still running
- Example (Lua):
local batch = {} for i = 1, 10 do
table.insert(batch, Procedure.spawn(“item”, {id = i}))
end
- while not Procedure.all_complete(batch) do
Log.info(“Processing batch…”) Sleep(5)
end
- cancel(task_id: str) bool
Cancel a running sub-procedure.
- Args:
task_id: Task ID returned by spawn()
- Returns:
True if cancelled successfully, False if already completed
- Raises:
RuntimeError: If task not found
- Example (Lua):
local task_id = Procedure.spawn(“slow-task”, {})
– Cancel if taking too long Sleep(10) if Procedure.status(task_id) == “RUNNING” then
Procedure.cancel(task_id) Log.info(“Task cancelled”)
end
- inject(handle: str, message: str) bool
Inject a message into a running sub-procedure’s queue.
- Args:
handle: Task ID from spawn() message: Message to inject
- Returns:
True if injected successfully, False otherwise
- Raises:
RuntimeError: If task not found
- Example (Lua):
local task = Procedure.spawn(“analysis”, {}) Procedure.inject(task, “Focus on security patterns”)
- is_complete(handle: str) bool
Check if a task has completed (success, failure, or cancellation).
- Args:
handle: Task ID from spawn()
- Returns:
True if task is in terminal state, False if still running
- Raises:
RuntimeError: If task not found
- Example (Lua):
local task = Procedure.spawn(“analysis”, {}) while not Procedure.is_complete(task) do
Log.info(“Still running…”) Sleep(2)
end
- run(procedure_id: str, params: Any | None = None) Any
Execute a sub-procedure synchronously and wait for completion.
- Args:
procedure_id: ID of the procedure to execute params: Optional parameters to pass to the procedure
- Returns:
Result from the sub-procedure
- Raises:
RuntimeError: If procedure execution fails
- Example (Lua):
- local result = Procedure.run(“score-analysis-proc”, {
scorecard = “compliance”, days = 7
}) Log.info(“Sub-procedure complete”, {status = result.status})
- spawn(procedure_id: str, params: Any | None = None) str
Launch a sub-procedure asynchronously without waiting.
- Args:
procedure_id: ID of the procedure to execute params: Optional parameters to pass to the procedure
- Returns:
Task ID or execution identifier for the spawned procedure
- Example (Lua):
- local task_id = Procedure.spawn(“data-processing”, {
input_file = “/tmp/data.json”
}) Log.info(“Spawned sub-procedure”, {task_id = task_id})
– Continue with other work…
– Later, wait for completion local result = Procedure.wait(task_id)
- status(task_id: str) str
Check the status of a spawned sub-procedure.
- Args:
task_id: Task ID returned by spawn()
- Returns:
Status string: ‘RUNNING’, ‘COMPLETED’, ‘FAILED’, ‘CANCELLED’, ‘PENDING’
- Raises:
RuntimeError: If task not found
- Example (Lua):
local task_id = Procedure.spawn(“long-running”, {})
- while Procedure.status(task_id) == “RUNNING” do
Log.info(“Still running…”) Sleep(5)
end
Log.info(“Task finished”, {status = Procedure.status(task_id)})
- wait(task_id: str, timeout: float | None = None) Any
Wait for a spawned sub-procedure to complete.
- Args:
task_id: Task ID returned by spawn() timeout: Optional timeout in seconds (None = wait forever)
- Returns:
Result from the sub-procedure
- Raises:
RuntimeError: If task not found or execution fails TimeoutError: If timeout exceeded
- Example (Lua):
local task_id = Procedure.spawn(“analysis”, {data = data})
– Do other work…
local result = Procedure.wait(task_id, 60) – Wait up to 60 seconds Log.info(“Analysis complete”, {result = result})
- wait_all(handles: Any, timeout: float | None = None) Any
Wait for all sub-procedures to complete.
- Args:
handles: List of task IDs timeout: Optional timeout in seconds
- Returns:
Dict/Lua table mapping handle -> result
- Example (Lua):
- local tasks = {
Procedure.spawn(“process-1”, {}), Procedure.spawn(“process-2”, {})
} local results = Procedure.wait_all(tasks, 120) for handle, result in pairs(results) do
Log.info(“Result”, {handle = handle, success = result.success})
end
- wait_any(handles: Any, timeout: float | None = None) tuple
Wait for first sub-procedure to complete.
- Args:
handles: List of task IDs (Lua table or Python list) timeout: Optional timeout in seconds
- Returns:
Tuple of (completed_handle, result)
- Raises:
RuntimeError: If no handles or handle not found TimeoutError: If timeout exceeded
- Example (Lua):
- local tasks = {
Procedure.spawn(“fast”, {}), Procedure.spawn(“slow”, {})
} local winner, result = Procedure.wait_any(tasks, 30) Log.info(“First complete”, {winner = winner})