plexus.cli.metrics.aggregation module

Core counting and aggregation logic for metrics.

This module provides the shared counting logic used by both the Lambda function and the CLI commands. It queries records in a time window and counts them into hierarchical time buckets (1/5/15/60 minutes).

The logic runs in O(n) time: each record is processed once and assigned to all relevant bucket sizes in the same pass.
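To make the single-pass idea concrete, here is a minimal sketch of how one timestamp can be mapped to its 1-, 5-, 15-, and 60-minute buckets at once. It assumes buckets are aligned to UTC clock boundaries (for example, 15-minute buckets start at :00, :15, :30, and :45) and that naive timestamps are UTC. `bucket_start` and `bucket_starts` are hypothetical helpers, not part of this module.

```python
from datetime import datetime, timezone

# Bucket sizes in minutes, taken from the module description.
BUCKET_SIZES = (1, 5, 15, 60)


def bucket_start(ts: datetime, minutes: int) -> datetime:
    """Floor a timestamp to the start of its N-minute bucket (assumed UTC-aligned)."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive timestamps are UTC
    ts = ts.astimezone(timezone.utc)
    # Every bucket size divides 60, so flooring the minute field is enough.
    return ts.replace(minute=(ts.minute // minutes) * minutes, second=0, microsecond=0)


def bucket_starts(ts: datetime) -> dict[int, datetime]:
    """Map every bucket size to the bucket containing ts in a single step."""
    return {size: bucket_start(ts, size) for size in BUCKET_SIZES}


ts = datetime(2024, 1, 1, 12, 37, 45, tzinfo=timezone.utc)
for size, start in bucket_starts(ts).items():
    print(size, start.isoformat())
# 1 2024-01-01T12:37:00+00:00
# 5 2024-01-01T12:35:00+00:00
# 15 2024-01-01T12:30:00+00:00
# 60 2024-01-01T12:00:00+00:00
```

Because the number of bucket sizes is a fixed constant, assigning each of n records to all four sizes is still O(n) overall.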

class plexus.cli.metrics.aggregation.BucketCounter(account_id: str, record_type: str)

Bases: object

Efficiently counts records into multiple hierarchical time buckets.

For each record, determines which 1min, 5min, 15min, and 60min buckets it belongs to and increments the appropriate counters.

__init__(account_id: str, record_type: str)

Initialize bucket counter.

Args:

account_id: Account ID for these metrics
record_type: Type of records ('items', 'scoreResults', 'tasks', 'evaluations')

add_record(timestamp: datetime) -> None

Add a record to all relevant buckets.

Args:

timestamp: When the record was created/updated

get_bucket_counts() -> List[Dict[str, Any]]

Get all bucket counts as a list ready for ORM updates.

Returns:

List of dicts, each describing one bucket and its record count
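The documented `add_record` / `get_bucket_counts` pair could be approximated as below. `MiniBucketCounter` is a hypothetical stand-in that only mirrors the documented method names; the dictionary keys it emits are illustrative, since the real ORM field names are not documented here. Bucket alignment to UTC clock boundaries is also an assumption.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Any, Dict, List

BUCKET_SIZES = (1, 5, 15, 60)  # minutes, per the module description


class MiniBucketCounter:
    """Hypothetical stand-in mirroring BucketCounter's documented interface."""

    def __init__(self, account_id: str, record_type: str) -> None:
        self.account_id = account_id
        self.record_type = record_type
        # (bucket size in minutes, bucket start) -> number of records
        self._counts: Counter = Counter()

    def add_record(self, timestamp: datetime) -> None:
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        ts = timestamp.astimezone(timezone.utc)
        # One record increments exactly one counter per bucket size.
        for size in BUCKET_SIZES:
            start = ts.replace(minute=(ts.minute // size) * size, second=0, microsecond=0)
            self._counts[(size, start)] += 1

    def get_bucket_counts(self) -> List[Dict[str, Any]]:
        # Illustrative keys only; not the real ORM schema.
        return [
            {
                "account_id": self.account_id,
                "record_type": self.record_type,
                "bucket_minutes": size,
                "bucket_start": start.isoformat(),
                "count": count,
            }
            for (size, start), count in sorted(self._counts.items())
        ]


counter = MiniBucketCounter("acct-1", "items")
counter.add_record(datetime(2024, 1, 1, 12, 37, tzinfo=timezone.utc))
counter.add_record(datetime(2024, 1, 1, 12, 41, tzinfo=timezone.utc))
for row in counter.get_bucket_counts():
    print(row["bucket_minutes"], row["bucket_start"], row["count"])
```

Two records four minutes apart land in separate 1- and 5-minute buckets but share the 15- and 60-minute buckets, so the sketch returns six rows.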

plexus.cli.metrics.aggregation.count_records_efficiently(records: List[Dict[str, Any]], account_id: str, record_type: str, verbose: bool = False, filter_field: str | None = None, filter_value: str | None = None) -> List[Dict[str, Any]]

Efficiently count records into all relevant buckets.

This is an O(n) operation that processes each record once and assigns it to all relevant bucket sizes (1min, 5min, 15min, 60min).

Args:

records: List of records with 'createdAt' or 'updatedAt' timestamps
account_id: Account ID
record_type: Type of records
verbose: If True, print detailed counting statistics
filter_field: Optional field name to filter by (e.g., 'type')
filter_value: Optional value to filter for (e.g., 'prediction')

Returns:

List of bucket counts ready for ORM updates
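A self-contained approximation of this counting pass, including the optional field filter, might look like the following. `count_records` and `_to_utc` are hypothetical; preferring `createdAt` over `updatedAt`, skipping records with no timestamp, and the `(bucket_minutes, bucket_start)` key shape are all assumptions rather than documented behavior.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional

BUCKET_SIZES = (1, 5, 15, 60)  # minutes


def _to_utc(raw: str) -> datetime:
    # Accept a trailing "Z" on Python < 3.11; treat offset-less strings as UTC.
    dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def count_records(
    records: Iterable[Dict[str, Any]],
    filter_field: Optional[str] = None,
    filter_value: Optional[str] = None,
) -> Counter:
    counts: Counter = Counter()
    for record in records:  # each record is visited exactly once: O(n)
        if filter_field is not None and record.get(filter_field) != filter_value:
            continue  # filtered out before any bucket is touched
        raw = record.get("createdAt") or record.get("updatedAt")
        if not raw:
            continue  # assumption: records without a timestamp are skipped
        ts = _to_utc(raw)
        for size in BUCKET_SIZES:  # constant four buckets per record
            start = ts.replace(minute=(ts.minute // size) * size, second=0, microsecond=0)
            counts[(size, start)] += 1
    return counts


records = [
    {"createdAt": "2024-01-01T12:37:10Z", "type": "prediction"},
    {"updatedAt": "2024-01-01T12:52:00Z", "type": "prediction"},
    {"createdAt": "2024-01-01T12:38:00Z", "type": "evaluation"},
]
hourly = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(count_records(records)[(60, hourly)])                        # 3
print(count_records(records, "type", "prediction")[(60, hourly)])  # 2
```

Filtering inside the same loop keeps the whole operation to one pass over the input, which matches the O(n) claim above.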

plexus.cli.metrics.aggregation.parse_iso_datetime(timestamp_str: str) -> datetime

Parse ISO 8601 datetime string.

Args:

timestamp_str: ISO datetime string (e.g., "2024-01-01T12:00:00Z")

Returns:

Datetime object with UTC timezone
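One way to meet this contract with the standard library is sketched below. `parse_iso_utc` is a hypothetical name, and treating strings without an offset as UTC is an assumption; how the real function handles such inputs is not documented here. The "Z" rewrite matters because `datetime.fromisoformat` only accepts a trailing "Z" from Python 3.11 onward.

```python
from datetime import datetime, timezone


def parse_iso_utc(timestamp_str: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware UTC datetime."""
    # Older Pythons reject a trailing "Z", so spell it as an explicit offset.
    if timestamp_str.endswith("Z"):
        timestamp_str = timestamp_str[:-1] + "+00:00"
    dt = datetime.fromisoformat(timestamp_str)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive strings are UTC
    # Convert any other offset to UTC so all buckets share one clock.
    return dt.astimezone(timezone.utc)


print(parse_iso_utc("2024-01-01T12:00:00Z").isoformat())       # 2024-01-01T12:00:00+00:00
print(parse_iso_utc("2024-01-01T14:00:00+02:00").isoformat())  # 2024-01-01T12:00:00+00:00
```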

plexus.cli.metrics.aggregation.query_records_for_counting(client: PlexusDashboardClient, account_id: str, record_type: str, start_time: datetime, end_time: datetime, verbose: bool = False) -> List[Dict[str, Any]]

Query records from the API for counting.

Uses the GraphQL query appropriate to the record type and handles pagination. For the filtered types (predictionScoreResults, evaluationScoreResults), it queries the base scoreResults type and returns all records in the time range unfiltered; the type filter is applied later, during counting.

Args:

client: The API client
account_id: Account ID to filter by
record_type: Type of records ('items', 'scoreResults', 'predictionScoreResults', 'evaluationScoreResults', 'tasks', 'evaluations')
start_time: Start of time range
end_time: End of time range
verbose: If True, print query progress

Returns:

List of record dictionaries with timestamps
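GraphQL APIs often paginate with a cursor, where each response carries a `nextToken` for the following page (AWS AppSync uses this pattern). Whether this module's queries work that way is an assumption. The sketch below only shows the accumulate-until-no-token loop, with a hypothetical `fetch_page` callable standing in for the real query and a fake two-page source for demonstration.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical page fetcher: takes a token, returns (items, next_token).
PageFetcher = Callable[[Optional[str]], Tuple[List[Dict[str, Any]], Optional[str]]]


def fetch_all_pages(fetch_page: PageFetcher, verbose: bool = False) -> List[Dict[str, Any]]:
    records: List[Dict[str, Any]] = []
    next_token: Optional[str] = None
    page = 0
    while True:
        items, next_token = fetch_page(next_token)
        records.extend(items)
        page += 1
        if verbose:
            print(f"page {page}: {len(items)} records (total {len(records)})")
        if not next_token:  # no token means the last page was reached
            return records


# Fake two-page source standing in for the real GraphQL query.
FAKE_PAGES = {
    None: ([{"id": 1}, {"id": 2}], "t1"),
    "t1": ([{"id": 3}], None),
}


def fake_fetch(token: Optional[str]) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    return FAKE_PAGES[token]


print(len(fetch_all_pages(fake_fetch)))  # 3
```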