plexus.cli.shared.task_progress_tracker module

class plexus.cli.shared.task_progress_tracker.Stage(name: str, order: int, total_items: int | None = None, processed_items: int | None = None, status_message: str | None = None, start_time: float | None = None, end_time: float | None = None, status: str = 'PENDING')

Bases: object

Internal representation of a task stage.

This class tracks the state of a single stage in a task’s execution. Progress bars in the UI are only shown for stages that have total_items set. Typically, only the main processing stages should have total_items set, while setup and finalizing stages should not show progress bars.

__init__(name: str, order: int, total_items: int | None = None, processed_items: int | None = None, status_message: str | None = None, start_time: float | None = None, end_time: float | None = None, status: str = 'PENDING') None
complete()

Complete this stage.

end_time: float | None = None
name: str
order: int
processed_items: int | None = None
start()

Start this stage.

start_time: float | None = None
status: str = 'PENDING'
status_message: str | None = None
total_items: int | None = None
update_processed_count(current_count: int)

Update the processed items count with the current total count.

Args:

current_count: The current total count of processed items
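The lifecycle of a single stage can be sketched with a small stand-in class. This is not the Plexus implementation: only the field names, the 'PENDING' default, and the method names come from this reference. The status strings 'RUNNING' and 'COMPLETED' and the use of time.time() are assumptions made for illustration.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class StageSketch:
    """Hypothetical stand-in that mirrors the documented Stage fields."""

    name: str
    order: int
    total_items: Optional[int] = None
    processed_items: Optional[int] = None
    status_message: Optional[str] = None
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    status: str = "PENDING"

    def start(self) -> None:
        # Assumed behaviour: record the start time and mark the stage active.
        self.start_time = time.time()
        self.status = "RUNNING"  # assumed status value

    def update_processed_count(self, current_count: int) -> None:
        # current_count is a running total, not an increment.
        self.processed_items = current_count

    def complete(self) -> None:
        # Assumed behaviour: record the end time and mark the stage done.
        self.end_time = time.time()
        self.status = "COMPLETED"  # assumed status value


stage = StageSketch(name="Processing", order=2, total_items=100)
stage.start()
stage.update_processed_count(40)
stage.update_processed_count(75)  # replaces 40; it does not add to it
stage.complete()
```

The point to take from the sketch is that update_processed_count() receives the total processed so far, so callers pass a cumulative count rather than a per-batch delta.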

class plexus.cli.shared.task_progress_tracker.StageConfig(order: int, total_items: int | None = None, status_message: str | None = None)

Bases: object

Configuration for a task stage.

Args:

order: The order of this stage in the task sequence.
total_items: Optional number of items to process in this stage. Only set this if you want the stage to show a progress bar in the UI. Typically only set for main processing stages, not for setup or finalizing stages.
status_message: Optional status message to display for this stage.

__init__(order: int, total_items: int | None = None, status_message: str | None = None) None
order: int
status_message: str | None = None
total_items: int | None = None
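The progress-bar rule (a stage shows a bar only when total_items is set) can be illustrated with a stand-in config class. StageConfigSketch and stages_with_progress_bars are hypothetical names introduced here; they mirror the documented fields but are not part of Plexus.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class StageConfigSketch:
    """Hypothetical stand-in that mirrors the documented StageConfig fields."""

    order: int
    total_items: Optional[int] = None
    status_message: Optional[str] = None


def stages_with_progress_bars(configs: Dict[str, StageConfigSketch]) -> List[str]:
    """Return the stage names, in execution order, that have total_items set."""
    ordered = sorted(configs.items(), key=lambda item: item[1].order)
    return [name for name, cfg in ordered if cfg.total_items is not None]


configs = {
    "Finalizing": StageConfigSketch(order=3, status_message="Finishing..."),
    "Setup": StageConfigSketch(order=1, status_message="Setting up..."),
    "Processing": StageConfigSketch(order=2, total_items=100),
}
bar_stages = stages_with_progress_bars(configs)
print(bar_stages)
```

Sorting by order rather than relying on dict insertion order keeps the result stable however the configuration dict was assembled.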
class plexus.cli.shared.task_progress_tracker.TaskProgressTracker(stage_configs: Dict[str, StageConfig], task_object: Task | None = None, task_id: str | None = None, total_items: int = 0, target: str | None = None, command: str | None = None, description: str | None = None, dispatch_status: str | None = None, prevent_new_task: bool = True, metadata: Dict | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None)

Bases: object

Tracks progress of a multi-stage task with optional API integration.

This class manages the progress of a task through multiple stages, with proper UI integration for progress bars and status updates. Key concepts:

  1. Stage Progress Bars:
     - Only stages with total_items set will show progress bars in the UI
     - Typically, only main processing stages should show progress
     - Setup and finalizing stages usually should not have total_items set

  2. Stage Types:
     - Setup stages: Quick preparation steps, usually no progress bar needed
     - Processing stages: Main work stages that show progress bars
     - Finalizing stages: Cleanup/completion steps, usually no progress bar needed

Example usage:

    from plexus.cli.shared.task_progress_tracker import StageConfig, TaskProgressTracker

    # Configure stages - only Processing shows progress
    stages = {
        "Setup": StageConfig(order=1, status_message="Setting up..."),
        "Processing": StageConfig(order=2, total_items=100),
        "Finalizing": StageConfig(order=3, status_message="Finishing..."),
    }

    # Create tracker
    tracker = TaskProgressTracker(total_items=100, stage_configs=stages)

    # Update progress (only affects stages with total_items set)
    tracker.update(current_items=50)

__init__(stage_configs: Dict[str, StageConfig], task_object: Task | None = None, task_id: str | None = None, total_items: int = 0, target: str | None = None, command: str | None = None, description: str | None = None, dispatch_status: str | None = None, prevent_new_task: bool = True, metadata: Dict | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None)

Initialize progress tracker.

Args:

stage_configs: Dict mapping stage names to their configs.
task_object: Optional pre-fetched Task object to track.
task_id: Optional API task ID (used if task_object is None).
total_items: Total items (can be updated later).
target: Target string for new task records.
command: Command string for new task records.
description: Description string for task status display.
dispatch_status: Dispatch status for new task records.
prevent_new_task: If True, don’t create new task records.
metadata: Optional metadata to store with the task.
account_id: Optional account ID to associate with the task.
client: Optional existing PlexusDashboardClient instance.

advance_stage()

Advance to next stage and update API task if we have one.

complete()

Complete tracking and update API task if we have one.
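A typical call sequence across three stages might look like the sketch below. It uses only method names and signatures listed in this reference (advance_stage(), update(), complete()), but runs against RecordingTracker, a hypothetical stub that records calls instead of contacting an API. The sketch also assumes the tracker begins in its first stage, which this reference does not state.

```python
from typing import Callable, Iterable, List, Optional, Tuple


class RecordingTracker:
    """Hypothetical stub: records calls rather than updating an API task."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[int]]] = []

    def advance_stage(self) -> None:
        self.calls.append(("advance_stage", None))

    def update(self, current_items: int, status: Optional[str] = None) -> None:
        self.calls.append(("update", current_items))

    def complete(self) -> None:
        self.calls.append(("complete", None))


def run_three_stage_task(
    tracker: RecordingTracker,
    items: Iterable[str],
    handle: Callable[[str], None],
) -> None:
    # Assumption: the tracker starts in the first (setup) stage.
    tracker.advance_stage()  # setup -> processing
    for count, item in enumerate(items, start=1):
        handle(item)
        tracker.update(current_items=count)  # running total of processed items
    tracker.advance_stage()  # processing -> finalizing
    tracker.complete()


tracker = RecordingTracker()
run_three_stage_task(tracker, ["a", "b", "c"], handle=lambda item: None)
```

Because run_three_stage_task only depends on the three method names, the same driver could in principle be handed a real tracker, provided the starting-stage assumption holds.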

complete_with_message(message: str)

Complete tracking with a custom completion message.

Args:

message: A custom message to display as completion status

classmethod create_for_evaluation(scorecard_name: str, total_items: int, task_id: str | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None, **kwargs) TaskProgressTracker

Create a TaskProgressTracker configured for evaluation operations.

Args:

scorecard_name: Name of the scorecard being evaluated.
total_items: Total number of items to evaluate.
task_id: Optional existing task ID.
account_id: Account ID for the task.
client: Optional API client.
**kwargs: Additional arguments passed to TaskProgressTracker.

Returns:

TaskProgressTracker: Configured tracker for evaluation

classmethod create_for_operation_type(operation_type: str, total_items: int = 1, target: str | None = None, command: str | None = None, description: str | None = None, task_id: str | None = None, account_id: str | None = None, metadata: Dict | None = None, client: PlexusDashboardClient | None = None, **kwargs) TaskProgressTracker

Create a TaskProgressTracker configured for a specific operation type.

Args:

operation_type: Type of operation (‘evaluation’, ‘prediction’, etc.).
total_items: Total number of items to process.
target: Target string for the task.
command: Command string for the task.
description: Description for the task.
task_id: Optional existing task ID.
account_id: Account ID for the task.
metadata: Optional metadata for the task.
client: Optional API client.
**kwargs: Additional arguments passed to TaskProgressTracker.

Returns:

TaskProgressTracker: Configured tracker for the operation type

classmethod create_for_prediction(scorecard_identifier: str, score_names: list, total_items: int = 1, task_id: str | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None, **kwargs) TaskProgressTracker

Create a TaskProgressTracker configured for prediction operations.

Args:

scorecard_identifier: Identifier of the scorecard.
score_names: List of score names being predicted.
total_items: Total number of predictions to run.
task_id: Optional existing task ID.
account_id: Account ID for the task.
client: Optional API client.
**kwargs: Additional arguments passed to TaskProgressTracker.

Returns:

TaskProgressTracker: Configured tracker for prediction

property current_stage: Stage | None
property elapsed_time: float

Return elapsed time since task started, regardless of current stage.

property estimated_completion_time: datetime | None
property estimated_time_remaining: float | None
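This reference does not say how the rate and estimate properties are computed. One common approach, shown here purely as an assumption, derives a throughput from the processed count and elapsed time and extrapolates linearly. The function names below echo the property names for readability; the parameters and the None-handling rules are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def items_per_second(processed: int, elapsed_seconds: float) -> Optional[float]:
    # No meaningful rate until some work has been done over some time.
    if processed <= 0 or elapsed_seconds <= 0:
        return None
    return processed / elapsed_seconds


def estimated_time_remaining(
    processed: int, total: int, elapsed_seconds: float
) -> Optional[float]:
    rate = items_per_second(processed, elapsed_seconds)
    if rate is None or total <= 0:
        return None
    # Clamp at zero in case processed overshoots total.
    return max(total - processed, 0) / rate


def estimated_completion_time(
    processed: int, total: int, elapsed_seconds: float, now: datetime
) -> Optional[datetime]:
    remaining = estimated_time_remaining(processed, total, elapsed_seconds)
    if remaining is None:
        return None
    return now + timedelta(seconds=remaining)


rate = items_per_second(50, 10.0)
remaining = estimated_time_remaining(50, 100, 10.0)
eta = estimated_completion_time(50, 100, 10.0, now=datetime(2024, 1, 1, 12, 0, 0))
```

Returning None when no rate can be computed is consistent with the Optional return types listed for these properties, though the actual conditions may differ.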
fail(error_message: str)

Mark the task as failed and stop progress tracking.

fail_current_stage(error_message: str, error_details: Dict | None = None, current_items: int | None = None)

Mark the current stage as failed but don’t fail the entire task.

This method only fails the current stage, allowing execution to potentially continue with other stages. To fail the entire task, use the fail() method instead.

Args:

error_message: The error message to display.
error_details: Optional dictionary of additional error details.
current_items: The number of items processed when the failure occurred.

fail_processing(error_message: str, error_details: Dict | None = None, current_items: int | None = None)

Mark the task as failed with detailed error information.

This is a comprehensive failure handling method that:

  1. Fails the current stage with the error message
  2. Fails the entire task with the same error
  3. Updates the API task if available

Args:

error_message: The error message to display.
error_details: Optional dictionary of additional error details.
current_items: The number of items processed when the failure occurred.
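A caller would usually invoke fail_processing() from an exception handler, passing the count reached before the error. The sketch below shows that pattern against FailureRecordingTracker, a hypothetical stub that follows the documented signature. The "exception_type" key in error_details is illustrative only; this reference does not specify which keys are expected.

```python
from typing import Dict, List, Optional


class FailureRecordingTracker:
    """Hypothetical stub with the documented update()/fail_processing() signatures."""

    def __init__(self) -> None:
        self.processed = 0
        self.failure: Optional[Dict] = None

    def update(self, current_items: int, status: Optional[str] = None) -> None:
        self.processed = current_items

    def fail_processing(
        self,
        error_message: str,
        error_details: Optional[Dict] = None,
        current_items: Optional[int] = None,
    ) -> None:
        self.failure = {
            "error_message": error_message,
            "error_details": error_details,
            "current_items": current_items,
        }


def process_all(tracker: FailureRecordingTracker, items: List[int]) -> bool:
    count = 0
    try:
        for item in items:
            if item < 0:
                raise ValueError(f"negative item: {item}")
            count += 1
            tracker.update(current_items=count)
    except ValueError as exc:
        # Report how far processing got before the failure.
        tracker.fail_processing(
            str(exc),
            error_details={"exception_type": type(exc).__name__},  # illustrative key
            current_items=count,
        )
        return False
    return True


tracker = FailureRecordingTracker()
ok = process_all(tracker, [1, 2, -3, 4])
```

When only the current stage should be marked as failed and later stages may still run, fail_current_stage() takes the same three arguments.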

get_all_stages() Dict[str, Stage]

Get all stages in this tracker.

Returns:

Dict[str, Stage]: Dictionary mapping stage names to Stage objects

get_stage_by_name(stage_name: str) Stage | None

Get a stage by its name.

Args:

stage_name: Name of the stage to retrieve

Returns:

Stage: The stage object if found, None otherwise

property items_per_second: float | None
property progress: int
set_stage_message(stage_name: str, message: str) None

Set the status message for a specific stage.

Args:

stage_name: Name of the stage.
message: Status message to set.

set_total_items(total_items: int)

Set the total item count. This value is the single source of truth for the total count.

property task: Task | None

Return the API task instance if one exists.

update(current_items: int, status: str | None = None)

Update progress with current count and optional status message.

Args:

current_items: Current number of items processed.
status: Optional status message.