plexus.cli.procedure.lua_dsl.primitives.procedure module

Procedure Primitive - Sub-procedure invocation and orchestration.

Provides: - Procedure.run(procedure_id, params) - Execute sub-procedure synchronously - Procedure.spawn(procedure_id, params) - Launch sub-procedure asynchronously - Procedure.wait(procedure_id) - Wait for sub-procedure completion - Procedure.status(procedure_id) - Check sub-procedure status - Procedure.cancel(procedure_id) - Cancel running sub-procedure

class plexus.cli.procedure.lua_dsl.primitives.procedure.ProcedurePrimitive(client, account_id: str, parent_procedure_id: str, mcp_server, openai_api_key: str | None = None, lua_sandbox=None)

Bases: object

Handles sub-procedure invocation and orchestration.

Enables workflows to: - Execute sub-procedures synchronously or asynchronously - Monitor sub-procedure status - Cancel running sub-procedures - Build hierarchical workflow compositions

Initialize Procedure primitive.

Args:

client: PlexusDashboardClient instance account_id: Account ID for procedure operations parent_procedure_id: ID of the parent procedure mcp_server: MCP server for tool access openai_api_key: Optional OpenAI API key lua_sandbox: Optional LuaSandbox for Lua table construction

__init__(client, account_id: str, parent_procedure_id: str, mcp_server, openai_api_key: str | None = None, lua_sandbox=None)

Initialize Procedure primitive.

Args:

client: PlexusDashboardClient instance account_id: Account ID for procedure operations parent_procedure_id: ID of the parent procedure mcp_server: MCP server for tool access openai_api_key: Optional OpenAI API key lua_sandbox: Optional LuaSandbox for Lua table construction

all_complete(handles: Any) bool

Check if all tasks have completed.

Args:

handles: List of task IDs

Returns:

True if all tasks complete, False if any still running

Example (Lua):

local batch = {} for i = 1, 10 do

table.insert(batch, Procedure.spawn(“item”, {id = i}))

end

while not Procedure.all_complete(batch) do

Log.info(“Processing batch…”) Sleep(5)

end

cancel(task_id: str) bool

Cancel a running sub-procedure.

Args:

task_id: Task ID returned by spawn()

Returns:

True if cancelled successfully, False if already completed

Raises:

RuntimeError: If task not found

Example (Lua):

local task_id = Procedure.spawn(“slow-task”, {})

– Cancel if taking too long Sleep(10) if Procedure.status(task_id) == “RUNNING” then

Procedure.cancel(task_id) Log.info(“Task cancelled”)

end

inject(handle: str, message: str) bool

Inject a message into a running sub-procedure’s queue.

Args:

handle: Task ID from spawn() message: Message to inject

Returns:

True if injected successfully, False otherwise

Raises:

RuntimeError: If task not found

Example (Lua):

local task = Procedure.spawn(“analysis”, {}) Procedure.inject(task, “Focus on security patterns”)

is_complete(handle: str) bool

Check if a task has completed (success, failure, or cancellation).

Args:

handle: Task ID from spawn()

Returns:

True if task is in terminal state, False if still running

Raises:

RuntimeError: If task not found

Example (Lua):

local task = Procedure.spawn(“analysis”, {}) while not Procedure.is_complete(task) do

Log.info(“Still running…”) Sleep(2)

end

run(procedure_id: str, params: Any | None = None) Any

Execute a sub-procedure synchronously and wait for completion.

Args:

procedure_id: ID of the procedure to execute params: Optional parameters to pass to the procedure

Returns:

Result from the sub-procedure

Raises:

RuntimeError: If procedure execution fails

Example (Lua):
local result = Procedure.run(“score-analysis-proc”, {

scorecard = “compliance”, days = 7

}) Log.info(“Sub-procedure complete”, {status = result.status})

spawn(procedure_id: str, params: Any | None = None) str

Launch a sub-procedure asynchronously without waiting.

Args:

procedure_id: ID of the procedure to execute params: Optional parameters to pass to the procedure

Returns:

Task ID or execution identifier for the spawned procedure

Example (Lua):
local task_id = Procedure.spawn(“data-processing”, {

input_file = “/tmp/data.json”

}) Log.info(“Spawned sub-procedure”, {task_id = task_id})

– Continue with other work…

– Later, wait for completion local result = Procedure.wait(task_id)

status(task_id: str) str

Check the status of a spawned sub-procedure.

Args:

task_id: Task ID returned by spawn()

Returns:

Status string: ‘RUNNING’, ‘COMPLETED’, ‘FAILED’, ‘CANCELLED’, ‘PENDING’

Raises:

RuntimeError: If task not found

Example (Lua):

local task_id = Procedure.spawn(“long-running”, {})

while Procedure.status(task_id) == “RUNNING” do

Log.info(“Still running…”) Sleep(5)

end

Log.info(“Task finished”, {status = Procedure.status(task_id)})

wait(task_id: str, timeout: float | None = None) Any

Wait for a spawned sub-procedure to complete.

Args:

task_id: Task ID returned by spawn() timeout: Optional timeout in seconds (None = wait forever)

Returns:

Result from the sub-procedure

Raises:

RuntimeError: If task not found or execution fails TimeoutError: If timeout exceeded

Example (Lua):

local task_id = Procedure.spawn(“analysis”, {data = data})

– Do other work…

local result = Procedure.wait(task_id, 60) – Wait up to 60 seconds Log.info(“Analysis complete”, {result = result})

wait_all(handles: Any, timeout: float | None = None) Any

Wait for all sub-procedures to complete.

Args:

handles: List of task IDs timeout: Optional timeout in seconds

Returns:

Dict/Lua table mapping handle -> result

Example (Lua):
local tasks = {

Procedure.spawn(“process-1”, {}), Procedure.spawn(“process-2”, {})

} local results = Procedure.wait_all(tasks, 120) for handle, result in pairs(results) do

Log.info(“Result”, {handle = handle, success = result.success})

end

wait_any(handles: Any, timeout: float | None = None) tuple

Wait for first sub-procedure to complete.

Args:

handles: List of task IDs (Lua table or Python list) timeout: Optional timeout in seconds

Returns:

Tuple of (completed_handle, result)

Raises:

RuntimeError: If no handles or handle not found TimeoutError: If timeout exceeded

Example (Lua):
local tasks = {

Procedure.spawn(“fast”, {}), Procedure.spawn(“slow”, {})

} local winner, result = Procedure.wait_any(tasks, 30) Log.info(“First complete”, {winner = winner})