plexus.cli.procedure.lua_dsl.primitives package
Lua Primitives - Python implementations of Lua-callable operations.
These primitives provide high-level operations that hide LLM mechanics and enable declarative workflow orchestration.
- class plexus.cli.procedure.lua_dsl.primitives.AgentPrimitive(name: str, system_prompt: str, initial_message: str, llm, available_tools: List[Any], tool_primitive, stop_primitive, iterations_primitive, chat_recorder=None)
Bases:
objectExecutes LLM agent turns with tool calling.
Each agent has: - System prompt (instructions) - Initial message (kickoff) - Conversation history - Available tools - LLM instance
Initialize an agent primitive.
- Args:
name: Agent name (e.g., “worker”, “assistant”) system_prompt: System prompt for the agent initial_message: Initial user message to kickoff llm: LangChain LLM instance with tools bound available_tools: List of available tool objects tool_primitive: ToolPrimitive instance for recording calls stop_primitive: StopPrimitive instance for stop detection iterations_primitive: IterationsPrimitive for tracking turns chat_recorder: Optional ProcedureChatRecorder for logging conversations
- __init__(name: str, system_prompt: str, initial_message: str, llm, available_tools: List[Any], tool_primitive, stop_primitive, iterations_primitive, chat_recorder=None)
Initialize an agent primitive.
- Args:
name: Agent name (e.g., “worker”, “assistant”) system_prompt: System prompt for the agent initial_message: Initial user message to kickoff llm: LangChain LLM instance with tools bound available_tools: List of available tool objects tool_primitive: ToolPrimitive instance for recording calls stop_primitive: StopPrimitive instance for stop detection iterations_primitive: IterationsPrimitive for tracking turns chat_recorder: Optional ProcedureChatRecorder for logging conversations
- async flush_recordings()
Flush queued recordings to chat session (called by runtime after workflow).
- get_conversation() List[Any]
Get the current conversation history.
- reset()
Reset the agent (mainly for testing).
- turn(options: Dict[str, Any] | None = None) Dict[str, Any]
Execute one agent turn.
- Args:
- options: Optional dict with:
inject: Additional context message to inject for this turn
- Returns:
Dictionary with response information (for Lua access)
- Example (Lua):
local response = Worker.turn() Log.info(“Agent said: “ .. response.content)
- for i, call in ipairs(response.tool_calls) do
Log.info(“Called tool: “ .. call.name)
end
- class plexus.cli.procedure.lua_dsl.primitives.GraphNodePrimitive(client, procedure_id: str, account_id: str)
Bases:
objectManages graph node operations for procedure execution.
Graph nodes provide tree-based data storage for procedures, useful for tree search, hypothesis tracking, etc.
Initialize GraphNode primitive.
- Args:
client: PlexusDashboardClient instance procedure_id: Procedure ID for node creation account_id: Account ID for node creation
- __init__(client, procedure_id: str, account_id: str)
Initialize GraphNode primitive.
- Args:
client: PlexusDashboardClient instance procedure_id: Procedure ID for node creation account_id: Account ID for node creation
- create(content: str, metadata: Dict[str, Any] | None = None, parent_node_id: str | None = None)
Create a new graph node.
- Args:
content: Text content for the node metadata: Optional metadata dict parent_node_id: Optional parent node ID (None for root)
- Returns:
NodeWrapper with methods: children(), parent(), score(), metadata(), set_metadata()
- Example (Lua):
local node = GraphNode.create(“My limerick here”, {type = “poem”}) Log.info(“Created node: “ .. node.id)
– Node methods available: local children = node:children() local parent = node:parent() node:set_metadata(“status”, “reviewed”)
- current()
Get the current working node.
- Returns:
NodeWrapper or None if no current node set
- Example (Lua):
local node = GraphNode.current() if node then
Log.info(“Current node: “ .. node.id) node:set_metadata(“visited”, true) – Node methods available
end
- root()
Get the root node for this procedure (if exists).
- Returns:
NodeWrapper or None if no root node exists
- Example (Lua):
local root = GraphNode.root() if root then
Log.info(“Root node exists: “ .. root.id) local children = root:children() – Node methods available
end
- set_current(node_id: str) bool
Set the current working node (for internal use).
- Args:
node_id: Node ID to set as current
- Returns:
True if successful
- class plexus.cli.procedure.lua_dsl.primitives.HumanPrimitive(chat_recorder, execution_context, hitl_config: Dict[str, Any] | None = None)
Bases:
objectManages human-in-the-loop operations for procedures.
Enables procedures to: - Request approval from humans - Get input from humans - Send notifications - Escalate issues
All blocking methods support timeouts and defaults.
Initialize Human primitive.
- Args:
chat_recorder: ProcedureChatRecorder for recording messages execution_context: ExecutionContext for blocking HITL operations hitl_config: Optional HITL declarations from YAML
- __init__(chat_recorder, execution_context, hitl_config: Dict[str, Any] | None = None)
Initialize Human primitive.
- Args:
chat_recorder: ProcedureChatRecorder for recording messages execution_context: ExecutionContext for blocking HITL operations hitl_config: Optional HITL declarations from YAML
- approve(options: Dict[str, Any] | None = None) bool
Request yes/no approval from human (BLOCKING).
- Args:
- options: Dict with:
message: str - Message to show human
context: Dict - Additional context
timeout: int - Timeout in seconds (None = no timeout)
default: bool - Default if timeout (default: False)
config_key: str - Reference to hitl: declaration
- Returns:
bool - True if approved, False if rejected/timeout
- Example (Lua):
- local approved = Human.approve({
message = “Deploy to production?”, context = {environment = “prod”}, timeout = 3600, default = false
})
- if approved then
deploy()
end
- escalate(options: Dict[str, Any] | None = None) None
Escalate to human (BLOCKING).
Stops workflow execution until human resolves the issue. Unlike approve/input/review, escalate has NO timeout - it blocks indefinitely until a human manually resumes the procedure.
- Args:
- options: Dict with:
message: str - Escalation message
context: Dict - Error context
severity: str - Severity level (info/warning/error/critical)
config_key: str - Reference to hitl: declaration
- Returns:
None - Execution resumes when human resolves
- Example (Lua):
- if attempts > 3 then
- Human.escalate({
message = “Cannot resolve automatically”, context = {attempts = attempts, error = last_error}, severity = “error”
}) – Workflow continues here after human resolves
end
- async flush_recordings() None
Flush all queued messages to chat recorder.
This is called by the runtime after workflow execution to record all Human.notify() messages to the chat session.
- input(options: Dict[str, Any] | None = None) str | None
Request free-form input from human (BLOCKING).
- Args:
- options: Dict with:
message: str - Prompt for human
placeholder: str - Input placeholder
timeout: int - Timeout in seconds
default: str - Default if timeout
config_key: str - Reference to hitl: declaration
- Returns:
str or None - Human’s input, or None if timeout with no default
- Example (Lua):
- local topic = Human.input({
message = “What topic?”, placeholder = “Enter topic…”, timeout = 600
})
- if topic then
State.set(“topic”, topic)
end
- notify(options: Dict[str, Any] | None = None) None
Send notification to human (NON-BLOCKING).
- Args:
- options: Dict with:
message: str - Notification message (required)
content: str - Rich markdown content (optional, overrides message in UI)
details: List[Dict] - Collapsible sections with title and content
level: str - info, warning, error (default: info)
context: Dict - Additional context (deprecated, use details instead)
- Example (Lua):
– Simple notification Human.notify({
message = “Processing complete”
})
– Rich notification with details Human.notify({
message = “Phase 2 complete”, content = “Status: Phase 2 processing finishednnProcessed 150 items successfully”, details = {
{title = “Statistics”, content = “Success: 148n**Failed**: 2n**Duration**: 5 min”}, {title = “Next Steps”, content = “Starting phase 3 validation”}
}
})
- review(options: Dict[str, Any] | None = None) Dict[str, Any] | None
Request human review (BLOCKING).
- Args:
- options: Dict with:
message: str - Review prompt
artifact: Any - Thing to review
artifact_type: str - Type of artifact
options: List[str] - Available actions
timeout: int - Timeout in seconds
config_key: str - Reference to hitl: declaration
- Returns:
- Dict with:
decision: str - Selected option
edited_artifact: Any - Modified artifact (if edited)
feedback: str - Human feedback
- Example (Lua):
- local review = Human.review({
message = “Review this document”, artifact = document, artifact_type = “document”, options = {“approve”, “edit”, “reject”}
})
- if review.decision == “approve” then
publish(review.artifact)
end
- class plexus.cli.procedure.lua_dsl.primitives.IterationsPrimitive
Bases:
objectTracks iteration count for procedure execution.
Provides safety limits and iteration-based control flow.
Initialize iteration counter.
- __init__()
Initialize iteration counter.
- current() int
Get the current iteration number.
- Returns:
Current iteration count (0-indexed)
- Example (Lua):
local iter = Iterations.current() Log.info(“Iteration: “ .. iter)
- exceeded(max_iterations: int) bool
Check if current iteration has exceeded the maximum.
- Args:
max_iterations: Maximum allowed iterations
- Returns:
True if current iteration >= max_iterations
- Example (Lua):
- if Iterations.exceeded(100) then
return {success = false, reason = “Max iterations exceeded”}
end
- increment() int
Increment the iteration counter.
- Returns:
New iteration count
Note: This is called internally by the runtime, not from Lua
- reset() None
Reset iteration counter (mainly for testing).
- class plexus.cli.procedure.lua_dsl.primitives.StatePrimitive
Bases:
objectManages mutable state for procedure execution.
State is preserved across agent turns and can be used to track progress, accumulate results, and coordinate between agents.
Initialize state storage.
- __init__()
Initialize state storage.
- all() Dict[str, Any]
Get all state as a dictionary.
- Returns:
Complete state dictionary
- Example (Lua):
local state = State.all() for k, v in pairs(state) do
print(k, v)
end
- append(key: str, value: Any) None
Append a value to a list in state.
- Args:
key: State key (will be created as list if doesn’t exist) value: Value to append
- Example (Lua):
State.append(“nodes_created”, node_id)
- clear() None
Clear all state (mainly for testing).
- get(key: str, default: Any = None) Any
Get a value from state.
- Args:
key: State key to retrieve default: Default value if key not found
- Returns:
Stored value or default
- Example (Lua):
local count = State.get(“hypothesis_count”, 0)
- increment(key: str, amount: float = 1) float
Increment a numeric value in state.
- Args:
key: State key to increment amount: Amount to increment by (default 1)
- Returns:
New value after increment
- Example (Lua):
State.increment(“hypotheses_filed”) State.increment(“score”, 10)
- set(key: str, value: Any) None
Set a value in state.
- Args:
key: State key to set value: Value to store
- Example (Lua):
State.set(“current_phase”, “exploration”)
- class plexus.cli.procedure.lua_dsl.primitives.StopPrimitive
Bases:
objectManages procedure termination state.
Tracks when a stop was requested and the reason/success status.
Initialize stop state.
- __init__()
Initialize stop state.
- reason() str | None
Get the reason for stopping.
- Returns:
Stop reason string or None if not stopped
- Example (Lua):
- if Stop.requested() then
Log.info(“Stopped because: “ .. Stop.reason())
end
- request(reason: str, success: bool = True) None
Request a stop (called by tools or runtime).
- Args:
reason: Reason for stopping success: Whether this is a successful completion
Note: This is called internally, not from Lua
- requested() bool
Check if a stop was requested.
- Returns:
True if stop was requested
- Example (Lua):
- if Stop.requested() then
return {success = true, message = “Procedure stopped”}
end
- reset() None
Reset stop state (mainly for testing).
- success() bool
Check if the stop was due to successful completion.
- Returns:
True if stopped successfully, False if stopped due to error
- Example (Lua):
- if Stop.requested() and Stop.success() then
return {success = true}
- else
return {success = false}
end
- class plexus.cli.procedure.lua_dsl.primitives.SystemPrimitive(chat_recorder)
Bases:
objectManages system-level operations and monitoring.
Enables procedures and external systems to send alerts through the unified message infrastructure.
Initialize System primitive.
- Args:
chat_recorder: ProcedureChatRecorder for recording alerts
- __init__(chat_recorder)
Initialize System primitive.
- Args:
chat_recorder: ProcedureChatRecorder for recording alerts
- alert(options: Dict[str, Any] | None = None) None
Send system alert (NON-BLOCKING).
- Args:
- options: Dict with:
message: str - Alert message
level: str - info, warning, error, critical (default: info)
source: str - Alert source identifier
context: Dict - Additional context
- Example (Lua):
- System.alert({
message = “Memory threshold exceeded”, level = “warning”, source = “resource_monitor”, context = {memory_mb = 3500, threshold_mb = 3000}
})
- async flush_recordings() None
Flush all queued messages to chat recorder.
This is called by the runtime after workflow execution to record all System.alert() messages to the chat session.
- class plexus.cli.procedure.lua_dsl.primitives.ToolPrimitive
Bases:
objectTracks tool calls and provides access to results.
Maintains a history of tool calls and their results, allowing Lua code to check what tools were used and access their outputs.
Initialize tool tracking.
- __init__()
Initialize tool tracking.
- called(tool_name: str) bool
Check if a tool was called at least once.
- Args:
tool_name: Name of the tool to check
- Returns:
True if the tool was called
- Example (Lua):
- if Tool.called(“done”) then
Log.info(“Done tool was called”)
end
- get_all_calls() List[ToolCall]
Get all tool calls (for debugging/logging).
- Returns:
List of all ToolCall objects
- get_call_count(tool_name: str | None = None) int
Get the number of times a tool was called.
- Args:
tool_name: Name of tool (or None for total count)
- Returns:
Number of calls
- last_call(tool_name: str) Dict[str, Any] | None
Get full information about the last call to a tool.
- Args:
tool_name: Name of the tool
- Returns:
Dictionary with ‘name’, ‘args’, ‘result’ or None if never called
- Example (Lua):
local call = Tool.last_call(“search”) if call then
Log.info(“Search was called with: “ .. call.args.query) Log.info(“Result: “ .. call.result)
end
- last_result(tool_name: str) Any
Get the last result from a named tool.
- Args:
tool_name: Name of the tool
- Returns:
Last result from the tool, or None if never called
- Example (Lua):
local result = Tool.last_result(“search”) if result then
Log.info(“Search found: “ .. result)
end
- record_call(tool_name: str, args: Dict[str, Any], result: Any) None
Record a tool call (called by runtime after tool execution).
- Args:
tool_name: Name of the tool args: Arguments passed to the tool result: Result returned by the tool
Note: This is called internally by the runtime, not from Lua
- reset() None
Reset tool tracking (mainly for testing).
Submodules
- plexus.cli.procedure.lua_dsl.primitives.agent module
- plexus.cli.procedure.lua_dsl.primitives.control module
- plexus.cli.procedure.lua_dsl.primitives.file module
- plexus.cli.procedure.lua_dsl.primitives.graph module
- plexus.cli.procedure.lua_dsl.primitives.human module
- plexus.cli.procedure.lua_dsl.primitives.json module
- plexus.cli.procedure.lua_dsl.primitives.log module
- plexus.cli.procedure.lua_dsl.primitives.procedure module
ProcedurePrimitiveProcedurePrimitive.__init__()ProcedurePrimitive.all_complete()ProcedurePrimitive.cancel()ProcedurePrimitive.inject()ProcedurePrimitive.is_complete()ProcedurePrimitive.run()ProcedurePrimitive.spawn()ProcedurePrimitive.status()ProcedurePrimitive.wait()ProcedurePrimitive.wait_all()ProcedurePrimitive.wait_any()
- plexus.cli.procedure.lua_dsl.primitives.retry module
- plexus.cli.procedure.lua_dsl.primitives.session module
- plexus.cli.procedure.lua_dsl.primitives.stage module
- plexus.cli.procedure.lua_dsl.primitives.state module
- plexus.cli.procedure.lua_dsl.primitives.step module
- plexus.cli.procedure.lua_dsl.primitives.system module
- plexus.cli.procedure.lua_dsl.primitives.tool module