plexus.cli.procedure.tactus_adapters package

Plexus adapters for Tactus protocols.

These adapters implement Tactus protocols to integrate the standalone Tactus library with Plexus’s GraphQL-based infrastructure.

class plexus.cli.procedure.tactus_adapters.PlexusChatAdapter(chat_recorder)

Bases: object

Implements Tactus ChatRecorder protocol by wrapping ProcedureChatRecorder.

This is a thin adapter that converts Pydantic ChatMessage models to the kwargs format expected by ProcedureChatRecorder.

Initialize Plexus chat adapter.

Args:

chat_recorder: ProcedureChatRecorder instance
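
The conversion described above can be sketched as follows. This is a minimal illustration, not the actual implementation: `FakeChatMessage`, `FakeRecorder`, and `SketchChatAdapter` are hypothetical stand-ins, a dataclass replaces the Pydantic model so the example needs only the standard library, and the field names are assumptions.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeChatMessage:
    """Hypothetical stand-in for the Pydantic ChatMessage model."""
    session_id: str
    role: str
    content: str
    metadata: Optional[Dict[str, Any]] = None


class FakeRecorder:
    """Hypothetical stand-in for ProcedureChatRecorder; accepts plain kwargs."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    async def record_message(self, **kwargs: Any) -> str:
        self.calls.append(kwargs)
        return f"msg-{len(self.calls)}"


class SketchChatAdapter:
    """Thin adapter: unpack a message model into recorder kwargs."""

    def __init__(self, chat_recorder: FakeRecorder) -> None:
        self.chat_recorder = chat_recorder

    async def record_message(self, message: FakeChatMessage) -> str:
        # Drop unset fields so the recorder only sees explicit values.
        kwargs = {k: v for k, v in asdict(message).items() if v is not None}
        return await self.chat_recorder.record_message(**kwargs)


recorder = FakeRecorder()
adapter = SketchChatAdapter(recorder)
message_id = asyncio.run(
    adapter.record_message(FakeChatMessage("s-1", "ASSISTANT", "Hello"))
)
```

The adapter holds no state of its own; everything it receives is forwarded to the wrapped recorder.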

__init__(chat_recorder)

async end_session(session_id: str, status: str = 'COMPLETED') → None

End the chat session.

Args:

session_id: Session ID to end
status: Final status (COMPLETED, FAILED, CANCELLED)

async get_session_history(session_id: str) → list[ChatMessage]

Get the message history for a session.

Args:

session_id: Session ID

Returns:

List of ChatMessage objects

async record_message(message: ChatMessage) → str

Record a message in the chat session.

Args:

message: ChatMessage to record

Returns:

Message ID

async start_session(context: Dict[str, Any] | None = None) → str

Start a new chat session.

Note: This signature matches how TactusRuntime actually calls it (runtime.py:214), which only passes context. The Tactus ChatRecorder protocol definition incorrectly specifies procedure_id as the first parameter, but this is not how it’s used.

Args:

context: Optional context data

Returns:

Session ID
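
As a rough illustration of the session lifecycle these four methods imply, the sketch below runs start, record, history, and end against a hypothetical in-memory object. `InMemoryChatRecorder` is not part of Plexus or Tactus; plain dicts stand in for ChatMessage, and the `"ACTIVE"` status is an assumption. Note that `start_session` receives only `context`, matching the call pattern described in the note above.

```python
import asyncio
import uuid
from typing import Any, Dict, List, Optional


class InMemoryChatRecorder:
    """Hypothetical in-memory object exposing the four documented methods."""

    def __init__(self) -> None:
        self.sessions: Dict[str, Dict[str, Any]] = {}

    async def start_session(self, context: Optional[Dict[str, Any]] = None) -> str:
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {
            "context": context or {},
            "messages": [],
            "status": "ACTIVE",  # assumed initial status
        }
        return session_id

    async def record_message(self, message: Dict[str, Any]) -> str:
        messages = self.sessions[message["session_id"]]["messages"]
        messages.append(message)
        return f"{message['session_id']}:{len(messages)}"

    async def get_session_history(self, session_id: str) -> List[Dict[str, Any]]:
        return list(self.sessions[session_id]["messages"])

    async def end_session(self, session_id: str, status: str = "COMPLETED") -> None:
        self.sessions[session_id]["status"] = status


async def run_lifecycle() -> Dict[str, Any]:
    recorder = InMemoryChatRecorder()
    # Only context is passed; no procedure_id argument.
    session_id = await recorder.start_session({"procedure": "demo"})
    await recorder.record_message(
        {"session_id": session_id, "role": "USER", "content": "hi"}
    )
    history = await recorder.get_session_history(session_id)
    await recorder.end_session(session_id)
    return {"history": history, "status": recorder.sessions[session_id]["status"]}


result = asyncio.run(run_lifecycle())
```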

class plexus.cli.procedure.tactus_adapters.PlexusHITLAdapter(client, procedure_id: str, chat_recorder=None, storage_adapter=None)

Bases: object

Tactus HITLHandler implementation backed by Plexus ChatMessage records.

__init__(client, procedure_id: str, chat_recorder=None, storage_adapter=None)

cancel_pending_request(procedure_id: str, message_id: str) → None

check_pending_response(procedure_id: str, message_id: str) → HITLResponse | None

request_interaction(procedure_id: str, request: HITLRequest, execution_context: Any = None) → HITLResponse

class plexus.cli.procedure.tactus_adapters.PlexusStorageAdapter(client, procedure_id: str)

Bases: object

Implements Tactus StorageBackend protocol using Plexus GraphQL.

Stores all procedure data (checkpoints, state, lua_state) in the Procedure.metadata JSON field via GraphQL mutations.

Initialize Plexus storage adapter.

Args:

client: PlexusDashboardClient instance
procedure_id: ID of the procedure

__init__(client, procedure_id: str)

checkpoint_clear_after(procedure_id: str, name: str) → None

Clear checkpoint and all subsequent ones.
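
One way to read "clear checkpoint and all subsequent ones" is sketched below. It assumes checkpoints are kept in a dict whose insertion order reflects save order, and that an unknown name leaves the checkpoints untouched. Both are assumptions; the function here is a standalone illustration, not the adapter's method.

```python
from typing import Any, Dict


def checkpoint_clear_after(checkpoints: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Return a copy without `name` and everything saved after it.

    Assumes insertion order reflects save order (guaranteed for Python dicts).
    """
    if name not in checkpoints:
        return dict(checkpoints)  # Nothing to clear (assumed behavior).
    kept: Dict[str, Any] = {}
    for key, value in checkpoints.items():
        if key == name:
            break  # Drop this checkpoint and all later ones.
        kept[key] = value
    return kept


saved = {"fetch": 1, "score": 2, "report": 3}
remaining = checkpoint_clear_after(saved, "score")
```

Clearing from `"score"` keeps only `"fetch"`, so a resumed procedure would re-run the scoring and reporting steps.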

checkpoint_clear_all(procedure_id: str) → None

Clear all checkpoints (but preserve state).

checkpoint_exists(procedure_id: str, name: str) → bool

Check if checkpoint exists.

checkpoint_get(procedure_id: str, name: str) → Any | None

Get checkpoint value.

checkpoint_save(procedure_id: str, name: str, result: Any) → None

Save a checkpoint.

get_state(procedure_id: str) → Dict[str, Any]

Get mutable state dictionary.

load_procedure_metadata(procedure_id: str) → ProcedureMetadata

Load procedure metadata from Plexus via GraphQL.

Args:

procedure_id: Procedure ID to load

Returns:

ProcedureMetadata with checkpoints, state, and lua_state

save_procedure_metadata(procedure_id: str, metadata: ProcedureMetadata) → None

Save procedure metadata to Plexus via GraphQL.

Args:

procedure_id: Procedure ID (for API compatibility)
metadata: ProcedureMetadata to save

set_state(procedure_id: str, state: Dict[str, Any]) → None

Set mutable state dictionary.

state_clear(procedure_id: str) → None

Clear all state.

state_delete(procedure_id: str, key: str) → None

Delete state key.

state_get(procedure_id: str, key: str, default: Any = None) → Any

Get state value.

state_set(procedure_id: str, key: str, value: Any) → None

Set state value.
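
Because all procedure data lives in a single JSON field, each state helper is plausibly a read-modify-write over that document. The sketch below illustrates the idea with a hypothetical `MetadataStore` that keeps the JSON in memory instead of issuing GraphQL mutations. The top-level keys mirror the three documented categories; the exact layout is an assumption.

```python
import json
from typing import Any, Dict


class MetadataStore:
    """Hypothetical in-memory stand-in for the Procedure.metadata JSON field."""

    def __init__(self) -> None:
        # Assumed layout: one top-level key per documented category.
        self.raw = json.dumps({"checkpoints": {}, "state": {}, "lua_state": {}})

    def _load(self) -> Dict[str, Any]:
        return json.loads(self.raw)

    def _save(self, metadata: Dict[str, Any]) -> None:
        self.raw = json.dumps(metadata)

    def state_set(self, key: str, value: Any) -> None:
        metadata = self._load()
        metadata["state"][key] = value
        self._save(metadata)

    def state_get(self, key: str, default: Any = None) -> Any:
        return self._load()["state"].get(key, default)

    def state_delete(self, key: str) -> None:
        metadata = self._load()
        metadata["state"].pop(key, None)  # Missing keys are ignored.
        self._save(metadata)

    def state_clear(self) -> None:
        metadata = self._load()
        metadata["state"] = {}  # Checkpoints and lua_state are left alone.
        self._save(metadata)


store = MetadataStore()
store.state_set("attempts", 2)
attempts = store.state_get("attempts")
store.state_delete("attempts")
after_delete = store.state_get("attempts", default=0)
```

Since values round-trip through JSON, anything stored this way must be JSON-serializable.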

update_procedure_status(procedure_id: str, status: str, waiting_on_message_id: str | None = None) → None

Update procedure status in Plexus.

Args:

procedure_id: Procedure ID
status: New status
waiting_on_message_id: Optional message ID if waiting for human

class plexus.cli.procedure.tactus_adapters.PlexusTraceSink(chat_recorder)

Bases: object

Persist Tactus trace records into Plexus ChatSession/ChatMessage models.
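
The two `STREAM_UPDATE_*` class constants are listed without explanation. One plausible reading, offered purely as an assumption, is that streamed text is persisted only when it has grown by a minimum number of characters or a maximum interval has passed since the last write. The sketch below shows that kind of throttle; `StreamThrottle` is hypothetical and may not match the real logic.

```python
import time
from typing import Optional

MAX_INTERVAL_SECONDS = 1.2  # mirrors STREAM_UPDATE_MAX_INTERVAL_SECONDS
MIN_CHARS_DELTA = 48        # mirrors STREAM_UPDATE_MIN_CHARS_DELTA


class StreamThrottle:
    """Hypothetical gate deciding when a streaming update is written."""

    def __init__(self) -> None:
        self.last_sent_at: Optional[float] = None
        self.last_sent_len = 0

    def should_send(self, text: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        grew_enough = len(text) - self.last_sent_len >= MIN_CHARS_DELTA
        waited_enough = (
            self.last_sent_at is None
            or now - self.last_sent_at >= MAX_INTERVAL_SECONDS
        )
        if grew_enough or waited_enough:
            self.last_sent_at = now
            self.last_sent_len = len(text)
            return True
        return False


throttle = StreamThrottle()
decisions = [
    throttle.should_send("a" * 10, now=0.0),  # first update always goes out
    throttle.should_send("a" * 20, now=0.5),  # +10 chars after 0.5 s: held back
    throttle.should_send("a" * 70, now=0.6),  # +60 chars since last send: sent
    throttle.should_send("a" * 75, now=2.0),  # 1.4 s since last send: sent
]
```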

STREAM_UPDATE_MAX_INTERVAL_SECONDS = 1.2

STREAM_UPDATE_MIN_CHARS_DELTA = 48

__init__(chat_recorder)

async end_session(status: str = 'COMPLETED') → None

async flush() → None

mark_runtime_execute_started(value: str | None = None) → None

async record(event: Any) → str | None

async start_session(context: Dict[str, Any] | None = None) → str | None

class plexus.cli.procedure.tactus_adapters.TerminalHITLAdapter(auto_approve: bool = False)

Bases: object

Tactus HITLHandler that resolves HITL requests via terminal prompts.

Used when running procedures from the CLI so that Human.approve() and Human.input() calls block at the terminal rather than suspending the procedure and waiting for a dashboard response.

Args:

auto_approve: If True, automatically approve all requests without prompting.

__init__(auto_approve: bool = False)

request_interaction(procedure_id: str, request: HITLRequest, execution_context: Any = None) → HITLResponse
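
A blocking terminal handler with an `auto_approve` switch might look roughly like the sketch below. `SketchRequest`, `SketchResponse`, and `SketchTerminalHandler` are hypothetical stand-ins for HITLRequest, HITLResponse, and the adapter. The `request_type` values, the yes/no parsing, and the injectable `prompt` argument are all assumptions made so the example can run without a terminal.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SketchRequest:
    """Hypothetical stand-in for HITLRequest."""
    request_type: str  # "approval" or "input" (assumed values)
    message: str


@dataclass
class SketchResponse:
    """Hypothetical stand-in for HITLResponse."""
    value: Any


class SketchTerminalHandler:
    def __init__(
        self,
        auto_approve: bool = False,
        prompt: Callable[[str], str] = input,
    ) -> None:
        self.auto_approve = auto_approve
        self.prompt = prompt  # injectable so the sketch runs without a TTY

    def request_interaction(
        self, procedure_id: str, request: SketchRequest
    ) -> SketchResponse:
        if self.auto_approve:
            # Approve without prompting, regardless of request type.
            return SketchResponse(value=True)
        answer = self.prompt(f"[{procedure_id}] {request.message} ")
        if request.request_type == "approval":
            return SketchResponse(value=answer.strip().lower() in {"y", "yes"})
        return SketchResponse(value=answer)


auto = SketchTerminalHandler(auto_approve=True)
approved = auto.request_interaction("proc-1", SketchRequest("approval", "Deploy?"))

scripted = SketchTerminalHandler(prompt=lambda _text: "no")
declined = scripted.request_interaction("proc-1", SketchRequest("approval", "Deploy?"))
```

The call blocks until the prompt returns, which matches the documented intent of resolving requests at the terminal instead of suspending the procedure.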

Submodules