Overview

The workflows module provides LangGraph-based workflows for entity training, dialog synthesis, relationship analysis, and temporal simulation. Each workflow is a directed acyclic graph (DAG) of nodes with typed state.

Module: workflows/

Architecture:
  • LangGraph Workflows
  • StateGraph (typed state)
  • Nodes (Python functions)
  • Edges (data flow)

create_entity_training_workflow()

LangGraph workflow for parallel entity training with progressive resolution elevation. Signature:
def create_entity_training_workflow(
    llm_client: LLMClient,
    store: GraphStore
) -> CompiledGraph
Workflow DAG:
  load_graph → populate_entities_parallel → aggregate_populations →
  validate_entities → compress_tensors → trigger_prospection_batch →
  progressive_training_check → END
Example:
from workflows import create_entity_training_workflow
from llm_v2 import LLMClient
from storage import GraphStore
from schemas import ResolutionLevel
import networkx as nx

llm = LLMClient.from_hydra_config(cfg)  # cfg: Hydra config loaded elsewhere
store = GraphStore()

# Create workflow
workflow = create_entity_training_workflow(llm, store)

# Prepare state
graph = nx.Graph()
graph.add_node("hamilton", role="primary")
graph.add_node("madison", role="secondary")

state = {
    "graph": graph,
    "entities": [],  # Will be populated
    "timepoint": "tp_001",
    "resolution": ResolutionLevel.DIALOG,
    "violations": [],
    "results": {},
    "entity_populations": {}
}

# Run workflow
final_state = workflow.invoke(state)

print(f"Trained {len(final_state['entities'])} entities")
print(f"Violations: {len(final_state['violations'])}")

WorkflowState

Typed state for entity training workflow. Schema:
class WorkflowState(TypedDict):
    graph: nx.Graph
    entities: list[Entity]
    timepoint: str
    resolution: ResolutionLevel
    violations: list[dict]
    results: dict
    entity_populations: dict[str, EntityPopulation]

Workflow Nodes

load_graph:
  • Load NetworkX graph from storage
  • Fallback to test graph if not found
populate_entities_parallel:
  • Populate all entities concurrently using asyncio
  • Call LLMClient.populate_entity() for each node
  • Store results in entity_populations
aggregate_populations:
  • Convert EntityPopulation → Entity
  • Preserve orchestrator metadata if present
  • Generate TTM tensors
  • Merge cognitive tensors with existing data
validate_entities:
  • Validate all entities using Validator.validate_all()
  • Check temporal causality
  • Check circadian constraints (M14)
  • Check knowledge network flow
  • Store violations in state
compress_tensors:
  • Apply PCA/SVD compression for TENSOR_ONLY entities
  • Store compressed versions for all entities
  • Remove full tensors for TENSOR_ONLY to save space
trigger_prospection_batch:
  • Trigger prospection (M15) for eligible entities
  • Generate expectations and forecasts
  • Refine tensors from prospective states
progressive_training_check:
  • Check entities needing resolution elevation (M2.4)
  • Elevate based on centrality and query patterns
  • Update entities in storage
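
The node list above can be sketched in plain Python: each node is a function that takes the workflow state, mutates it, and returns it, and running the DAG in order is sequential function application. The bodies below are illustrative stand-ins, not the module's real implementations (which call LLMs and storage):

```python
# Illustrative stand-ins only -- the real nodes call LLMs and storage.
def load_graph(state: dict) -> dict:
    # Fallback behavior noted above: use a tiny test graph if none is loaded
    if state.get("graph") is None:
        state["graph"] = {"hamilton": {"role": "primary"}}
    return state

def validate_entities(state: dict) -> dict:
    # Record a violation for every entity that lacks a populated tensor
    for name, attrs in state["graph"].items():
        if "tensor" not in attrs:
            state["violations"].append({"entity": name, "check": "missing_tensor"})
    return state

# Running the DAG in order is sequential function application over shared state
state = {"graph": None, "violations": []}
for node in (load_graph, validate_entities):
    state = node(state)

print(state["violations"])
```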

LangGraph Node Examples

Custom Node: Knowledge Extraction

from langgraph.graph import StateGraph, END
from typing import TypedDict

class KnowledgeState(TypedDict):
    dialog: Dialog
    knowledge_items: list[KnowledgeItem]
    confidence: float

def extract_knowledge_node(state: KnowledgeState) -> KnowledgeState:
    """Extract knowledge items from dialog using LLM"""
    from workflows.knowledge_extraction import extract_knowledge_from_dialog

    result = extract_knowledge_from_dialog(
        state['dialog'],
        llm_client,  # assumes an LLMClient instance in the enclosing scope
        model="meta-llama/llama-3.1-70b-instruct"
    )

    state['knowledge_items'] = result.items
    state['confidence'] = (
        sum(i.confidence for i in result.items) / len(result.items)
        if result.items else 0.0
    )
    return state

# Build workflow
workflow = StateGraph(KnowledgeState)
workflow.add_node("extract_knowledge", extract_knowledge_node)
workflow.add_edge("extract_knowledge", END)
workflow.set_entry_point("extract_knowledge")

app = workflow.compile()

Custom Node: Relationship Evolution

class RelationshipState(TypedDict):
    entity_a: str
    entity_b: str
    timepoints: list[Timepoint]
    trajectory: RelationshipTrajectory

def analyze_trajectory_node(state: RelationshipState) -> RelationshipState:
    """Analyze relationship evolution across timepoints"""
    from workflows.relationship_analysis import analyze_relationship_evolution
    
    trajectory = analyze_relationship_evolution(
        state['entity_a'],
        state['entity_b'],
        state['timepoints'],
        store,       # assumes store and llm_client in the enclosing scope
        llm_client
    )
    
    state['trajectory'] = trajectory
    return state

TemporalAgent

Modal temporal causality agent for time-as-entity modeling. Signature:
class TemporalAgent:
    def __init__(
        self,
        mode: TemporalMode | None = None,
        config: dict | None = None,
        store: GraphStore | None = None,
        llm_client: LLMClient | None = None,
        temporal_config: TemporalConfig | None = None
    )
Modes:
  • FORWARD: Standard causality (no anachronisms)
  • PORTAL: Backward inference from endpoint
  • DIRECTORIAL: Narrative structure with dramatic arcs
  • CYCLICAL: Time loops and prophecy
  • BRANCHING: Counterfactual what-if scenarios

Main Methods

determine_fidelity_temporal_strategy:
@track_mechanism("M1+M17", "adaptive_fidelity_temporal_strategy")
def determine_fidelity_temporal_strategy(
    self,
    config: TemporalConfig,
    context: dict
) -> FidelityTemporalStrategy
Determine the optimal fidelity and temporal allocation for the configured mode. Example:
from workflows import TemporalAgent
from schemas import TemporalMode

agent = TemporalAgent(
    mode=TemporalMode.PORTAL,
    store=store,
    llm_client=llm
)

strategy = agent.determine_fidelity_temporal_strategy(
    config=temporal_config,
    context={
        "portal_state": portal_state,
        "origin_year": 1776,
        "entities": entities,
        "token_budget": 15000
    }
)

print(f"Timepoints: {strategy.timepoint_count}")
print(f"Fidelity schedule: {strategy.fidelity_schedule}")
print(f"Temporal steps: {strategy.temporal_steps}")
print(f"Estimated tokens: {strategy.estimated_tokens}")
get_entity_token_budget:
@track_mechanism("M1", "fidelity_token_budget")
def get_entity_token_budget(self, entity: Entity) -> int
Get token budget based on the entity's resolution level.
Token budgets by resolution:
RESOLUTION_TOKEN_BUDGET = {
    ResolutionLevel.TENSOR_ONLY: 100,
    ResolutionLevel.SCENE: 500,
    ResolutionLevel.GRAPH: 1000,
    ResolutionLevel.DIALOG: 3000,
    ResolutionLevel.TRAINED: 5000,
    ResolutionLevel.FULL_DETAIL: 8000
}
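
The lookup itself can be sketched with plain strings standing in for the ResolutionLevel members; the fallback default for unknown levels is an assumption, not documented behavior:

```python
# Strings stand in for ResolutionLevel enum members in this sketch
RESOLUTION_TOKEN_BUDGET = {
    "TENSOR_ONLY": 100,
    "SCENE": 500,
    "GRAPH": 1000,
    "DIALOG": 3000,
    "TRAINED": 5000,
    "FULL_DETAIL": 8000,
}

def get_entity_token_budget(resolution: str) -> int:
    # Fall back to the cheapest budget for unknown levels (assumption)
    return RESOLUTION_TOKEN_BUDGET.get(resolution, 100)

print(get_entity_token_budget("DIALOG"))  # 3000
```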

Fidelity Templates

Pre-configured fidelity allocation patterns.

Minimalist Template

Distribution: 70% TENSOR, 21% SCENE, 7% DIALOG
Use case: Token budget limited, broad coverage needed
strategy = agent._apply_fidelity_template(
    "minimalist",
    suggested_steps=10,
    total_months=120
)

Balanced Template

Distribution: 33% TENSOR, 33% SCENE, 20% GRAPH, 13% DIALOG
Use case: Standard simulations, balanced detail
strategy = agent._apply_fidelity_template(
    "balanced",
    suggested_steps=15,
    total_months=120
)

Portal Pivots Template

Distribution:
  • 2 endpoints: TRAINED
  • Middle: 50% TENSOR, 25% SCENE, 13% DIALOG
Use case: Portal mode with pivot detection
strategy = agent._apply_fidelity_template(
    "portal_pivots",
    suggested_steps=20,
    total_months=120
)

Workflow Utilities

retrain_high_traffic_entities()

Progressive training for high-usage entities. Signature:
@track_mechanism("M2", "progressive_training")
def retrain_high_traffic_entities(
    graph: nx.Graph,
    store: GraphStore,
    llm_client: LLMClient
) -> tuple[int, int]
Returns: (elevated_count, retrained_count)
Example:
from workflows import retrain_high_traffic_entities

elevated, retrained = retrain_high_traffic_entities(
    graph,
    store,
    llm
)

print(f"Elevated {elevated} entities")
print(f"Retrained {retrained} entities")

Dialog Synthesis Functions

synthesize_dialog:
def synthesize_dialog(
    participants: list[Entity],
    timepoint: Timepoint,
    context: dict,
    llm_client: LLMClient,
    store: GraphStore
) -> Dialog
couple_pain_to_cognition:
def couple_pain_to_cognition(
    entity: Entity,
    pain_level: float,
    pain_location: str | None
) -> CognitiveTensor
compute_relationship_metrics:
def compute_relationship_metrics(
    entity_a: Entity,
    entity_b: Entity,
    store: GraphStore
) -> RelationshipMetrics
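
How a pain-to-cognition coupling might look, assuming a plain dict stands in for CognitiveTensor and using illustrative scaling constants (the mechanism's real factors are not documented here):

```python
def couple_pain_to_cognition(tensor: dict, pain_level: float) -> dict:
    """Hypothetical coupling: pain suppresses focus and raises stress.

    The 0.5 and 0.3 factors are invented for illustration.
    """
    pain = max(0.0, min(1.0, pain_level))  # clamp to [0, 1]
    return {
        **tensor,
        "focus": tensor["focus"] * (1.0 - 0.5 * pain),
        "stress": min(1.0, tensor["stress"] + 0.3 * pain),
    }

before = {"focus": 0.8, "stress": 0.2}
after = couple_pain_to_cognition(before, pain_level=1.0)
print(after)
```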

Prospection Functions

generate_prospective_state:
def generate_prospective_state(
    entity: Entity,
    timepoint: Timepoint,
    llm_client: LLMClient,
    forecast_horizon_days: int = 30
) -> ProspectiveState
compute_anxiety_from_expectations:
def compute_anxiety_from_expectations(
    expectations: list[Expectation]
) -> float
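
As a sketch of how such a score could be derived, the toy Expectation below (a probability plus a valence field, both assumptions about the real schema) scores anxiety as the mean probability-weighted dread; the module's actual formula may differ:

```python
from dataclasses import dataclass

@dataclass
class Expectation:          # minimal stand-in for the schemas.Expectation model
    probability: float      # how likely the entity believes the outcome is
    valence: float          # -1 (dreaded) .. +1 (desired)

def compute_anxiety_from_expectations(expectations: list[Expectation]) -> float:
    """Hypothetical scoring: anxiety rises with likely negative outcomes."""
    if not expectations:
        return 0.0
    scores = [e.probability * max(0.0, -e.valence) for e in expectations]
    return sum(scores) / len(scores)

print(compute_anxiety_from_expectations(
    [Expectation(0.8, -1.0), Expectation(0.5, 0.6)]
))
```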

Counterfactual Functions

create_counterfactual_branch:
def create_counterfactual_branch(
    baseline_timeline: Timeline,
    intervention: Intervention,
    branch_point: Timepoint,
    store: GraphStore
) -> Timeline
compare_timelines:
def compare_timelines(
    baseline: Timeline,
    counterfactual: Timeline,
    store: GraphStore
) -> BranchComparison

Temporal Strategies

Mode-specific strategy classes for temporal simulation.

PortalStrategy

Backward simulation from endpoint to origin.
from workflows import PortalStrategy

strategy = PortalStrategy()
paths = strategy.run(config)  # config: a TemporalConfig prepared elsewhere

for path in paths:
    print(f"Path: {len(path.states)} states")
    for state in path.states:
        print(f"  {state['timepoint_id']}: {state['event']}")

DirectorialStrategy

Narrative-driven simulation with dramatic structure.
from workflows import DirectorialStrategy

strategy = DirectorialStrategy()
paths = strategy.run(config)

for path in paths:
    for state in path.states:
        tension = state.get('tension_score', 0)
        print(f"{state['timepoint_id']}: tension={tension:.2f}")

CyclicalStrategy

Time loops and prophecy.
from workflows import CyclicalStrategy

strategy = CyclicalStrategy()
paths = strategy.run(config)

for path in paths:
    for state in path.states:
        cycle_pos = state.get('cycle_position', 0)
        print(f"{state['timepoint_id']}: cycle_pos={cycle_pos}")

BranchingStrategy

Counterfactual what-if scenarios.
from workflows import BranchingStrategy

strategy = BranchingStrategy()
paths = strategy.run(config)

print(f"Generated {len(paths)} counterfactual branches")

Scene Environment Functions

create_environment_entity:
def create_environment_entity(
    scene_id: str,
    timepoint_id: str,
    location: str,
    capacity: int,
    temperature: float,
    lighting: float
) -> EnvironmentEntity
compute_scene_atmosphere:
def compute_scene_atmosphere(
    entities: list[Entity],
    environment: EnvironmentEntity,
    relationships: nx.Graph
) -> AtmosphereEntity
compute_crowd_dynamics:
def compute_crowd_dynamics(
    entities: list[Entity],
    scene_id: str,
    timepoint_id: str
) -> CrowdEntity
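
One way such an atmosphere score could be blended, with all weights invented for illustration (the real AtmosphereEntity computation is not documented here):

```python
def compute_scene_atmosphere(moods: list[float], lighting: float, crowding: float) -> float:
    """Hypothetical blend: mean entity mood, dimmed by low light and crowding.

    moods are in [-1, 1]; lighting and crowding in [0, 1]. All factors invented.
    """
    if not moods:
        return 0.0
    base = sum(moods) / len(moods)
    return base * (0.5 + 0.5 * lighting) * (1.0 - 0.3 * crowding)

print(compute_scene_atmosphere([0.6, 0.8], lighting=1.0, crowding=0.0))
```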

Animistic Entity Functions

create_animistic_entity:
def create_animistic_entity(
    entity_id: str,
    entity_type: str,
    context: dict,
    llm_client: LLMClient
) -> Entity
should_create_animistic_entity:
def should_create_animistic_entity(
    entity_type: str,
    context: dict
) -> bool
infer_species_from_context:
def infer_species_from_context(
    entity_id: str,
    context: dict
) -> str
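
A hypothetical heuristic for the should_create_animistic_entity decision, assuming a type allowlist and a mentioned_in_dialog context flag (both invented for illustration):

```python
# Hypothetical allowlist -- the real module presumably consults richer context
ANIMISTIC_TYPES = {"animal", "object", "place", "weather"}

def should_create_animistic_entity(entity_type: str, context: dict) -> bool:
    # Only promote non-human types that the narrative actually references
    return entity_type in ANIMISTIC_TYPES and context.get("mentioned_in_dialog", False)

print(should_create_animistic_entity("animal", {"mentioned_in_dialog": True}))   # True
print(should_create_animistic_entity("human", {"mentioned_in_dialog": True}))    # False
```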

Best Practices

  1. Use typed state (TypedDict) for all workflows
  2. Track mechanisms with @track_mechanism decorator
  3. Batch LLM calls in parallel nodes
  4. Validate state between nodes
  5. Use transactions for multi-table writes
  6. Cache graphs to avoid reloading
  7. Monitor token usage via TemporalAgent
  8. Choose appropriate templates for fidelity allocation
  9. Handle node failures gracefully
  10. Log workflow execution for debugging
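
Practice 9 (handle node failures gracefully) can be sketched as a decorator that logs the exception and records it in the state's violations list instead of aborting the run; resilient_node is a hypothetical helper, not part of the module:

```python
import functools
import logging

def resilient_node(fn):
    """Wrap a node so an exception is recorded in state instead of killing the run."""
    @functools.wraps(fn)
    def wrapper(state: dict) -> dict:
        try:
            return fn(state)
        except Exception as exc:
            logging.exception("node %s failed", fn.__name__)
            state.setdefault("violations", []).append(
                {"node": fn.__name__, "error": str(exc)}
            )
            return state
    return wrapper

@resilient_node
def flaky_node(state: dict) -> dict:
    raise RuntimeError("upstream service unavailable")

state = flaky_node({"violations": []})
print(state["violations"])
```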

Example: Custom Workflow

from langgraph.graph import StateGraph, END
from typing import TypedDict

class CustomState(TypedDict):
    input_data: dict
    processed_data: dict
    validation_passed: bool

def process_node(state: CustomState) -> CustomState:
    # Process input data
    state['processed_data'] = {
        'result': state['input_data']['value'] * 2
    }
    return state

def validate_node(state: CustomState) -> CustomState:
    # Validate processed data
    state['validation_passed'] = (
        state['processed_data']['result'] > 0
    )
    return state

def route_validation(state: CustomState) -> str:
    if state['validation_passed']:
        return "success"
    else:
        return "failure"

# Build workflow
workflow = StateGraph(CustomState)
workflow.add_node("process", process_node)
workflow.add_node("validate", validate_node)
workflow.add_node("success", lambda s: s)
workflow.add_node("failure", lambda s: s)

workflow.add_edge("process", "validate")
workflow.add_conditional_edges(
    "validate",
    route_validation,
    {"success": "success", "failure": "failure"}
)
workflow.add_edge("success", END)
workflow.add_edge("failure", END)

workflow.set_entry_point("process")

app = workflow.compile()

# Run workflow
result = app.invoke({
    "input_data": {"value": 42},
    "processed_data": {},
    "validation_passed": False
})

print(result)