plexus.scores.LangGraphScore module
- exception plexus.scores.LangGraphScore.BatchProcessingPause(thread_id, state, batch_job_id=None, message=None)
Bases:
Exception
Exception raised when execution should pause for batch processing.
- __init__(thread_id, state, batch_job_id=None, message=None)
- class plexus.scores.LangGraphScore.LangGraphScore(**parameters)
Bases:
Score
,LangChainUser
A Score implementation that uses LangGraph for orchestrating LLM-based classification.
LangGraphScore enables complex classification logic using a graph of LLM operations. It provides: - Declarative graph definition in YAML - State management and checkpointing - Cost tracking and optimization - Batch processing support - Integration with multiple LLM providers
The graph is defined in the scorecard YAML: ```yaml scores:
- ComplexScore:
class: LangGraphScore model_provider: AzureChatOpenAI model_name: gpt-4 graph:
name: extract_context type: prompt template: Extract relevant context… output: context
name: classify type: prompt template: Based on the context… input: context output: classification
name: validate type: condition input: classification conditions:
value: Yes goto: explain_yes
value: No goto: explain_no
Common usage patterns: 1. Basic classification:
score = LangGraphScore(**config) result = await score.predict(context, Score.Input(
text=”content to classify”
))
- Batch processing:
- async for result in score.batch_predict(texts):
# Process each result
- Cost optimization:
score.reset_token_usage() result = await score.predict(…) costs = score.get_accumulated_costs()
Key features: - Graph visualization for debugging - Token usage tracking - State persistence - Error handling and retries - Batch processing optimization
LangGraphScore is commonly used with: - Complex classification tasks requiring multiple steps - Tasks needing explanation or reasoning - High-volume processing requiring cost optimization
Initialize the LangGraphScore.
This method sets up the score parameters and initializes basic attributes. The language model initialization is deferred to the async setup.
- Parameters:
parameters – Configuration parameters for the score and language model.
- class GraphState(*, text: str, metadata: dict | None = None, results: dict | None = None, messages: ~typing.List[~typing.Dict[str, ~typing.Any]] | None = None, is_not_empty: bool | None = None, value: str | None = None, explanation: str | None = None, reasoning: str | None = None, chat_history: ~typing.List[~typing.Any] = <factory>, completion: str | None = None, classification: str | None = None, confidence: float | None = None, retry_count: int | None = 0, at_llm_breakpoint: bool | None = False, good_call: str | None = None, good_call_explanation: str | None = None, non_qualifying_reason: str | None = None, non_qualifying_explanation: str | None = None, **extra_data: ~typing.Any)
Bases:
BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- at_llm_breakpoint: bool | None
- chat_history: List[Any]
- classification: str | None
- completion: str | None
- confidence: float | None
- explanation: str | None
- good_call: str | None
- good_call_explanation: str | None
- is_not_empty: bool | None
- messages: List[Dict[str, Any]] | None
- metadata: dict | None
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'allow', 'validate_default': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- non_qualifying_explanation: str | None
- non_qualifying_reason: str | None
- reasoning: str | None
- results: dict | None
- retry_count: int | None
- text: str
- value: str | None
- class Parameters(*, scorecard_name: str | None = None, name: str | None = None, id: str | int | None = None, key: str | None = None, dependencies: List[dict] | None = None, data: dict | None = None, number_of_classes: int | None = None, label_score_name: str | None = None, label_field: str | None = None, model_provider: Literal['ChatOpenAI', 'AzureChatOpenAI', 'BedrockChat', 'ChatVertexAI'] = 'AzureChatOpenAI', model_name: str | None = None, model_region: str | None = None, temperature: float | None = 0, max_tokens: int | None = 500, graph: list[dict] | None = None, input: dict | None = None, output: dict | None = None, depends_on: List[str] | Dict[str, str | Dict[str, Any]] | None = None, single_line_messages: bool = False, checkpoint_db_path: str | None = './.plexus/checkpoints/langgraph.db', thread_id: str | None = None, postgres_url: str | None = None)
Bases:
Parameters
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- checkpoint_db_path: str | None
- depends_on: List[str] | Dict[str, str | Dict[str, Any]] | None
- graph: list[dict] | None
- input: dict | None
- max_tokens: int | None
- model_config: ClassVar[ConfigDict] = {'protected_namespaces': ()}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_name: str | None
- model_provider: Literal['ChatOpenAI', 'AzureChatOpenAI', 'BedrockChat', 'ChatVertexAI']
- model_region: str | None
- output: dict | None
- postgres_url: str | None
- single_line_messages: bool
- temperature: float | None
- thread_id: str | None
- class Result(*, parameters: Parameters, value: str | bool, metadata: dict = {}, error: str | None = None, explanation: str)
Bases:
Result
Model output containing the validation result.
- Parameters:
explanation – Detailed explanation of the validation result.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- explanation: str
- model_config: ClassVar[ConfigDict] = {'protected_namespaces': ()}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- __init__(**parameters)
Initialize the LangGraphScore.
This method sets up the score parameters and initializes basic attributes. The language model initialization is deferred to the async setup.
- Parameters:
parameters – Configuration parameters for the score and language model.
- static add_edges(workflow, node_instances, entry_point, graph_config)
Add edges between nodes in the workflow.
- async async_setup()
Asynchronous setup for LangGraphScore.
- async build_compiled_workflow()
Build the LangGraph workflow with optional persistence.
- async cleanup()
Cleanup resources.
- async classmethod create(**parameters)
- create_combined_graphstate_class(instances: list) Type[GraphState]
Dynamically create a combined GraphState class based on all nodes in the workflow.
- static create_value_setter_node(output_mapping: dict) LambdaType
- generate_graph_visualization(output_path: str = './tmp/workflow_graph.png')
Generate and save a visual representation of the workflow graph using LangChain’s built-in visualization capabilities.
- Parameters:
output_path – The file path where the graph image will be saved.
- static generate_input_aliasing_function(input_mapping: dict) LambdaType
- static generate_output_aliasing_function(output_mapping: dict) LambdaType
- get_accumulated_costs()
Calculate and return the accumulated costs for all computed elements.
This method computes the costs based on the token usage and the specific model being used. It includes input costs, output costs, and total costs.
- Returns:
A dictionary containing cost and usage information.
- get_example_refinement_templates()
Get the example refinement templates for the score by iterating over the graph nodes and asking each node for its example refinement template.
- get_prompt_templates()
Get the prompt templates for the score by iterating over the graph nodes and asking each node for its prompt templates.
- async get_scoring_jobs_for_batch(batch_job_id: str) List[Dict[str, Any]]
Get all scoring jobs associated with a batch job.
- get_token_usage()
Retrieve the current token usage statistics.
This method returns a dictionary containing the number of prompt tokens, completion tokens, total tokens, and successful requests made to the language model.
- Returns:
A dictionary with token usage statistics.
- predict(model_input: Input, thread_id: str | None = None, batch_data: Dict[str, Any] | None = None, **kwargs) Result
Make predictions using the LangGraph workflow.
Parameters
- model_inputScore.Input
The input data containing text and metadata
- thread_idOptional[str]
Thread ID for checkpointing
- batch_dataOptional[Dict[str, Any]]
Additional data for batch processing
- **kwargsAny
Additional keyword arguments
Returns
- Score.Result
The prediction result with value and explanation
- predict_validation()
Placeholder method to satisfy the base class requirement. This validator doesn’t require traditional training.
- preprocess_text(text)
- static process_node(node_data)
Process a single node to build its workflow.
- register_model()
Register the model.
- reset_token_usage()
Reset the token usage counters.
This method resets all token usage statistics to zero, allowing for fresh tracking of token usage in subsequent operations.
- save_model()
Save the model.
- train_model()
Placeholder method to satisfy the base class requirement. This validator doesn’t require traditional training.