plexus.cli.shared.task_progress_tracker module
- class plexus.cli.shared.task_progress_tracker.Stage(name: str, order: int, total_items: int | None = None, processed_items: int | None = None, status_message: str | None = None, start_time: float | None = None, end_time: float | None = None, status: str = 'PENDING')
Bases:
objectInternal representation of a task stage.
This class tracks the state of a single stage in a task’s execution. Progress bars in the UI are only shown for stages that have total_items set. Typically, only the main processing stages should have total_items set, while setup and finalizing stages should not show progress bars.
- __init__(name: str, order: int, total_items: int | None = None, processed_items: int | None = None, status_message: str | None = None, start_time: float | None = None, end_time: float | None = None, status: str = 'PENDING') → None
- complete()
Complete this stage.
- end_time: float | None = None
- name: str
- order: int
- processed_items: int | None = None
- start()
Start this stage.
- start_time: float | None = None
- status: str = 'PENDING'
- status_message: str | None = None
- total_items: int | None = None
- update_processed_count(current_count: int)
Update the processed items count with the current total count.
- Args:
current_count: The current total count of processed items
- class plexus.cli.shared.task_progress_tracker.StageConfig(order: int, total_items: int | None = None, status_message: str | None = None)
Bases:
objectConfiguration for a task stage.
- Args:
order: The order of this stage in the task sequence total_items: Optional number of items to process in this stage.
Only set this if you want the stage to show a progress bar in the UI. Typically only set for main processing stages, not for setup or finalizing stages.
status_message: Optional status message to display for this stage
- __init__(order: int, total_items: int | None = None, status_message: str | None = None) → None
- order: int
- status_message: str | None = None
- total_items: int | None = None
- class plexus.cli.shared.task_progress_tracker.TaskProgressTracker(stage_configs: Dict[str, StageConfig], task_object: Task | None = None, task_id: str | None = None, total_items: int = 0, target: str | None = None, command: str | None = None, description: str | None = None, dispatch_status: str | None = None, prevent_new_task: bool = True, metadata: Dict | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None)
Bases:
objectTracks progress of a multi-stage task with optional API integration.
This class manages the progress of a task through multiple stages, with proper UI integration for progress bars and status updates. Key concepts:
Stage Progress Bars: - Only stages with total_items set will show progress bars in the UI - Typically, only main processing stages should show progress - Setup and finalizing stages usually should not have total_items set
Stage Types: - Setup stages: Quick preparation steps, usually no progress bar needed - Processing stages: Main work stages that show progress bars - Finalizing stages: Cleanup/completion steps, usually no progress bar needed
- Example usage:
# Configure stages - only Processing shows progress stages = {
“Setup”: StageConfig(order=1, status_message=”Setting up…”), “Processing”: StageConfig(order=2, total_items=100), “Finalizing”: StageConfig(order=3, status_message=”Finishing…”)
}
# Create tracker tracker = TaskProgressTracker(total_items=100, stage_configs=stages)
# Update progress (only affects stages with total_items set) tracker.update(current_items=50)
Initialize progress tracker.
- Args:
stage_configs: Dict mapping stage names to their configs. task_object: Optional pre-fetched Task object to track. task_id: Optional API task ID (used if task_object is None). total_items: Total items (can be updated later). target: Target string for new task records command: Command string for new task records description: Description string for task status display dispatch_status: Dispatch status for new task records prevent_new_task: If True, don’t create new task records metadata: Optional metadata to store with the task account_id: Optional account ID to associate with the task client: Optional existing PlexusDashboardClient instance.
- __init__(stage_configs: Dict[str, StageConfig], task_object: Task | None = None, task_id: str | None = None, total_items: int = 0, target: str | None = None, command: str | None = None, description: str | None = None, dispatch_status: str | None = None, prevent_new_task: bool = True, metadata: Dict | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None)
Initialize progress tracker.
- Args:
stage_configs: Dict mapping stage names to their configs. task_object: Optional pre-fetched Task object to track. task_id: Optional API task ID (used if task_object is None). total_items: Total items (can be updated later). target: Target string for new task records command: Command string for new task records description: Description string for task status display dispatch_status: Dispatch status for new task records prevent_new_task: If True, don’t create new task records metadata: Optional metadata to store with the task account_id: Optional account ID to associate with the task client: Optional existing PlexusDashboardClient instance.
- advance_stage()
Advance to next stage and update API task if we have one.
- complete()
Complete tracking and update API task if we have one.
- complete_with_message(message: str)
Complete tracking with a custom completion message.
- Args:
message: A custom message to display as completion status
- classmethod create_for_evaluation(scorecard_name: str, total_items: int, task_id: str | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None, **kwargs) → TaskProgressTracker
Create a TaskProgressTracker configured for evaluation operations.
- Args:
scorecard_name: Name of the scorecard being evaluated total_items: Total number of items to evaluate task_id: Optional existing task ID account_id: Account ID for the task client: Optional API client **kwargs: Additional arguments passed to TaskProgressTracker
- Returns:
TaskProgressTracker: Configured tracker for evaluation
- classmethod create_for_operation_type(operation_type: str, total_items: int = 1, target: str | None = None, command: str | None = None, description: str | None = None, task_id: str | None = None, account_id: str | None = None, metadata: Dict | None = None, client: PlexusDashboardClient | None = None, **kwargs) → TaskProgressTracker
Create a TaskProgressTracker configured for a specific operation type.
- Args:
operation_type: Type of operation (‘evaluation’, ‘prediction’, etc.) total_items: Total number of items to process target: Target string for the task command: Command string for the task description: Description for the task task_id: Optional existing task ID account_id: Account ID for the task metadata: Optional metadata for the task client: Optional API client **kwargs: Additional arguments passed to TaskProgressTracker
- Returns:
TaskProgressTracker: Configured tracker for the operation type
- classmethod create_for_prediction(scorecard_identifier: str, score_names: list, total_items: int = 1, task_id: str | None = None, account_id: str | None = None, client: PlexusDashboardClient | None = None, **kwargs) → TaskProgressTracker
Create a TaskProgressTracker configured for prediction operations.
- Args:
scorecard_identifier: Identifier of the scorecard score_names: List of score names being predicted total_items: Total number of predictions to run task_id: Optional existing task ID account_id: Account ID for the task client: Optional API client **kwargs: Additional arguments passed to TaskProgressTracker
- Returns:
TaskProgressTracker: Configured tracker for prediction
- property elapsed_time: float
Return elapsed time since task started, regardless of current stage.
- property estimated_completion_time: datetime | None
- property estimated_time_remaining: float | None
- fail(error_message: str)
Mark the task as failed and stop progress tracking.
- fail_current_stage(error_message: str, error_details: Dict | None = None, current_items: int | None = None)
Mark the current stage as failed but don’t fail the entire task.
This method only fails the current stage, allowing execution to potentially continue with other stages. To fail the entire task, use the fail() method instead.
- Args:
error_message: The error message to display error_details: Optional dictionary of additional error details current_items: The number of items processed when the failure occurred
- fail_processing(error_message: str, error_details: Dict | None = None, current_items: int | None = None)
Mark the task as failed with detailed error information.
This is a comprehensive failure handling method that: 1. Fails the current stage with the error message 2. Fails the entire task with the same error 3. Updates the API task if available
- Args:
error_message: The error message to display error_details: Optional dictionary of additional error details current_items: The number of items processed when the failure occurred
- get_all_stages() → Dict[str, Stage]
Get all stages in this tracker.
- Returns:
Dict[str, Stage]: Dictionary mapping stage names to Stage objects
- get_stage_by_name(stage_name: str) → Stage | None
Get a stage by its name.
- Args:
stage_name: Name of the stage to retrieve
- Returns:
Stage: The stage object if found, None otherwise
- property items_per_second: float | None
- property progress: int
- set_stage_message(stage_name: str, message: str) → None
Set the status message for a specific stage.
- Args:
stage_name: Name of the stage message: Status message to set
- set_total_items(total_items: int)
Set the total items count - this is our single source of truth for the total count.
- update(current_items: int, status: str | None = None)
Update progress with current count and optional status message.
- Args:
current_items: Current number of items processed. status: Optional status message.