plexus.cli.procedure.lua_dsl.execution_context module
Execution context abstraction for Procedure DSL.
Provides dual execution backend support: - LocalExecutionContext: Database checkpoints with exit-and-resume - LambdaDurableExecutionContext: AWS Lambda Durable Functions (future)
Same procedure code works in both contexts.
- class plexus.cli.procedure.lua_dsl.execution_context.ExecutionContext
Bases:
ABCProtocol for execution backends.
Abstracts over local execution (DB checkpoints) and Lambda Durable Functions (native suspend/resume).
- abstractmethod checkpoint_clear_after(name: str) None
Clear checkpoint and all subsequent ones. Used for testing.
- abstractmethod checkpoint_clear_all() None
Clear all checkpoints. Used for testing.
- abstractmethod checkpoint_exists(name: str) bool
Check if checkpoint exists.
- abstractmethod checkpoint_get(name: str) Any | None
Get cached checkpoint value, or None if not found.
- abstractmethod sleep(seconds: int) None
Sleep without consuming resources.
Local context: Creates checkpoint and exits Lambda context: Uses Lambda wait (zero cost)
- abstractmethod step_run(name: str, fn: Callable[[], Any]) Any
Execute fn with checkpointing. On replay, return stored result.
- Args:
name: Unique checkpoint name fn: Function to execute (must be deterministic)
- Returns:
Result of fn() on first execution, cached result on replay
- abstractmethod wait_for_human(request_type: str, message: str, timeout_seconds: int | None, default_value: Any, options: List[dict] | None, metadata: dict) HumanResponse
Suspend until human responds.
- Args:
request_type: ‘approval’, ‘input’, or ‘review’ message: Message to display to human timeout_seconds: Timeout in seconds, None = wait forever default_value: Value to return on timeout options: For review requests: [{label, type}, …] metadata: Additional context data
- Returns:
HumanResponse with value and timestamp
- Raises:
ProcedureWaitingForHuman: (Local context) Exits to wait for resume
- class plexus.cli.procedure.lua_dsl.execution_context.GraphQLServiceAdapter(client)
Bases:
objectAdapter that provides query() and mutate() interface for PlexusDashboardClient.
ExecutionContext expects a GraphQL service with query() and mutate() methods, but PlexusDashboardClient has a unified execute() method for both operations. This adapter bridges the gap.
- Args:
client: PlexusDashboardClient instance with synchronous execute() method
- __init__(client)
- Args:
client: PlexusDashboardClient instance with synchronous execute() method
- mutate(mutation: str, variables: Dict[str, Any] | None = None) Dict[str, Any]
Execute a GraphQL mutation.
- query(query: str, variables: Dict[str, Any] | None = None) Dict[str, Any]
Execute a GraphQL query.
- class plexus.cli.procedure.lua_dsl.execution_context.HumanResponse(value: Any, responded_at: str)
Bases:
objectResponse from a human interaction.
- __init__(value: Any, responded_at: str) None
- responded_at: str
- value: Any
- class plexus.cli.procedure.lua_dsl.execution_context.LocalExecutionContext(procedure_id: str, session_id: str, graphql_service: Any, chat_recorder: Any)
Bases:
ExecutionContextLocal execution context using database checkpoints.
Checkpoints stored in Procedure.metadata JSON field: {
- “checkpoints”: {
- “step_name”: {
“result”: <value>, “completed_at”: “2024-12-04T10:00:00Z”
}
}, “state”: {…}, “lua_state”: {…}
}
Initialize local execution context.
- Args:
procedure_id: ID of the running procedure session_id: Associated chat session ID graphql_service: Service for GraphQL queries/mutations chat_recorder: ChatRecorder for message operations
- __init__(procedure_id: str, session_id: str, graphql_service: Any, chat_recorder: Any)
Initialize local execution context.
- Args:
procedure_id: ID of the running procedure session_id: Associated chat session ID graphql_service: Service for GraphQL queries/mutations chat_recorder: ChatRecorder for message operations
- checkpoint_clear_after(name: str) None
Clear checkpoint and all subsequent ones.
- checkpoint_clear_all() None
Clear all checkpoints.
- checkpoint_exists(name: str) bool
Check if checkpoint exists.
- checkpoint_get(name: str) Any | None
Get cached checkpoint value.
- sleep(seconds: int) None
Sleep by creating a checkpoint and exiting.
Note: Not fully implemented - would need a scheduled resume mechanism. For now, just checkpoint the sleep and continue.
- step_run(name: str, fn: Callable[[], Any]) Any
Execute with checkpoint replay.
- wait_for_human(request_type: str, message: str, timeout_seconds: int | None, default_value: Any, options: List[dict] | None, metadata: dict) HumanResponse
Wait for human response with exit-and-resume pattern.
Flow: 1. Check if procedure already has waitingOnMessageId (resuming) 2. If yes, check for response to that specific message 3. If response found, return it and continue 4. If no response, check timeout 5. If timed out, return default 6. If still waiting, raise ProcedureWaitingForHuman to exit 7. If no waitingOnMessageId, create new pending message (first time)
- exception plexus.cli.procedure.lua_dsl.execution_context.ProcedureWaitingForHuman(procedure_id: str, pending_message_id: str)
Bases:
ExceptionRaised to exit workflow when waiting for human response.
In LocalExecutionContext, this signals the runtime to: 1. Update Procedure.status to ‘WAITING_FOR_HUMAN’ 2. Save the pending message ID 3. Exit cleanly 4. Wait for resume trigger
- __init__(procedure_id: str, pending_message_id: str)