plexus.scores.LangGraphScore module

exception plexus.scores.LangGraphScore.BatchProcessingPause(thread_id, state, batch_job_id=None, message=None)

Bases: Exception

Exception raised when execution should pause for batch processing.

__init__(thread_id, state, batch_job_id=None, message=None)
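
A hedged sketch of catching the pause signal during prediction; `score` is assumed to be an initialized LangGraphScore, and `enqueue_for_batch` is a hypothetical helper, not part of this module:

```python
try:
    result = await score.predict(Score.Input(text="content to classify"))
except BatchProcessingPause as pause:
    # thread_id and state identify the checkpoint to resume from later;
    # enqueue_for_batch is hypothetical, not part of this module.
    enqueue_for_batch(pause.thread_id, pause.state, pause.batch_job_id)
```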
class plexus.scores.LangGraphScore.LangGraphScore(**parameters)

Bases: Score, LangChainUser

A Score implementation that uses LangGraph for orchestrating LLM-based classification.

LangGraphScore enables complex classification logic using a graph of LLM operations. It provides:

  • Declarative graph definition in YAML

  • State management and checkpointing

  • Cost tracking and optimization

  • Batch processing support

  • Integration with multiple LLM providers

The graph is defined in the scorecard YAML:

```yaml
scores:
  ComplexScore:
    class: LangGraphScore
    model_provider: AzureChatOpenAI
    model_name: gpt-4
    graph:
      - name: extract_context
        type: prompt
        template: Extract relevant context…
        output: context
      - name: classify
        type: prompt
        template: Based on the context…
        input: context
        output: classification
      - name: validate
        type: condition
        input: classification
        conditions:
          - value: Yes
            goto: explain_yes
          - value: No
            goto: explain_no
```

Common usage patterns:

1. Basic classification:

       score = LangGraphScore(**config)
       result = await score.predict(context, Score.Input(
           text="content to classify"
       ))

2. Batch processing:

       async for result in score.batch_predict(texts):
           # Process each result
           ...

3. Cost optimization:

       score.reset_token_usage()
       result = await score.predict(…)
       costs = score.get_accumulated_costs()
Key features:

  • Graph visualization for debugging

  • Token usage tracking

  • State persistence

  • Error handling and retries

  • Batch processing optimization

LangGraphScore is commonly used with:

  • Complex classification tasks requiring multiple steps

  • Tasks needing explanation or reasoning

  • High-volume processing requiring cost optimization

Initialize the LangGraphScore.

This method sets up the score parameters and initializes basic attributes. The language model initialization is deferred to the async setup.

Parameters:

parameters – Configuration parameters for the score and language model.

class GraphState(*, text: str, metadata: dict | None = None, results: dict | None = None, messages: List[Dict[str, Any]] | None = None, is_not_empty: bool | None = None, value: str | None = None, explanation: str | None = None, reasoning: str | None = None, chat_history: List[Any] = <factory>, completion: str | None = None, classification: str | None = None, confidence: float | None = None, retry_count: int | None = 0, at_llm_breakpoint: bool | None = False, good_call: str | None = None, good_call_explanation: str | None = None, non_qualifying_reason: str | None = None, non_qualifying_explanation: str | None = None, **extra_data: Any)

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

at_llm_breakpoint: bool | None
chat_history: List[Any]
classification: str | None
completion: str | None
confidence: float | None
explanation: str | None
good_call: str | None
good_call_explanation: str | None
is_not_empty: bool | None
messages: List[Dict[str, Any]] | None
metadata: dict | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'allow', 'validate_default': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

non_qualifying_explanation: str | None
non_qualifying_reason: str | None
reasoning: str | None
results: dict | None
retry_count: int | None
text: str
value: str | None
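
A minimal construction sketch; text is the only required field, and the field values shown are illustrative. Because model_config sets extra='allow', node-specific keys can ride along in the state:

```python
state = LangGraphScore.GraphState(
    text="transcript to classify",       # the only required field
    metadata={"source": "call-center"},  # illustrative metadata
)

# extra='allow' means undeclared keys are accepted as extra state fields:
state = LangGraphScore.GraphState(text="sample", custom_field="ok")
```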
class Parameters(*, scorecard_name: str | None = None, name: str | None = None, id: str | int | None = None, key: str | None = None, dependencies: List[dict] | None = None, data: dict | None = None, number_of_classes: int | None = None, label_score_name: str | None = None, label_field: str | None = None, model_provider: Literal['ChatOpenAI', 'AzureChatOpenAI', 'BedrockChat', 'ChatVertexAI'] = 'AzureChatOpenAI', model_name: str | None = None, model_region: str | None = None, temperature: float | None = 0, max_tokens: int | None = 500, graph: list[dict] | None = None, input: dict | None = None, output: dict | None = None, depends_on: List[str] | Dict[str, str | Dict[str, Any]] | None = None, single_line_messages: bool = False, checkpoint_db_path: str | None = './.plexus/checkpoints/langgraph.db', thread_id: str | None = None, postgres_url: str | None = None)

Bases: Parameters

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

checkpoint_db_path: str | None
depends_on: List[str] | Dict[str, str | Dict[str, Any]] | None
graph: list[dict] | None
input: dict | None
max_tokens: int | None
model_config: ClassVar[ConfigDict] = {'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_name: str | None
model_provider: Literal['ChatOpenAI', 'AzureChatOpenAI', 'BedrockChat', 'ChatVertexAI']
model_region: str | None
output: dict | None
postgres_url: str | None
single_line_messages: bool
temperature: float | None
thread_id: str | None
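
A hedged configuration sketch built from the signature above; the score name and graph contents are illustrative:

```python
params = LangGraphScore.Parameters(
    name="ComplexScore",
    model_provider="AzureChatOpenAI",  # one of the four supported literals
    model_name="gpt-4",
    temperature=0,                     # documented default
    max_tokens=500,                    # documented default
    graph=[{"name": "classify", "type": "prompt"}],  # illustrative node dicts
)
```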
class Result(*, parameters: Parameters, value: str | bool, metadata: dict = {}, error: str | None = None, explanation: str)

Bases: Result

Model output containing the validation result.

Parameters:

explanation – Detailed explanation of the validation result.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

explanation: str
model_config: ClassVar[ConfigDict] = {'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
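
A minimal sketch of building a Result by hand (predict() normally returns one); all field values here are illustrative:

```python
result = LangGraphScore.Result(
    parameters=params,      # a LangGraphScore.Parameters instance
    value="Yes",
    explanation="The transcript contains an explicit disclosure.",
    metadata={"node": "classify"},  # optional; defaults to {}
)
```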

__init__(**parameters)

Initialize the LangGraphScore.

This method sets up the score parameters and initializes basic attributes. The language model initialization is deferred to the async setup.

Parameters:

parameters – Configuration parameters for the score and language model.

static add_edges(workflow, node_instances, entry_point, graph_config)

Add edges between nodes in the workflow.

async async_setup()

Asynchronous setup for LangGraphScore.

async build_compiled_workflow()

Build the LangGraph workflow with optional persistence.

async cleanup()

Cleanup resources.
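
A hedged lifecycle sketch tying the async methods together; the ordering (construct, async_setup, build, predict, cleanup) is inferred from the method names and docstrings, not stated by the module:

```python
score = LangGraphScore(**config)                  # config is an illustrative parameter dict
await score.async_setup()                         # deferred language-model initialization
workflow = await score.build_compiled_workflow()  # compile the graph, with optional persistence
try:
    result = await score.predict(Score.Input(text="sample"))
finally:
    await score.cleanup()                         # release resources
```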

async classmethod create(**parameters)
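
create() carries no description here; a plausible reading, given that __init__ defers model initialization to the async setup, is that this classmethod is an async factory. A hedged equivalence sketch, not confirmed by the source:

```python
# Assumption: these two forms are roughly equivalent.
score = await LangGraphScore.create(**config)

score = LangGraphScore(**config)
await score.async_setup()
```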
create_combined_graphstate_class(instances: list) → Type[GraphState]

Dynamically create a combined GraphState class based on all nodes in the workflow.

static create_value_setter_node(output_mapping: dict) → LambdaType
generate_graph_visualization(output_path: str = './tmp/workflow_graph.png')

Generate and save a visual representation of the workflow graph using LangChain’s built-in visualization capabilities.

Parameters:

output_path – The file path where the graph image will be saved.
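
A small usage sketch using the documented default path:

```python
# Writes a PNG of the compiled workflow for debugging.
score.generate_graph_visualization(output_path="./tmp/workflow_graph.png")
```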

static generate_input_aliasing_function(input_mapping: dict) → LambdaType
static generate_output_aliasing_function(output_mapping: dict) → LambdaType
get_accumulated_costs()

Calculate and return the accumulated costs for all computed elements.

This method computes the costs based on the token usage and the specific model being used. It includes input costs, output costs, and total costs.

Returns:

A dictionary containing cost and usage information.
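
A hedged sketch of the reset-predict-inspect pattern from the class docstring; the dictionary's exact key names are not documented here:

```python
score.reset_token_usage()    # zero the counters first
result = await score.predict(Score.Input(text="sample"))
costs = score.get_accumulated_costs()
# Per the docstring, the dictionary covers input, output, and total costs;
# the exact key names depend on the implementation.
```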

get_example_refinement_templates()

Get the example refinement templates for the score by iterating over the graph nodes and asking each node for its example refinement template.

get_prompt_templates()

Get the prompt templates for the score by iterating over the graph nodes and asking each node for its prompt templates.

async get_scoring_jobs_for_batch(batch_job_id: str) → List[Dict[str, Any]]

Get all scoring jobs associated with a batch job.

get_token_usage()

Retrieve the current token usage statistics.

This method returns a dictionary containing the number of prompt tokens, completion tokens, total tokens, and successful requests made to the language model.

Returns:

A dictionary with token usage statistics.
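
A hedged usage sketch; the docstring names four statistics, but their exact key names are not specified here:

```python
usage = score.get_token_usage()
# Documented contents: prompt tokens, completion tokens, total tokens,
# and successful request count; actual key names may differ.
for key, count in usage.items():
    print(f"{key}: {count}")
```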

predict(model_input: Input, thread_id: str | None = None, batch_data: Dict[str, Any] | None = None, **kwargs) → Result

Make predictions using the LangGraph workflow.

Parameters

model_input : Score.Input

The input data containing text and metadata.

thread_id : Optional[str]

Thread ID for checkpointing.

batch_data : Optional[Dict[str, Any]]

Additional data for batch processing.

**kwargs : Any

Additional keyword arguments.

Returns

Score.Result

The prediction result with value and explanation.
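
A hedged call sketch following the signature above (the class docstring awaits predict); the thread ID and batch payload are illustrative:

```python
result = await score.predict(
    Score.Input(text="content to classify"),
    thread_id="conversation-123",            # enables checkpointing
    batch_data={"batch_job_id": "job-123"},  # illustrative batch payload
)
print(result.value, result.explanation)
```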

predict_validation()

Placeholder method to satisfy the base class requirement. This validator doesn’t require traditional training.

preprocess_text(text)
static process_node(node_data)

Process a single node to build its workflow.

register_model()

Register the model.

reset_token_usage()

Reset the token usage counters.

This method resets all token usage statistics to zero, allowing for fresh tracking of token usage in subsequent operations.

save_model()

Save the model.

train_model()

Placeholder method to satisfy the base class requirement. This validator doesn’t require traditional training.