plexus.cli.procedure.lua_dsl.primitives package

Lua Primitives - Python implementations of Lua-callable operations.

These primitives provide high-level operations that hide LLM mechanics and enable declarative workflow orchestration.

class plexus.cli.procedure.lua_dsl.primitives.AgentPrimitive(name: str, system_prompt: str, initial_message: str, llm, available_tools: List[Any], tool_primitive, stop_primitive, iterations_primitive, chat_recorder=None)

Bases: object

Executes LLM agent turns with tool calling.

Each agent has: - System prompt (instructions) - Initial message (kickoff) - Conversation history - Available tools - LLM instance

Initialize an agent primitive.

Args:

name: Agent name (e.g., “worker”, “assistant”) system_prompt: System prompt for the agent initial_message: Initial user message to kickoff llm: LangChain LLM instance with tools bound available_tools: List of available tool objects tool_primitive: ToolPrimitive instance for recording calls stop_primitive: StopPrimitive instance for stop detection iterations_primitive: IterationsPrimitive for tracking turns chat_recorder: Optional ProcedureChatRecorder for logging conversations

__init__(name: str, system_prompt: str, initial_message: str, llm, available_tools: List[Any], tool_primitive, stop_primitive, iterations_primitive, chat_recorder=None)

Initialize an agent primitive.

Args:

name: Agent name (e.g., “worker”, “assistant”) system_prompt: System prompt for the agent initial_message: Initial user message to kickoff llm: LangChain LLM instance with tools bound available_tools: List of available tool objects tool_primitive: ToolPrimitive instance for recording calls stop_primitive: StopPrimitive instance for stop detection iterations_primitive: IterationsPrimitive for tracking turns chat_recorder: Optional ProcedureChatRecorder for logging conversations

async flush_recordings()

Flush queued recordings to chat session (called by runtime after workflow).

get_conversation() List[Any]

Get the current conversation history.

reset()

Reset the agent (mainly for testing).

turn(options: Dict[str, Any] | None = None) Dict[str, Any]

Execute one agent turn.

Args:
options: Optional dict with:
  • inject: Additional context message to inject for this turn

Returns:

Dictionary with response information (for Lua access)

Example (Lua):

local response = Worker.turn() Log.info(“Agent said: “ .. response.content)

for i, call in ipairs(response.tool_calls) do

Log.info(“Called tool: “ .. call.name)

end

class plexus.cli.procedure.lua_dsl.primitives.GraphNodePrimitive(client, procedure_id: str, account_id: str)

Bases: object

Manages graph node operations for procedure execution.

Graph nodes provide tree-based data storage for procedures, useful for tree search, hypothesis tracking, etc.

Initialize GraphNode primitive.

Args:

client: PlexusDashboardClient instance procedure_id: Procedure ID for node creation account_id: Account ID for node creation

__init__(client, procedure_id: str, account_id: str)

Initialize GraphNode primitive.

Args:

client: PlexusDashboardClient instance procedure_id: Procedure ID for node creation account_id: Account ID for node creation

create(content: str, metadata: Dict[str, Any] | None = None, parent_node_id: str | None = None)

Create a new graph node.

Args:

content: Text content for the node metadata: Optional metadata dict parent_node_id: Optional parent node ID (None for root)

Returns:

NodeWrapper with methods: children(), parent(), score(), metadata(), set_metadata()

Example (Lua):

local node = GraphNode.create(“My limerick here”, {type = “poem”}) Log.info(“Created node: “ .. node.id)

– Node methods available: local children = node:children() local parent = node:parent() node:set_metadata(“status”, “reviewed”)

current()

Get the current working node.

Returns:

NodeWrapper or None if no current node set

Example (Lua):

local node = GraphNode.current() if node then

Log.info(“Current node: “ .. node.id) node:set_metadata(“visited”, true) – Node methods available

end

root()

Get the root node for this procedure (if exists).

Returns:

NodeWrapper or None if no root node exists

Example (Lua):

local root = GraphNode.root() if root then

Log.info(“Root node exists: “ .. root.id) local children = root:children() – Node methods available

end

set_current(node_id: str) bool

Set the current working node (for internal use).

Args:

node_id: Node ID to set as current

Returns:

True if successful

class plexus.cli.procedure.lua_dsl.primitives.HumanPrimitive(chat_recorder, execution_context, hitl_config: Dict[str, Any] | None = None)

Bases: object

Manages human-in-the-loop operations for procedures.

Enables procedures to: - Request approval from humans - Get input from humans - Send notifications - Escalate issues

All blocking methods support timeouts and defaults.

Initialize Human primitive.

Args:

chat_recorder: ProcedureChatRecorder for recording messages execution_context: ExecutionContext for blocking HITL operations hitl_config: Optional HITL declarations from YAML

__init__(chat_recorder, execution_context, hitl_config: Dict[str, Any] | None = None)

Initialize Human primitive.

Args:

chat_recorder: ProcedureChatRecorder for recording messages execution_context: ExecutionContext for blocking HITL operations hitl_config: Optional HITL declarations from YAML

approve(options: Dict[str, Any] | None = None) bool

Request yes/no approval from human (BLOCKING).

Args:
options: Dict with:
  • message: str - Message to show human

  • context: Dict - Additional context

  • timeout: int - Timeout in seconds (None = no timeout)

  • default: bool - Default if timeout (default: False)

  • config_key: str - Reference to hitl: declaration

Returns:

bool - True if approved, False if rejected/timeout

Example (Lua):
local approved = Human.approve({

message = “Deploy to production?”, context = {environment = “prod”}, timeout = 3600, default = false

})

if approved then

deploy()

end

escalate(options: Dict[str, Any] | None = None) None

Escalate to human (BLOCKING).

Stops workflow execution until human resolves the issue. Unlike approve/input/review, escalate has NO timeout - it blocks indefinitely until a human manually resumes the procedure.

Args:
options: Dict with:
  • message: str - Escalation message

  • context: Dict - Error context

  • severity: str - Severity level (info/warning/error/critical)

  • config_key: str - Reference to hitl: declaration

Returns:

None - Execution resumes when human resolves

Example (Lua):
if attempts > 3 then
Human.escalate({

message = “Cannot resolve automatically”, context = {attempts = attempts, error = last_error}, severity = “error”

}) – Workflow continues here after human resolves

end

async flush_recordings() None

Flush all queued messages to chat recorder.

This is called by the runtime after workflow execution to record all Human.notify() messages to the chat session.

input(options: Dict[str, Any] | None = None) str | None

Request free-form input from human (BLOCKING).

Args:
options: Dict with:
  • message: str - Prompt for human

  • placeholder: str - Input placeholder

  • timeout: int - Timeout in seconds

  • default: str - Default if timeout

  • config_key: str - Reference to hitl: declaration

Returns:

str or None - Human’s input, or None if timeout with no default

Example (Lua):
local topic = Human.input({

message = “What topic?”, placeholder = “Enter topic…”, timeout = 600

})

if topic then

State.set(“topic”, topic)

end

notify(options: Dict[str, Any] | None = None) None

Send notification to human (NON-BLOCKING).

Args:
options: Dict with:
  • message: str - Notification message (required)

  • content: str - Rich markdown content (optional, overrides message in UI)

  • details: List[Dict] - Collapsible sections with title and content

  • level: str - info, warning, error (default: info)

  • context: Dict - Additional context (deprecated, use details instead)

Example (Lua):

– Simple notification Human.notify({

message = “Processing complete”

})

– Rich notification with details Human.notify({

message = “Phase 2 complete”, content = “Status: Phase 2 processing finishednnProcessed 150 items successfully”, details = {

{title = “Statistics”, content = “Success: 148n**Failed**: 2n**Duration**: 5 min”}, {title = “Next Steps”, content = “Starting phase 3 validation”}

}

})

review(options: Dict[str, Any] | None = None) Dict[str, Any] | None

Request human review (BLOCKING).

Args:
options: Dict with:
  • message: str - Review prompt

  • artifact: Any - Thing to review

  • artifact_type: str - Type of artifact

  • options: List[str] - Available actions

  • timeout: int - Timeout in seconds

  • config_key: str - Reference to hitl: declaration

Returns:
Dict with:
  • decision: str - Selected option

  • edited_artifact: Any - Modified artifact (if edited)

  • feedback: str - Human feedback

Example (Lua):
local review = Human.review({

message = “Review this document”, artifact = document, artifact_type = “document”, options = {“approve”, “edit”, “reject”}

})

if review.decision == “approve” then

publish(review.artifact)

end

class plexus.cli.procedure.lua_dsl.primitives.IterationsPrimitive

Bases: object

Tracks iteration count for procedure execution.

Provides safety limits and iteration-based control flow.

Initialize iteration counter.

__init__()

Initialize iteration counter.

current() int

Get the current iteration number.

Returns:

Current iteration count (0-indexed)

Example (Lua):

local iter = Iterations.current() Log.info(“Iteration: “ .. iter)

exceeded(max_iterations: int) bool

Check if current iteration has exceeded the maximum.

Args:

max_iterations: Maximum allowed iterations

Returns:

True if current iteration >= max_iterations

Example (Lua):
if Iterations.exceeded(100) then

return {success = false, reason = “Max iterations exceeded”}

end

increment() int

Increment the iteration counter.

Returns:

New iteration count

Note: This is called internally by the runtime, not from Lua

reset() None

Reset iteration counter (mainly for testing).

class plexus.cli.procedure.lua_dsl.primitives.StatePrimitive

Bases: object

Manages mutable state for procedure execution.

State is preserved across agent turns and can be used to track progress, accumulate results, and coordinate between agents.

Initialize state storage.

__init__()

Initialize state storage.

all() Dict[str, Any]

Get all state as a dictionary.

Returns:

Complete state dictionary

Example (Lua):

local state = State.all() for k, v in pairs(state) do

print(k, v)

end

append(key: str, value: Any) None

Append a value to a list in state.

Args:

key: State key (will be created as list if doesn’t exist) value: Value to append

Example (Lua):

State.append(“nodes_created”, node_id)

clear() None

Clear all state (mainly for testing).

get(key: str, default: Any = None) Any

Get a value from state.

Args:

key: State key to retrieve default: Default value if key not found

Returns:

Stored value or default

Example (Lua):

local count = State.get(“hypothesis_count”, 0)

increment(key: str, amount: float = 1) float

Increment a numeric value in state.

Args:

key: State key to increment amount: Amount to increment by (default 1)

Returns:

New value after increment

Example (Lua):

State.increment(“hypotheses_filed”) State.increment(“score”, 10)

set(key: str, value: Any) None

Set a value in state.

Args:

key: State key to set value: Value to store

Example (Lua):

State.set(“current_phase”, “exploration”)

class plexus.cli.procedure.lua_dsl.primitives.StopPrimitive

Bases: object

Manages procedure termination state.

Tracks when a stop was requested and the reason/success status.

Initialize stop state.

__init__()

Initialize stop state.

reason() str | None

Get the reason for stopping.

Returns:

Stop reason string or None if not stopped

Example (Lua):
if Stop.requested() then

Log.info(“Stopped because: “ .. Stop.reason())

end

request(reason: str, success: bool = True) None

Request a stop (called by tools or runtime).

Args:

reason: Reason for stopping success: Whether this is a successful completion

Note: This is called internally, not from Lua

requested() bool

Check if a stop was requested.

Returns:

True if stop was requested

Example (Lua):
if Stop.requested() then

return {success = true, message = “Procedure stopped”}

end

reset() None

Reset stop state (mainly for testing).

success() bool

Check if the stop was due to successful completion.

Returns:

True if stopped successfully, False if stopped due to error

Example (Lua):
if Stop.requested() and Stop.success() then

return {success = true}

else

return {success = false}

end

class plexus.cli.procedure.lua_dsl.primitives.SystemPrimitive(chat_recorder)

Bases: object

Manages system-level operations and monitoring.

Enables procedures and external systems to send alerts through the unified message infrastructure.

Initialize System primitive.

Args:

chat_recorder: ProcedureChatRecorder for recording alerts

__init__(chat_recorder)

Initialize System primitive.

Args:

chat_recorder: ProcedureChatRecorder for recording alerts

alert(options: Dict[str, Any] | None = None) None

Send system alert (NON-BLOCKING).

Args:
options: Dict with:
  • message: str - Alert message

  • level: str - info, warning, error, critical (default: info)

  • source: str - Alert source identifier

  • context: Dict - Additional context

Example (Lua):
System.alert({

message = “Memory threshold exceeded”, level = “warning”, source = “resource_monitor”, context = {memory_mb = 3500, threshold_mb = 3000}

})

async flush_recordings() None

Flush all queued messages to chat recorder.

This is called by the runtime after workflow execution to record all System.alert() messages to the chat session.

class plexus.cli.procedure.lua_dsl.primitives.ToolPrimitive

Bases: object

Tracks tool calls and provides access to results.

Maintains a history of tool calls and their results, allowing Lua code to check what tools were used and access their outputs.

Initialize tool tracking.

__init__()

Initialize tool tracking.

called(tool_name: str) bool

Check if a tool was called at least once.

Args:

tool_name: Name of the tool to check

Returns:

True if the tool was called

Example (Lua):
if Tool.called(“done”) then

Log.info(“Done tool was called”)

end

get_all_calls() List[ToolCall]

Get all tool calls (for debugging/logging).

Returns:

List of all ToolCall objects

get_call_count(tool_name: str | None = None) int

Get the number of times a tool was called.

Args:

tool_name: Name of tool (or None for total count)

Returns:

Number of calls

last_call(tool_name: str) Dict[str, Any] | None

Get full information about the last call to a tool.

Args:

tool_name: Name of the tool

Returns:

Dictionary with ‘name’, ‘args’, ‘result’ or None if never called

Example (Lua):

local call = Tool.last_call(“search”) if call then

Log.info(“Search was called with: “ .. call.args.query) Log.info(“Result: “ .. call.result)

end

last_result(tool_name: str) Any

Get the last result from a named tool.

Args:

tool_name: Name of the tool

Returns:

Last result from the tool, or None if never called

Example (Lua):

local result = Tool.last_result(“search”) if result then

Log.info(“Search found: “ .. result)

end

record_call(tool_name: str, args: Dict[str, Any], result: Any) None

Record a tool call (called by runtime after tool execution).

Args:

tool_name: Name of the tool args: Arguments passed to the tool result: Result returned by the tool

Note: This is called internally by the runtime, not from Lua

reset() None

Reset tool tracking (mainly for testing).

Submodules