plexus.cli.procedure.lua_dsl.execution_context module

Execution context abstraction for Procedure DSL.

Provides dual execution backend support:

- LocalExecutionContext: Database checkpoints with exit-and-resume
- LambdaDurableExecutionContext: AWS Lambda Durable Functions (future)

Same procedure code works in both contexts.

class plexus.cli.procedure.lua_dsl.execution_context.ExecutionContext

Bases: ABC

Protocol for execution backends.

Abstracts over local execution (DB checkpoints) and Lambda Durable Functions (native suspend/resume).

abstractmethod checkpoint_clear_after(name: str) -> None

Clear checkpoint and all subsequent ones. Used for testing.

abstractmethod checkpoint_clear_all() -> None

Clear all checkpoints. Used for testing.

abstractmethod checkpoint_exists(name: str) -> bool

Check if checkpoint exists.

abstractmethod checkpoint_get(name: str) -> Any | None

Get cached checkpoint value, or None if not found.

abstractmethod sleep(seconds: int) -> None

Sleep without consuming resources.

- Local context: Creates checkpoint and exits
- Lambda context: Uses Lambda wait (zero cost)

abstractmethod step_run(name: str, fn: Callable[[], Any]) -> Any

Execute fn with checkpointing. On replay, return stored result.

Args:

name: Unique checkpoint name
fn: Function to execute (must be deterministic)

Returns:

Result of fn() on first execution, cached result on replay
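The replay behaviour described for step_run can be illustrated with a minimal in-memory sketch. InMemoryStepRunner and fetch_score are hypothetical names used only for illustration; the real backends persist checkpoints elsewhere, and this is not the module's implementation.

```python
from typing import Any, Callable, Dict, List


class InMemoryStepRunner:
    """Hypothetical stand-in that mimics the step_run contract with a dict."""

    def __init__(self) -> None:
        self._checkpoints: Dict[str, Any] = {}

    def step_run(self, name: str, fn: Callable[[], Any]) -> Any:
        # Replay: a stored result short-circuits the call to fn.
        if name in self._checkpoints:
            return self._checkpoints[name]
        # First execution: run fn and remember its result under the name.
        result = fn()
        self._checkpoints[name] = result
        return result


calls: List[str] = []


def fetch_score() -> int:
    # Records each real invocation so the replay can be observed.
    calls.append("fetch_score")
    return 42


runner = InMemoryStepRunner()
first = runner.step_run("fetch_score", fetch_score)
replayed = runner.step_run("fetch_score", fetch_score)
```

Because the second call is served from the stored result, fn runs only once. This is why fn is expected to be deterministic: the cached value stands in for every later invocation under the same name.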

abstractmethod wait_for_human(request_type: str, message: str, timeout_seconds: int | None, default_value: Any, options: List[dict] | None, metadata: dict) -> HumanResponse

Suspend until human responds.

Args:

request_type: 'approval', 'input', or 'review'
message: Message to display to human
timeout_seconds: Timeout in seconds, None = wait forever
default_value: Value to return on timeout
options: For review requests: [{label, type}, …]
metadata: Additional context data

Returns:

HumanResponse with value and timestamp

Raises:

ProcedureWaitingForHuman: (Local context) Exits to wait for resume

class plexus.cli.procedure.lua_dsl.execution_context.GraphQLServiceAdapter(client)

Bases: object

Adapter that provides query() and mutate() interface for PlexusDashboardClient.

ExecutionContext expects a GraphQL service with query() and mutate() methods, but PlexusDashboardClient has a unified execute() method for both operations. This adapter bridges the gap.


__init__(client)

Args:

client: PlexusDashboardClient instance with synchronous execute() method

mutate(mutation: str, variables: Dict[str, Any] | None = None) -> Dict[str, Any]

Execute a GraphQL mutation.

query(query: str, variables: Dict[str, Any] | None = None) -> Dict[str, Any]

Execute a GraphQL query.
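The bridging idea behind the adapter can be sketched as follows. FakeDashboardClient and AdapterSketch are hypothetical stand-ins, and the execute(document, variables) signature is an assumption made for illustration; the real PlexusDashboardClient may differ.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeDashboardClient:
    """Hypothetical stand-in for PlexusDashboardClient; only execute() is modeled."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[Dict[str, Any]]]] = []

    def execute(self, document: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # Assumed signature: one entry point for queries and mutations alike.
        self.calls.append((document, variables))
        return {"ok": True}


class AdapterSketch:
    """Exposes query() and mutate() on top of a single execute() method."""

    def __init__(self, client: Any) -> None:
        self.client = client

    def query(self, query: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        return self.client.execute(query, variables)

    def mutate(self, mutation: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        return self.client.execute(mutation, variables)


client = FakeDashboardClient()
adapter = AdapterSketch(client)
query_result = adapter.query("query { ping }")
mutate_result = adapter.mutate("mutation { touch(id: $id) }", {"id": "p1"})
```

Both methods delegate to the same call, so the adapter adds no behaviour of its own; it only gives callers the two method names they expect.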

class plexus.cli.procedure.lua_dsl.execution_context.HumanResponse(value: Any, responded_at: str)

Bases: object

Response from a human interaction.

__init__(value: Any, responded_at: str) -> None

responded_at: str

value: Any

class plexus.cli.procedure.lua_dsl.execution_context.LocalExecutionContext(procedure_id: str, session_id: str, graphql_service: Any, chat_recorder: Any)

Bases: ExecutionContext

Local execution context using database checkpoints.

Checkpoints stored in Procedure.metadata JSON field:

{
    "checkpoints": {
        "step_name": {
            "result": <value>,
            "completed_at": "2024-12-04T10:00:00Z"
        }
    },
    "state": {…},
    "lua_state": {…}
}
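To make the metadata layout concrete, here is a small sketch that operates on a plain dict of that shape. The helper functions are hypothetical, and treating "subsequent" as "completed at or after the named checkpoint" is an assumption; the actual class may order checkpoints differently.

```python
from typing import Any, Dict


def checkpoint_put(metadata: Dict[str, Any], name: str, result: Any, completed_at: str) -> None:
    # Store the result under "checkpoints" using the documented entry shape.
    metadata.setdefault("checkpoints", {})[name] = {
        "result": result,
        "completed_at": completed_at,
    }


def checkpoint_get(metadata: Dict[str, Any], name: str) -> Any:
    # Return the cached result, or None when the checkpoint is absent.
    entry = metadata.get("checkpoints", {}).get(name)
    return entry["result"] if entry is not None else None


def checkpoint_clear_after(metadata: Dict[str, Any], name: str) -> None:
    # Assumption: timestamps share one ISO 8601 UTC format, so string
    # comparison matches chronological order.
    checkpoints = metadata.get("checkpoints", {})
    if name not in checkpoints:
        return
    cutoff = checkpoints[name]["completed_at"]
    stale = [key for key, entry in checkpoints.items() if entry["completed_at"] >= cutoff]
    for key in stale:
        del checkpoints[key]


metadata: Dict[str, Any] = {"checkpoints": {}, "state": {}, "lua_state": {}}
checkpoint_put(metadata, "load", [1, 2], "2024-12-04T10:00:00Z")
checkpoint_put(metadata, "score", 0.9, "2024-12-04T10:05:00Z")
checkpoint_put(metadata, "report", "done", "2024-12-04T10:10:00Z")
checkpoint_clear_after(metadata, "score")
remaining = sorted(metadata["checkpoints"])
```

After clearing from "score" onward, only the earlier "load" checkpoint remains, so a rerun would replay "load" and execute the later steps again.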


__init__(procedure_id: str, session_id: str, graphql_service: Any, chat_recorder: Any)

Initialize local execution context.

Args:

procedure_id: ID of the running procedure
session_id: Associated chat session ID
graphql_service: Service for GraphQL queries/mutations
chat_recorder: ChatRecorder for message operations

checkpoint_clear_after(name: str) -> None

Clear checkpoint and all subsequent ones.

checkpoint_clear_all() -> None

Clear all checkpoints.

checkpoint_exists(name: str) -> bool

Check if checkpoint exists.

checkpoint_get(name: str) -> Any | None

Get cached checkpoint value.

sleep(seconds: int) -> None

Sleep by creating a checkpoint and exiting.

Note: Not fully implemented. Exiting would require a scheduled resume mechanism; for now, the sleep is only checkpointed and execution continues.

step_run(name: str, fn: Callable[[], Any]) -> Any

Execute with checkpoint replay.

wait_for_human(request_type: str, message: str, timeout_seconds: int | None, default_value: Any, options: List[dict] | None, metadata: dict) -> HumanResponse

Wait for human response with exit-and-resume pattern.

Flow:

1. Check if procedure already has waitingOnMessageId (resuming)
2. If yes, check for response to that specific message
3. If response found, return it and continue
4. If no response, check timeout
5. If timed out, return default
6. If still waiting, raise ProcedureWaitingForHuman to exit
7. If no waitingOnMessageId, create new pending message (first time)
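The decision logic in the flow above can be sketched as a pure function. Decision, decide, and the action labels are hypothetical names chosen for illustration. The sketch assumes that the time of the original request is available and that a response of None means "no response yet"; it does not show how the real method looks up messages.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class Decision:
    # One of: "create_pending", "return_response", "return_default", "exit_and_wait"
    action: str
    value: Any = None


def decide(
    waiting_on_message_id: Optional[str],
    response: Any,
    requested_at: Optional[datetime],
    timeout_seconds: Optional[int],
    default_value: Any,
    now: datetime,
) -> Decision:
    if waiting_on_message_id is None:
        # Step 7: first time through, a new pending message is needed.
        return Decision("create_pending")
    if response is not None:
        # Steps 2-3: a response to the pending message was found.
        return Decision("return_response", response)
    if timeout_seconds is not None and requested_at is not None:
        # Steps 4-5: no response yet, so check the timeout.
        if now - requested_at >= timedelta(seconds=timeout_seconds):
            return Decision("return_default", default_value)
    # Step 6: still waiting; the real method raises ProcedureWaitingForHuman here.
    return Decision("exit_and_wait")


t0 = datetime(2024, 12, 4, 10, 0, 0, tzinfo=timezone.utc)
first_time = decide(None, None, None, 60, "no", t0)
answered = decide("msg-1", "yes", t0, 60, "no", t0 + timedelta(seconds=30))
timed_out = decide("msg-1", None, t0, 60, "no", t0 + timedelta(seconds=61))
still_waiting = decide("msg-1", None, t0, None, "no", t0 + timedelta(days=1))
```

With timeout_seconds set to None the function never returns the default, matching the "wait forever" behaviour described for that argument.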

exception plexus.cli.procedure.lua_dsl.execution_context.ProcedureWaitingForHuman(procedure_id: str, pending_message_id: str)

Bases: Exception

Raised to exit workflow when waiting for human response.

In LocalExecutionContext, this signals the runtime to:

1. Update Procedure.status to 'WAITING_FOR_HUMAN'
2. Save the pending message ID
3. Exit cleanly
4. Wait for resume trigger

__init__(procedure_id: str, pending_message_id: str)
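A runtime that honours this exception might look like the following sketch. ProcedureWaitingForHumanSketch, run_procedure, the dict-based procedure record, and the 'RUNNING' and 'COMPLETED' status values are all hypothetical; only the 'WAITING_FOR_HUMAN' status and the waitingOnMessageId field name come from the descriptions above.

```python
from typing import Any, Callable, Dict


class ProcedureWaitingForHumanSketch(Exception):
    """Hypothetical stand-in carrying the same two constructor arguments."""

    def __init__(self, procedure_id: str, pending_message_id: str) -> None:
        super().__init__(
            f"Procedure {procedure_id} is waiting on message {pending_message_id}"
        )
        self.procedure_id = procedure_id
        self.pending_message_id = pending_message_id


def run_procedure(workflow: Callable[[], Any], record: Dict[str, Any]) -> Any:
    try:
        result = workflow()
    except ProcedureWaitingForHumanSketch as exc:
        # Steps 1-2: mark the procedure as waiting and remember the message.
        record["status"] = "WAITING_FOR_HUMAN"
        record["waitingOnMessageId"] = exc.pending_message_id
        # Steps 3-4: exit cleanly; a later resume trigger reruns the workflow.
        return None
    record["status"] = "COMPLETED"  # assumed status name
    return result


def needs_approval() -> Any:
    # Simulates a workflow that reaches a human-approval step.
    raise ProcedureWaitingForHumanSketch("proc-1", "msg-9")


record: Dict[str, Any] = {"status": "RUNNING"}
outcome = run_procedure(needs_approval, record)
```

Using an exception for this keeps the workflow code linear: the wait point unwinds the whole call stack, and checkpoint replay restores progress when the procedure is resumed.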