plexus.cli.procedure.tactus_adapters package

Plexus adapters for Tactus protocols.

These adapters implement Tactus protocols to integrate the standalone Tactus library with Plexus’s GraphQL-based infrastructure.

class plexus.cli.procedure.tactus_adapters.PlexusChatAdapter(chat_recorder)

Bases: object

Implements Tactus ChatRecorder protocol by wrapping ProcedureChatRecorder.

This is a thin adapter that converts Pydantic ChatMessage models to the kwargs format expected by ProcedureChatRecorder.

Initialize Plexus chat adapter.

Args:

chat_recorder: ProcedureChatRecorder instance
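The thin-adapter idea described above can be sketched as follows. This is only an illustration: `ChatMessage` here is a dataclass standing in for the Tactus Pydantic model, its field names are assumptions, and `FakeRecorder` is a hypothetical substitute for ProcedureChatRecorder, whose real method signatures are not shown in this reference.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class ChatMessage:
    """Stand-in for the Tactus Pydantic model; field names are assumptions."""
    session_id: str
    role: str
    content: str


class FakeRecorder:
    """Hypothetical stand-in for ProcedureChatRecorder; it only logs kwargs."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    async def record_message(self, **kwargs: Any) -> str:
        self.calls.append(kwargs)
        return f"msg-{len(self.calls)}"


class ChatAdapterSketch:
    """Thin adapter: unpack a message model into keyword arguments."""

    def __init__(self, chat_recorder: FakeRecorder) -> None:
        self.chat_recorder = chat_recorder

    async def record_message(self, message: ChatMessage) -> str:
        # With a Pydantic v2 model this would be message.model_dump()
        # instead of dataclasses.asdict().
        return await self.chat_recorder.record_message(**asdict(message))


recorder = FakeRecorder()
adapter = ChatAdapterSketch(recorder)
message_id = asyncio.run(
    adapter.record_message(ChatMessage("s1", "assistant", "hello"))
)
```

The adapter holds no state of its own beyond the wrapped recorder, which is what keeps it thin.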

async end_session(session_id: str, status: str = 'COMPLETED') -> None

End the chat session.

Args:

session_id: Session ID to end

status: Final status (COMPLETED, FAILED, CANCELLED)

async get_session_history(session_id: str) -> list[ChatMessage]

Get the message history for a session.

Args:

session_id: Session ID

Returns:

List of ChatMessage objects

async record_message(message: ChatMessage) -> str

Record a message in the chat session.

Args:

message: ChatMessage to record

Returns:

Message ID

async start_session(context: Dict[str, Any] | None = None) -> str

Start a new chat session.

Note: This signature matches how TactusRuntime actually calls it (runtime.py:214), which only passes context. The Tactus ChatRecorder protocol definition incorrectly specifies procedure_id as the first parameter, but this is not how it’s used.

Args:

context: Optional context data

Returns:

Session ID
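The four session methods documented for this class suggest a start, record, read back, end lifecycle. A minimal sketch of that flow follows, using a hypothetical in-memory recorder that mirrors the documented method names. Messages are plain dicts here rather than ChatMessage models, and the `ACTIVE` status is a placeholder, not a documented Plexus value.

```python
import asyncio
import uuid
from typing import Any, Dict, List, Optional


class InMemoryChatRecorder:
    """Hypothetical in-memory stand-in exposing the documented method names."""

    def __init__(self) -> None:
        self.sessions: Dict[str, Dict[str, Any]] = {}

    async def start_session(self, context: Optional[Dict[str, Any]] = None) -> str:
        # Only context is passed, matching the signature documented above.
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "context": context or {},
            "messages": [],
            "status": "ACTIVE",  # placeholder status for this sketch
        }
        return session_id

    async def record_message(self, message: Dict[str, str]) -> str:
        messages = self.sessions[message["session_id"]]["messages"]
        messages.append(message)
        return f"msg-{len(messages)}"

    async def get_session_history(self, session_id: str) -> List[Dict[str, str]]:
        return list(self.sessions[session_id]["messages"])

    async def end_session(self, session_id: str, status: str = "COMPLETED") -> None:
        self.sessions[session_id]["status"] = status


async def run_session(recorder: InMemoryChatRecorder) -> List[Dict[str, str]]:
    session_id = await recorder.start_session({"source": "example"})
    await recorder.record_message({"session_id": session_id, "content": "hello"})
    history = await recorder.get_session_history(session_id)
    await recorder.end_session(session_id)  # defaults to COMPLETED
    return history


recorder = InMemoryChatRecorder()
history = asyncio.run(run_session(recorder))
```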

class plexus.cli.procedure.tactus_adapters.PlexusHITLAdapter(client, procedure_id: str, chat_recorder=None, storage_adapter=None)

Bases: object

Implements Tactus HITLHandler protocol using Plexus’s exit-and-resume pattern.

When a HITL interaction is requested:

1. Creates a message in the chat session
2. Updates procedure status to WAITING_FOR_HUMAN
3. Raises ProcedureWaitingForHuman to exit runtime
4. On resume, checks for response and returns it

Initialize Plexus HITL adapter.

Args:

client: PlexusDashboardClient instance

procedure_id: ID of the procedure

chat_recorder: Optional ProcedureChatRecorder for message creation

storage_adapter: Optional PlexusStorageAdapter for status updates

cancel_pending_request(procedure_id: str, message_id: str) -> None

Cancel a pending HITL request.

Args:

procedure_id: Procedure ID

message_id: Message ID to cancel

check_pending_response(procedure_id: str, message_id: str) -> HITLResponse | None

Check if there’s a response to a pending HITL request.

Args:

procedure_id: Procedure ID

message_id: Message ID to check

Returns:

HITLResponse if response exists, None otherwise

request_interaction(procedure_id: str, request: HITLRequest) -> HITLResponse

Request human interaction using Plexus’s exit-and-resume pattern.

This creates a HITL message in the chat session, updates the procedure status, and raises ProcedureWaitingForHuman to pause execution.

Args:

procedure_id: Procedure ID

request: HITLRequest with interaction details

Returns:

HITLResponse (only reached if response already exists on resume)

Raises:

ProcedureWaitingForHuman: To trigger exit-and-resume
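The exit-and-resume flow described for this class can be illustrated with a small in-memory sketch. Everything here is a stand-in: the exception is redefined locally, the `RUNNING` status and the `msg-1` identifier are invented for the example, and the real adapter works through the chat recorder and GraphQL rather than instance attributes.

```python
from typing import Dict, Optional


class ProcedureWaitingForHuman(Exception):
    """Local stand-in for the exception named above."""

    def __init__(self, message_id: str) -> None:
        super().__init__(f"waiting on {message_id}")
        self.message_id = message_id


class HITLSketch:
    """Hypothetical in-memory model of the exit-and-resume steps."""

    def __init__(self) -> None:
        self.status = "RUNNING"  # assumed status name
        self.pending_message_id: Optional[str] = None
        self.responses: Dict[str, str] = {}

    def request_interaction(self, prompt: str) -> str:
        if self.pending_message_id is not None:
            # On resume: return the response if one has arrived.
            response = self.responses.get(self.pending_message_id)
            if response is None:
                raise ProcedureWaitingForHuman(self.pending_message_id)
            self.pending_message_id = None
            self.status = "RUNNING"
            return response
        # First call: create the message, update status, exit the runtime.
        self.pending_message_id = "msg-1"  # illustrative message ID
        self.status = "WAITING_FOR_HUMAN"
        raise ProcedureWaitingForHuman(self.pending_message_id)


hitl = HITLSketch()
try:
    hitl.request_interaction("Approve deployment?")
except ProcedureWaitingForHuman as exc:
    waiting_on = exc.message_id
    status_while_waiting = hitl.status

# Later, a human responds and the procedure is resumed from the top.
hitl.responses[waiting_on] = "approved"
answer = hitl.request_interaction("Approve deployment?")
```

Raising an exception instead of blocking lets the process exit while the human takes as long as needed. The cost is that the procedure must be safe to re-run up to the point of the request.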

class plexus.cli.procedure.tactus_adapters.PlexusStorageAdapter(client, procedure_id: str)

Bases: object

Implements Tactus StorageBackend protocol using Plexus GraphQL.

Stores all procedure data (checkpoints, state, lua_state) in the Procedure.metadata JSON field via GraphQL mutations.
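As a rough picture of what a single JSON metadata field holding all three kinds of data might look like, consider the sketch below. Only the three top-level key names come from the description above. The nested shapes and the `fetch_data` and `attempts` names are assumptions made for illustration.

```python
import json
from typing import Any, Dict


def empty_metadata() -> Dict[str, Any]:
    """Assumed layout: one JSON object with the three documented sections."""
    return {"checkpoints": {}, "state": {}, "lua_state": {}}


metadata = empty_metadata()
metadata["checkpoints"]["fetch_data"] = {"result": [1, 2, 3]}  # illustrative
metadata["state"]["attempts"] = 2  # illustrative

# Everything must survive a JSON round trip to live in a JSON field.
serialized = json.dumps(metadata)
restored = json.loads(serialized)
```

One consequence of this design is that checkpoint results and state values need to be JSON-serializable.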

Initialize Plexus storage adapter.

Args:

client: PlexusDashboardClient instance

procedure_id: ID of the procedure

checkpoint_clear_after(procedure_id: str, name: str) -> None

Clear the named checkpoint and all subsequent ones.

checkpoint_clear_all(procedure_id: str) -> None

Clear all checkpoints (but preserve state).

checkpoint_exists(procedure_id: str, name: str) -> bool

Check whether a checkpoint exists.

checkpoint_get(procedure_id: str, name: str) -> Any | None

Get a checkpoint value.

checkpoint_save(procedure_id: str, name: str, result: Any) -> None

Save a checkpoint.
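The checkpoint methods above can be modelled with an ordered mapping. The sketch below is a hypothetical in-memory version that drops the `procedure_id` parameter for brevity and assumes that "subsequent" means "saved later", which this reference does not state explicitly.

```python
from typing import Any, Dict, Optional


class CheckpointStoreSketch:
    """Hypothetical in-memory model of the checkpoint methods."""

    def __init__(self) -> None:
        # Python dicts preserve insertion order, which stands in for
        # checkpoint order here (an assumption of this sketch).
        self._checkpoints: Dict[str, Any] = {}

    def checkpoint_save(self, name: str, result: Any) -> None:
        self._checkpoints[name] = result

    def checkpoint_exists(self, name: str) -> bool:
        return name in self._checkpoints

    def checkpoint_get(self, name: str) -> Optional[Any]:
        return self._checkpoints.get(name)

    def checkpoint_clear_after(self, name: str) -> None:
        names = list(self._checkpoints)
        if name not in names:
            return
        # Remove the named checkpoint and everything saved after it.
        for stale in names[names.index(name):]:
            del self._checkpoints[stale]

    def checkpoint_clear_all(self) -> None:
        self._checkpoints.clear()


store = CheckpointStoreSketch()
for step in ("load", "transform", "publish"):
    store.checkpoint_save(step, f"{step}-result")

store.checkpoint_clear_after("transform")
remaining = [s for s in ("load", "transform", "publish") if store.checkpoint_exists(s)]
```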

get_state(procedure_id: str) -> Dict[str, Any]

Get mutable state dictionary.

load_procedure_metadata(procedure_id: str) -> ProcedureMetadata

Load procedure metadata from Plexus via GraphQL.

Args:

procedure_id: Procedure ID to load

Returns:

ProcedureMetadata with checkpoints, state, and lua_state

save_procedure_metadata(metadata: ProcedureMetadata) -> None

Save procedure metadata to Plexus via GraphQL.

Args:

metadata: ProcedureMetadata to save

set_state(procedure_id: str, state: Dict[str, Any]) -> None

Set mutable state dictionary.

state_clear(procedure_id: str) -> None

Clear all state.

state_delete(procedure_id: str, key: str) -> None

Delete a state key.

state_get(procedure_id: str, key: str, default: Any = None) -> Any

Get a state value.

state_set(procedure_id: str, key: str, value: Any) -> None

Set a state value.
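The key-level state methods behave much like dictionary operations. A hypothetical in-memory sketch follows, again omitting `procedure_id`. Whether the real `state_delete` tolerates a missing key is not documented, so the silent no-op below is an assumption.

```python
from typing import Any, Dict


class StateSketch:
    """Hypothetical in-memory model of the state_* methods."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}

    def state_get(self, key: str, default: Any = None) -> Any:
        return self._state.get(key, default)

    def state_set(self, key: str, value: Any) -> None:
        self._state[key] = value

    def state_delete(self, key: str) -> None:
        # Assumption: deleting a missing key is a no-op.
        self._state.pop(key, None)

    def state_clear(self) -> None:
        self._state.clear()


state = StateSketch()
state.state_set("attempts", 3)
attempts = state.state_get("attempts")
state.state_delete("attempts")
fallback = state.state_get("attempts", default=0)
```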

update_procedure_status(procedure_id: str, status: str, waiting_on_message_id: str | None = None) -> None

Update procedure status in Plexus.

Args:

procedure_id: Procedure ID

status: New status

waiting_on_message_id: Optional message ID if waiting for human

Submodules