Overview
The workflows module provides LangGraph-based workflows for entity training, dialog synthesis, relationship analysis, and temporal simulation. Each workflow is a directed acyclic graph (DAG) of nodes with typed state.
Module: workflows/
Architecture:
LangGraph Workflows
↓
StateGraph (typed state)
↓
Nodes (Python functions)
↓
Edges (data flow)
create_entity_training_workflow()
LangGraph workflow for parallel entity training with progressive resolution elevation.
Signature:
def create_entity_training_workflow(
    llm_client: LLMClient,
    store: GraphStore
) -> CompiledGraph
Workflow DAG:
load_graph
↓
populate_entities_parallel
↓
aggregate_populations
↓
validate_entities
↓
compress_tensors
↓
trigger_prospection_batch
↓
progressive_training_check
↓
END
Example:
from workflows import create_entity_training_workflow
from llm_v2 import LLMClient
from storage import GraphStore
from schemas import ResolutionLevel
import networkx as nx
# cfg is assumed to be an already-loaded Hydra config object
llm = LLMClient.from_hydra_config(cfg)
store = GraphStore()
# Create workflow
workflow = create_entity_training_workflow(llm, store)
# Prepare state
graph = nx.Graph()
graph.add_node("hamilton", role="primary")
graph.add_node("madison", role="secondary")
state = {
    "graph": graph,
    "entities": [],  # Will be populated
    "timepoint": "tp_001",
    "resolution": ResolutionLevel.DIALOG,
    "violations": [],
    "results": {},
    "entity_populations": {}
}
# Run workflow
final_state = workflow.invoke(state)
print(f"Trained {len(final_state['entities'])} entities")
print(f"Violations: {len(final_state['violations'])}")
WorkflowState
Typed state for entity training workflow.
Schema:
class WorkflowState(TypedDict):
    graph: nx.Graph
    entities: list[Entity]
    timepoint: str
    resolution: ResolutionLevel
    violations: list[dict]
    results: dict
    entity_populations: dict[str, EntityPopulation]
Workflow Nodes
load_graph:
- Load NetworkX graph from storage
- Fallback to test graph if not found
populate_entities_parallel:
- Populate all entities concurrently using asyncio
- Call LLMClient.populate_entity() for each node
- Store results in entity_populations
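The concurrent population step can be pictured with asyncio.gather. The sketch below is an illustration only: populate_entity_stub is a hypothetical stand-in for LLMClient.populate_entity(), whose real signature (and whether it is a coroutine) is not shown on this page, and the result shape is assumed.

```python
import asyncio


async def populate_entity_stub(entity_id: str) -> dict:
    """Hypothetical stand-in for an LLM call that populates one entity."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {"entity_id": entity_id, "knowledge_state": []}


async def populate_all(entity_ids: list[str]) -> dict[str, dict]:
    """Populate every entity concurrently and key the results by entity ID."""
    results = await asyncio.gather(
        *(populate_entity_stub(eid) for eid in entity_ids),
        return_exceptions=True,  # one failed entity should not cancel the rest
    )
    populations: dict[str, dict] = {}
    for eid, result in zip(entity_ids, results):
        if isinstance(result, Exception):
            continue  # a real node would probably record this failure
        populations[eid] = result
    return populations


entity_populations = asyncio.run(populate_all(["hamilton", "madison"]))
```

Passing return_exceptions=True keeps a single failed call from discarding the populations that did succeed, which matters when each call is a paid LLM request.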
aggregate_populations:
- Convert EntityPopulation → Entity
- Preserve orchestrator metadata if present
- Generate TTM tensors
- Merge cognitive tensors with existing data
validate_entities:
- Validate all entities using Validator.validate_all()
- Check temporal causality
- Check circadian constraints (M14)
- Check knowledge network flow
- Store violations in state
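To make the temporal causality check concrete, here is a minimal sketch. The internals of Validator.validate_all() are not shown on this page, so the KnowledgeUse record, its fields, and the violation dict layout are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeUse:
    """Hypothetical record: an entity references a fact at some timepoint."""
    entity_id: str
    fact: str
    used_at: int     # ordinal position of the timepoint where the fact is used
    learned_at: int  # ordinal position of the timepoint where it was learned


def find_causality_violations(uses: list[KnowledgeUse]) -> list[dict]:
    """Flag any fact that is used before the timepoint at which it was learned."""
    return [
        {"entity_id": u.entity_id, "fact": u.fact, "type": "temporal_causality"}
        for u in uses
        if u.used_at < u.learned_at
    ]


uses = [
    KnowledgeUse("hamilton", "constitution_ratified", used_at=3, learned_at=2),
    KnowledgeUse("madison", "election_result", used_at=1, learned_at=4),  # anachronism
]
violations = find_causality_violations(uses)
```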
compress_tensors:
- Apply PCA/SVD compression for TENSOR_ONLY entities
- Store compressed versions for all entities
- Remove full tensors for TENSOR_ONLY to save space
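The compression idea can be sketched with a truncated SVD in NumPy. This assumes tensors are stacked as rows of a 2-D array; the rank, shapes, and function name are illustrative and are not taken from the module.

```python
import numpy as np


def compress_svd(tensors: np.ndarray, rank: int) -> tuple[np.ndarray, np.ndarray]:
    """Return (codes, basis) such that codes @ basis approximates tensors."""
    u, s, vt = np.linalg.svd(tensors, full_matrices=False)
    codes = u[:, :rank] * s[:rank]  # per-entity compressed coordinates
    basis = vt[:rank]               # shared basis needed for reconstruction
    return codes, basis


rng = np.random.default_rng(0)
# Illustrative data: 6 entities with 8-dimensional tensors that lie in a 2-D subspace
tensors = rng.normal(size=(6, 2)) @ rng.normal(size=(2, 8))

codes, basis = compress_svd(tensors, rank=2)
reconstructed = codes @ basis
```

Each entity then stores only its row of codes (2 values here instead of 8), while the basis is stored once and shared.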
trigger_prospection_batch:
- Trigger prospection (M15) for eligible entities
- Generate expectations and forecasts
- Refine tensors from prospective states
progressive_training_check:
- Check entities needing resolution elevation (M2.4)
- Elevate based on centrality and query patterns
- Update entities in storage
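One way to picture the elevation rule is shown below. The thresholds, the one-step-at-a-time policy, and the Level enum are assumptions for this sketch; the module's actual criteria are not documented here.

```python
from enum import IntEnum


class Level(IntEnum):
    """Stand-in for ResolutionLevel, ordered from least to most detailed."""
    TENSOR_ONLY = 0
    SCENE = 1
    GRAPH = 2
    DIALOG = 3
    TRAINED = 4


def degree_centrality(edges: list[tuple[str, str]]) -> dict[str, float]:
    """Fraction of the other nodes that each node is directly connected to."""
    nodes = {n for edge in edges for n in edge}
    neighbors: dict[str, set[str]] = {n: set() for n in nodes}
    for a, b in edges:
        neighbors[a].add(b)
        neighbors[b].add(a)
    denom = max(len(nodes) - 1, 1)
    return {n: len(neighbors[n]) / denom for n in nodes}


def next_level(level: Level, centrality: float, query_count: int,
               min_centrality: float = 0.5, min_queries: int = 10) -> Level:
    """Elevate by one step if either (hypothetical) threshold is met."""
    busy = centrality >= min_centrality or query_count >= min_queries
    if level < Level.TRAINED and busy:
        return Level(level + 1)
    return level


edges = [("hamilton", "madison"), ("hamilton", "jefferson"), ("hamilton", "burr")]
centrality = degree_centrality(edges)
elevated = next_level(Level.SCENE, centrality["hamilton"], query_count=2)
unchanged = next_level(Level.SCENE, centrality["burr"], query_count=2)
```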
LangGraph Node Examples
from langgraph.graph import StateGraph, END
from typing import TypedDict
class KnowledgeState(TypedDict):
    dialog: Dialog
    knowledge_items: list[KnowledgeItem]
    confidence: float
def extract_knowledge_node(state: KnowledgeState) -> KnowledgeState:
    """Extract knowledge items from dialog using LLM"""
    from workflows.knowledge_extraction import extract_knowledge_from_dialog
    # llm_client is assumed to be defined in the enclosing scope
    result = extract_knowledge_from_dialog(
        state['dialog'],
        llm_client,
        model="meta-llama/llama-3.1-70b-instruct"
    )
    state['knowledge_items'] = result.items
    # Guard against an empty extraction to avoid division by zero
    state['confidence'] = (
        sum(i.confidence for i in result.items) / len(result.items)
        if result.items else 0.0
    )
    return state
# Build workflow
workflow = StateGraph(KnowledgeState)
workflow.add_node("extract_knowledge", extract_knowledge_node)
workflow.add_edge("extract_knowledge", END)
workflow.set_entry_point("extract_knowledge")
app = workflow.compile()
Custom Node: Relationship Evolution
class RelationshipState(TypedDict):
    entity_a: str
    entity_b: str
    timepoints: list[Timepoint]
    trajectory: RelationshipTrajectory
def analyze_trajectory_node(state: RelationshipState) -> RelationshipState:
    """Analyze relationship evolution across timepoints"""
    from workflows.relationship_analysis import analyze_relationship_evolution
    # store and llm_client are assumed to be defined in the enclosing scope
    trajectory = analyze_relationship_evolution(
        state['entity_a'],
        state['entity_b'],
        state['timepoints'],
        store,
        llm_client
    )
    state['trajectory'] = trajectory
    return state
TemporalAgent
Modal temporal causality agent for time-as-entity modeling.
Signature:
class TemporalAgent:
    def __init__(
        self,
        mode: TemporalMode | None = None,
        config: dict | None = None,
        store: GraphStore | None = None,
        llm_client: LLMClient | None = None,
        temporal_config: TemporalConfig | None = None
    )
Modes:
- FORWARD: Standard causality (no anachronisms)
- PORTAL: Backward inference from endpoint
- DIRECTORIAL: Narrative structure with dramatic arcs
- CYCLICAL: Time loops and prophecy
- BRANCHING: Counterfactual what-if scenarios
Main Methods
determine_fidelity_temporal_strategy:
@track_mechanism("M1+M17", "adaptive_fidelity_temporal_strategy")
def determine_fidelity_temporal_strategy(
    self,
    config: TemporalConfig,
    context: dict
) -> FidelityTemporalStrategy
Determines the fidelity schedule and temporal step allocation for a simulation, based on the agent's mode.
Example:
from workflows import TemporalAgent
from schemas import TemporalMode
agent = TemporalAgent(
    mode=TemporalMode.PORTAL,
    store=store,
    llm_client=llm
)
strategy = agent.determine_fidelity_temporal_strategy(
    config=temporal_config,
    context={
        "portal_state": portal_state,
        "origin_year": 1776,
        "entities": entities,
        "token_budget": 15000
    }
)
print(f"Timepoints: {strategy.timepoint_count}")
print(f"Fidelity schedule: {strategy.fidelity_schedule}")
print(f"Temporal steps: {strategy.temporal_steps}")
print(f"Estimated tokens: {strategy.estimated_tokens}")
get_entity_token_budget:
@track_mechanism("M1", "fidelity_token_budget")
def get_entity_token_budget(self, entity: Entity) -> int
Returns the token budget for an entity based on its resolution level.
Token Budgets by Resolution:
RESOLUTION_TOKEN_BUDGET = {
    ResolutionLevel.TENSOR_ONLY: 100,
    ResolutionLevel.SCENE: 500,
    ResolutionLevel.GRAPH: 1000,
    ResolutionLevel.DIALOG: 3000,
    ResolutionLevel.TRAINED: 5000,
    ResolutionLevel.FULL_DETAIL: 8000
}
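As a rough illustration of how these budgets add up, the sketch below totals the per-entity budgets for a small cast. The enum is a local stand-in for schemas.ResolutionLevel so the snippet runs on its own, and estimate_total_tokens is a hypothetical helper, not part of TemporalAgent.

```python
from enum import Enum


class ResolutionLevel(Enum):
    """Local stand-in for schemas.ResolutionLevel (member values are arbitrary)."""
    TENSOR_ONLY = "tensor_only"
    SCENE = "scene"
    GRAPH = "graph"
    DIALOG = "dialog"
    TRAINED = "trained"
    FULL_DETAIL = "full_detail"


# Budget values copied from the table above
RESOLUTION_TOKEN_BUDGET = {
    ResolutionLevel.TENSOR_ONLY: 100,
    ResolutionLevel.SCENE: 500,
    ResolutionLevel.GRAPH: 1000,
    ResolutionLevel.DIALOG: 3000,
    ResolutionLevel.TRAINED: 5000,
    ResolutionLevel.FULL_DETAIL: 8000,
}


def estimate_total_tokens(levels: list[ResolutionLevel]) -> int:
    """Sum the per-entity budgets for one timepoint (hypothetical helper)."""
    return sum(RESOLUTION_TOKEN_BUDGET[level] for level in levels)


cast = [
    ResolutionLevel.DIALOG,       # one primary character
    ResolutionLevel.SCENE,        # one supporting character
    ResolutionLevel.TENSOR_ONLY,  # two background entities
    ResolutionLevel.TENSOR_ONLY,
]
total = estimate_total_tokens(cast)
```

For this cast the total is 3000 + 500 + 100 + 100 = 3700 tokens, so a single DIALOG entity dominates the cost of the timepoint.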
Fidelity Templates
Pre-configured fidelity allocation patterns.
Minimalist Template
Distribution: 70% TENSOR, 21% SCENE, 7% DIALOG
Use case: Token budget limited, broad coverage needed
strategy = agent._apply_fidelity_template(
    "minimalist",
    suggested_steps=10,
    total_months=120
)
Balanced Template
Distribution: 33% TENSOR, 33% SCENE, 20% GRAPH, 13% DIALOG
Use case: Standard simulations, balanced detail
strategy = agent._apply_fidelity_template(
    "balanced",
    suggested_steps=15,
    total_months=120
)
Portal Pivots Template
Distribution:
- 2 endpoints: TRAINED
- Middle: 50% TENSOR, 25% SCENE, 13% DIALOG
Use case: Portal mode with pivot detection
strategy = agent._apply_fidelity_template(
    "portal_pivots",
    suggested_steps=20,
    total_months=120
)
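To show how a percentage distribution might translate into a per-step schedule, here is a minimal sketch using largest-remainder rounding. It is not the implementation of _apply_fidelity_template; allocate_steps is a hypothetical helper. Because the listed percentages do not always sum to exactly 100, the sketch normalizes by the total weight.

```python
def allocate_steps(distribution: dict[str, int], steps: int) -> dict[str, int]:
    """Split `steps` across levels in proportion to integer weights.

    Largest-remainder rounding guarantees the counts sum to `steps`.
    """
    total = sum(distribution.values())
    # Integer arithmetic avoids floating-point surprises when flooring
    counts = {lvl: w * steps // total for lvl, w in distribution.items()}
    remainders = {lvl: w * steps % total for lvl, w in distribution.items()}
    leftover = steps - sum(counts.values())
    # Hand the leftover steps to the levels with the largest remainders
    for lvl in sorted(remainders, key=remainders.get, reverse=True)[:leftover]:
        counts[lvl] += 1
    return counts


# Weights taken from the Balanced and Minimalist templates above
balanced = allocate_steps(
    {"TENSOR_ONLY": 33, "SCENE": 33, "GRAPH": 20, "DIALOG": 13}, steps=15
)
minimalist = allocate_steps({"TENSOR_ONLY": 70, "SCENE": 21, "DIALOG": 7}, steps=10)
```

With 15 steps, the balanced weights yield 5 TENSOR_ONLY, 5 SCENE, 3 GRAPH, and 2 DIALOG timepoints under this rounding rule.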
Workflow Utilities
retrain_high_traffic_entities()
Progressive training for high-usage entities.
Signature:
@track_mechanism("M2", "progressive_training")
def retrain_high_traffic_entities(
    graph: nx.Graph,
    store: GraphStore,
    llm_client: LLMClient
) -> tuple[int, int]
Returns: (elevated_count, retrained_count)
Example:
from workflows import retrain_high_traffic_entities
elevated, retrained = retrain_high_traffic_entities(
    graph,
    store,
    llm
)
print(f"Elevated {elevated} entities")
print(f"Retrained {retrained} entities")
Dialog Synthesis Functions
synthesize_dialog:
def synthesize_dialog(
    participants: list[Entity],
    timepoint: Timepoint,
    context: dict,
    llm_client: LLMClient,
    store: GraphStore
) -> Dialog
couple_pain_to_cognition:
def couple_pain_to_cognition(
    entity: Entity,
    pain_level: float,
    pain_location: str | None
) -> CognitiveTensor
compute_relationship_metrics:
def compute_relationship_metrics(
    entity_a: Entity,
    entity_b: Entity,
    store: GraphStore
) -> RelationshipMetrics
Prospection Functions
generate_prospective_state:
def generate_prospective_state(
    entity: Entity,
    timepoint: Timepoint,
    llm_client: LLMClient,
    forecast_horizon_days: int = 30
) -> ProspectiveState
compute_anxiety_from_expectations:
def compute_anxiety_from_expectations(
    expectations: list[Expectation]
) -> float
Counterfactual Functions
create_counterfactual_branch:
def create_counterfactual_branch(
    baseline_timeline: Timeline,
    intervention: Intervention,
    branch_point: Timepoint,
    store: GraphStore
) -> Timeline
compare_timelines:
def compare_timelines(
    baseline: Timeline,
    counterfactual: Timeline,
    store: GraphStore
) -> BranchComparison
Modal Strategies
Mode-specific strategy classes for temporal simulation.
PortalStrategy
Backward simulation from endpoint to origin.
from workflows import PortalStrategy
strategy = PortalStrategy()
# config is the simulation configuration, assumed to be prepared elsewhere
paths = strategy.run(config)
for path in paths:
    print(f"Path: {len(path.states)} states")
    for state in path.states:
        print(f"  {state['timepoint_id']}: {state['event']}")
DirectorialStrategy
Narrative-driven simulation with dramatic structure.
from workflows import DirectorialStrategy
strategy = DirectorialStrategy()
paths = strategy.run(config)
for path in paths:
    for state in path.states:
        tension = state.get('tension_score', 0)
        print(f"{state['timepoint_id']}: tension={tension:.2f}")
CyclicalStrategy
Time loops and prophecy.
from workflows import CyclicalStrategy
strategy = CyclicalStrategy()
paths = strategy.run(config)
for path in paths:
    for state in path.states:
        cycle_pos = state.get('cycle_position', 0)
        print(f"{state['timepoint_id']}: cycle_pos={cycle_pos}")
BranchingStrategy
Counterfactual what-if scenarios.
from workflows import BranchingStrategy
strategy = BranchingStrategy()
paths = strategy.run(config)
print(f"Generated {len(paths)} counterfactual branches")
Scene Environment Functions
create_environment_entity:
def create_environment_entity(
    scene_id: str,
    timepoint_id: str,
    location: str,
    capacity: int,
    temperature: float,
    lighting: float
) -> EnvironmentEntity
compute_scene_atmosphere:
def compute_scene_atmosphere(
    entities: list[Entity],
    environment: EnvironmentEntity,
    relationships: nx.Graph
) -> AtmosphereEntity
compute_crowd_dynamics:
def compute_crowd_dynamics(
    entities: list[Entity],
    scene_id: str,
    timepoint_id: str
) -> CrowdEntity
Animistic Entity Functions
create_animistic_entity:
def create_animistic_entity(
    entity_id: str,
    entity_type: str,
    context: dict,
    llm_client: LLMClient
) -> Entity
should_create_animistic_entity:
def should_create_animistic_entity(
    entity_type: str,
    context: dict
) -> bool
infer_species_from_context:
def infer_species_from_context(
    entity_id: str,
    context: dict
) -> str
Best Practices
- Use typed state (TypedDict) for all workflows
- Track mechanisms with @track_mechanism decorator
- Batch LLM calls in parallel nodes
- Validate state between nodes
- Use transactions for multi-table writes
- Cache graphs to avoid reloading
- Monitor token usage via TemporalAgent
- Choose appropriate templates for fidelity allocation
- Handle node failures gracefully
- Log workflow execution for debugging
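As one possible reading of "handle node failures gracefully", the sketch below wraps a node so that an exception is logged and recorded in the state instead of aborting the run. The safe_node decorator and the shape of the recorded violation dict are assumptions for illustration; they are not part of the workflows module.

```python
import functools
import logging
from typing import Callable

logger = logging.getLogger("workflows.sketch")


def safe_node(node: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Wrap a node so a failure is logged and recorded rather than raised."""
    @functools.wraps(node)
    def wrapper(state: dict) -> dict:
        try:
            return node(state)
        except Exception as exc:
            logger.exception("Node %s failed", node.__name__)
            # Copy the list so the caller's state is not mutated
            violations = list(state.get("violations", []))
            violations.append({"node": node.__name__, "error": str(exc)})
            return {**state, "violations": violations}
    return wrapper


@safe_node
def flaky_node(state: dict) -> dict:
    """Simulates a node whose LLM call fails."""
    raise ValueError("LLM call timed out")


result = flaky_node({"violations": []})
```

A downstream routing function could then inspect result["violations"] and send the run to a failure branch, in the same way route_validation does in the custom workflow example.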
Example: Custom Workflow
from langgraph.graph import StateGraph, END
from typing import TypedDict
class CustomState(TypedDict):
    input_data: dict
    processed_data: dict
    validation_passed: bool
def process_node(state: CustomState) -> CustomState:
    # Process input data
    state['processed_data'] = {
        'result': state['input_data']['value'] * 2
    }
    return state
def validate_node(state: CustomState) -> CustomState:
    # Validate processed data
    state['validation_passed'] = (
        state['processed_data']['result'] > 0
    )
    return state
def route_validation(state: CustomState) -> str:
    if state['validation_passed']:
        return "success"
    else:
        return "failure"
# Build workflow
workflow = StateGraph(CustomState)
workflow.add_node("process", process_node)
workflow.add_node("validate", validate_node)
workflow.add_node("success", lambda s: s)
workflow.add_node("failure", lambda s: s)
workflow.add_edge("process", "validate")
workflow.add_conditional_edges(
    "validate",
    route_validation,
    {"success": "success", "failure": "failure"}
)
workflow.add_edge("success", END)
workflow.add_edge("failure", END)
workflow.set_entry_point("process")
app = workflow.compile()
# Run workflow
result = app.invoke({
    "input_data": {"value": 42},
    "processed_data": {},
    "validation_passed": False
})
print(result)