Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/timepoint-ai/timepoint-pro/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The GraphStore provides unified storage for entities, timepoints, graphs, dialogs, and all simulation artifacts. Built on SQLModel (Pydantic + SQLAlchemy) with SQLite backend and WAL mode for concurrent access. Module: storage.py Database: timepoint.db (SQLite with WAL mode)

Initialization

Signature:
class GraphStore:
    def __init__(self, db_url: str = "sqlite:///timepoint.db")
Example:
from storage import GraphStore

store = GraphStore("sqlite:///timepoint.db")
# Or custom location:
store = GraphStore("sqlite:////path/to/my_simulation.db")
Features:
  • Automatic table creation from SQLModel schemas
  • WAL mode enabled for concurrent reads/writes
  • 30-second busy timeout for write locks

Transactions

transaction()

Atomic database operations with automatic commit/rollback. Signature:
@contextmanager
def transaction(self) -> Generator[TransactionContext, None, None]
Example:
with store.transaction() as tx:
    tx.save_entity(entity)
    tx.save_timepoint(timepoint)
    tx.save_exposure_event(event)
    # All succeed or all rollback
TransactionContext Methods:
  • save_entity(entity: Entity) -> Entity
  • save_timepoint(timepoint: Timepoint) -> Timepoint
  • save_exposure_event(event: ExposureEvent) -> ExposureEvent
  • save_exposure_events(events: list[ExposureEvent])
  • save_dialog(dialog: Dialog) -> Dialog
  • save_relationship_trajectory(trajectory: RelationshipTrajectory)
  • save_timeline(timeline: Timeline) -> Timeline
  • save_query_history(query_history: QueryHistory)
  • save_prospective_state(prospective_state: ProspectiveState)

Entity Operations

save_entity()

Save or update an entity. Signature:
def save_entity(self, entity: Entity) -> Entity
Behavior:
  • If entity_id exists: Updates all fields
  • If new: Inserts entity
  • Marks entity_metadata as modified (JSON column)
Example:
from schemas import Entity, ResolutionLevel

entity = Entity(
    entity_id="alexander_hamilton",
    entity_type="human",
    resolution_level=ResolutionLevel.DIALOG,
    entity_metadata={
        "knowledge_state": ["Founded First Bank"],
        "energy_budget": 85.0,
        "personality_traits": [0.8, -0.3, 0.6, 0.9, -0.2]
    }
)

saved = store.save_entity(entity)
print(f"Saved: {saved.entity_id} (id={saved.id})")

get_entity()

Retrieve entity by ID. Signature:
def get_entity(
    self,
    entity_id: str,
    timepoint: str | None = None
) -> Entity | None
Example:
entity = store.get_entity("alexander_hamilton")
if entity:
    print(f"Resolution: {entity.resolution_level}")
    print(f"Centrality: {entity.eigenvector_centrality}")

get_all_entities()

Retrieve all entities. Signature:
def get_all_entities(self) -> list[Entity]
Example:
entities = store.get_all_entities()
print(f"Total entities: {len(entities)}")

for entity in entities:
    print(f"{entity.entity_id}: {entity.resolution_level}")

Exposure Events (M3)

save_exposure_event()

Save a single exposure event. Signature:
def save_exposure_event(self, event: ExposureEvent) -> ExposureEvent
Example:
from schemas import ExposureEvent
from datetime import datetime

event = ExposureEvent(
    entity_id="alexander_hamilton",
    event_type="learned",
    information="Madison supports bicameral legislature",
    source="james_madison",
    timestamp=datetime(1787, 5, 25, 14, 0),
    confidence=0.9,
    timepoint_id="tp_001",
    run_id="run_123"
)

saved = store.save_exposure_event(event)

save_exposure_events()

Batch save exposure events. Signature:
def save_exposure_events(self, exposure_events: list[ExposureEvent])

get_exposure_events()

Get exposure events for an entity. Signature:
def get_exposure_events(
    self,
    entity_id: str,
    limit: int | None = None
) -> list[ExposureEvent]
Parameters:
  • entity_id: Entity identifier
  • limit: Maximum events to return (most recent first)
Example:
events = store.get_exposure_events(
    "alexander_hamilton",
    limit=10
)

for event in events:
    print(f"{event.timestamp}: {event.information}")
    print(f"  Source: {event.source}, Confidence: {event.confidence}")

get_exposure_events_by_run()

Get all exposure events for a simulation run. Signature:
def get_exposure_events_by_run(self, run_id: str) -> list[ExposureEvent]

get_entity_knowledge_at_timepoint()

Get what an entity knew at a specific timepoint. Signature:
def get_entity_knowledge_at_timepoint(
    self,
    entity_id: str,
    timepoint_id: str
) -> list[str]
Example:
knowledge = store.get_entity_knowledge_at_timepoint(
    "alexander_hamilton",
    "tp_005"
)

print("Hamilton knew at tp_005:")
for item in knowledge:
    print(f"- {item}")

Timepoint Operations

save_timepoint()

Save a timepoint. Signature:
def save_timepoint(self, timepoint: Timepoint) -> Timepoint
Example:
from schemas import Timepoint, ResolutionLevel
from datetime import datetime

timepoint = Timepoint(
    timepoint_id="tp_001",
    timeline_id="timeline_001",
    timestamp=datetime(1787, 5, 25, 9, 0),
    event_description="Constitutional Convention begins",
    entities_present=["hamilton", "madison", "washington"],
    causal_parent=None,  # First timepoint
    resolution_level=ResolutionLevel.DIALOG,
    run_id="run_123"
)

saved = store.save_timepoint(timepoint)

get_timepoint()

Get timepoint by ID (with LRU caching). Signature:
@lru_cache(maxsize=500)
def get_timepoint(self, timepoint_id: str) -> Timepoint | None

get_all_timepoints()

Get all timepoints ordered by timestamp. Signature:
def get_all_timepoints(self) -> list[Timepoint]

get_timepoints_by_run()

Get all timepoints for a simulation run. Signature:
def get_timepoints_by_run(self, run_id: str) -> list[Timepoint]

get_timepoints_in_range()

Get timepoints within a time range. Signature:
def get_timepoints_in_range(
    self,
    start_time=None,
    end_time=None
) -> list[Timepoint]
Example:
from datetime import datetime

start = datetime(1787, 5, 25)
end = datetime(1787, 9, 17)

timepoints = store.get_timepoints_in_range(start, end)
print(f"Found {len(timepoints)} timepoints in range")

get_successor_timepoints()

Get timepoints that have this timepoint as causal parent. Signature:
def get_successor_timepoints(self, timepoint_id: str) -> list[Timepoint]

get_predecessor_timepoints()

Get causal parent(s) of a timepoint. Signature:
def get_predecessor_timepoints(self, timepoint_id: str) -> list[Timepoint]

Graph Operations

save_graph()

Serialize NetworkX graph to database. Signature:
def save_graph(self, graph: nx.Graph, timepoint_id: str)
Example:
import networkx as nx

graph = nx.Graph()
graph.add_node("hamilton", role="primary")
graph.add_node("madison", role="primary")
graph.add_edge("hamilton", "madison", relationship="ally", weight=0.9)

store.save_graph(graph, "tp_001")

load_graph()

Deserialize NetworkX graph from database. Signature:
def load_graph(self, timepoint_id: str) -> nx.Graph | None
Example:
graph = store.load_graph("tp_001")
if graph:
    print(f"Nodes: {graph.number_of_nodes()}")
    print(f"Edges: {graph.number_of_edges()}")
    
    for u, v, data in graph.edges(data=True):
        print(f"{u} <-{data['relationship']}-> {v}")

Dialog Storage (M11)

save_dialog()

Save a dialog conversation. Signature:
def save_dialog(self, dialog: Dialog) -> Dialog
Example:
import json
from schemas import Dialog
from datetime import datetime

turns = [
    {
        "speaker": "hamilton",
        "content": "We need a strong central bank.",
        "timestamp": "1791-01-15T10:00:00",
        "emotional_tone": "confident",
        "knowledge_references": ["banking_expertise"]
    },
    {
        "speaker": "jefferson",
        "content": "That concentrates too much power.",
        "timestamp": "1791-01-15T10:01:00",
        "emotional_tone": "skeptical",
        "knowledge_references": ["states_rights"]
    }
]

dialog = Dialog(
    dialog_id="dialog_001",
    timepoint_id="tp_010",
    participants=json.dumps(["hamilton", "jefferson"]),
    turns=json.dumps(turns),
    context_used=json.dumps({"location": "cabinet_room"}),
    duration_seconds=120,
    information_transfer_count=2,
    run_id="run_123"
)

saved = store.save_dialog(dialog)

get_dialog()

Get dialog by ID. Signature:
def get_dialog(self, dialog_id: str) -> Dialog | None

get_dialogs_at_timepoint()

Get all dialogs at a timepoint. Signature:
def get_dialogs_at_timepoint(self, timepoint_id: str) -> list[Dialog]

load_all_dialogs()

Load all dialogs (for narrative export). Signature:
def load_all_dialogs(self) -> list[Dialog]

Relationship Trajectories (M13)

save_relationship_trajectory()

Save relationship evolution over time. Signature:
def save_relationship_trajectory(
    self,
    trajectory: RelationshipTrajectory
) -> RelationshipTrajectory

get_relationship_trajectory_between()

Get most recent trajectory between two entities. Signature:
def get_relationship_trajectory_between(
    self,
    entity_a: str,
    entity_b: str
) -> RelationshipTrajectory | None

get_entity_relationships()

Get all relationships involving an entity. Signature:
def get_entity_relationships(
    self,
    entity_id: str
) -> list[RelationshipTrajectory]

Timeline Management (M12)

save_timeline()

Save a timeline (for counterfactual branching). Signature:
def save_timeline(self, timeline: Timeline) -> Timeline

get_timeline()

Get timeline by ID. Signature:
def get_timeline(self, timeline_id: str) -> Timeline | None

get_child_timelines()

Get all child timelines of a parent. Signature:
def get_child_timelines(self, parent_timeline_id: str) -> list[Timeline]
Example:
baseline = store.get_timeline("timeline_baseline")
children = store.get_child_timelines("timeline_baseline")

for child in children:
    print(f"Branch: {child.timeline_id}")
    print(f"  Point: {child.branch_point}")
    print(f"  Intervention: {child.intervention_description}")

Query History (M5)

save_query_history()

Save query history for resolution tracking. Signature:
def save_query_history(self, query_history: QueryHistory) -> QueryHistory

get_query_history_for_entity()

Get query history for an entity. Signature:
def get_query_history_for_entity(
    self,
    entity_id: str,
    limit: int = 100
) -> list[QueryHistory]

get_entity_query_count()

Get total queries for an entity. Signature:
def get_entity_query_count(self, entity_id: str) -> int

get_entity_elevation_count()

Get number of resolution elevations. Signature:
def get_entity_elevation_count(self, entity_id: str) -> int

Prospective State (M15)

save_prospective_state()

Save entity’s expectations and forecasts. Signature:
def save_prospective_state(self, prospective_state) -> ProspectiveState

get_prospective_states_for_entity()

Get all prospective states for an entity. Signature:
def get_prospective_states_for_entity(
    self,
    entity_id: str
) -> list[ProspectiveState]

Convergence Evaluation

save_convergence_set()

Save convergence analysis results. Signature:
def save_convergence_set(
    self,
    convergence_set: ConvergenceSet
) -> ConvergenceSet

get_convergence_sets()

Get convergence sets with filtering. Signature:
def get_convergence_sets(
    self,
    template_id: str | None = None,
    min_score: float | None = None,
    limit: int = 100
) -> list[ConvergenceSet]

get_convergence_stats()

Get aggregate convergence statistics. Signature:
def get_convergence_stats(self) -> dict
Returns: Dictionary with:
  • total_sets: Number of convergence sets
  • average_score: Mean convergence score
  • min_score: Minimum score
  • max_score: Maximum score
  • grade_distribution: Count by grade (A/B/C/D/F)
  • template_coverage: Count by template

SQLite Schema

Tables:
  • entity: Entities with resolution levels and metadata
  • timepoint: Temporal events with causal chains
  • exposureevent: Knowledge exposure tracking
  • dialog: Dialog conversations
  • relationshiptrajectory: Relationship evolution
  • timeline: Timeline branching (counterfactuals)
  • queryhistory: Query pattern tracking
  • prospectivestate: Entity expectations
  • convergenceset: Cross-run causal analysis
  • systemprompt: Prompt templates
  • validationrule: Validation configurations
  • environmententity: Scene environments
  • atmosphereentity: Scene atmosphere
  • crowdentity: Crowd dynamics
Indexes:
  • entity_id (unique on entity)
  • timepoint_id (unique on timepoint)
  • timeline_id (on timepoint)
  • causal_parent (on timepoint)
  • run_id (on timepoint, exposureevent, dialog)

Performance

WAL Mode Benefits:
  • Multiple readers + one writer simultaneously
  • Faster writes (no lock blocking)
  • Better concurrency
LRU Caching:
  • get_timepoint() cached (500 entries)
  • Speeds up temporal traversal
Batch Operations:
  • save_exposure_events() for bulk inserts
  • save_dialogs() for batch dialog storage
  • Transaction contexts for atomic multi-table writes

Best Practices

  1. Use transactions for multi-table operations
  2. Batch exposure events instead of one-by-one
  3. Set run_id for convergence analysis
  4. Check entity existence before updates
  5. Use LRU cache by calling get_timepoint()
  6. Clean old data periodically with _clear_database()
  7. Monitor database size (WAL can grow)

Example Workflow

from storage import GraphStore
from schemas import Entity, Timepoint, ExposureEvent, ResolutionLevel
from datetime import datetime
import uuid

store = GraphStore()
run_id = str(uuid.uuid4())

# Atomic multi-table write
with store.transaction() as tx:
    # Save entity
    entity = Entity(
        entity_id="hamilton",
        entity_type="human",
        resolution_level=ResolutionLevel.DIALOG
    )
    tx.save_entity(entity)
    
    # Save timepoint
    timepoint = Timepoint(
        timepoint_id="tp_001",
        timestamp=datetime.now(),
        event_description="Scene begins",
        entities_present=["hamilton"],
        run_id=run_id
    )
    tx.save_timepoint(timepoint)
    
    # Save exposure event
    event = ExposureEvent(
        entity_id="hamilton",
        event_type="witnessed",
        information="Convention begins",
        source="scene",
        timestamp=datetime.now(),
        timepoint_id="tp_001",
        run_id=run_id
    )
    tx.save_exposure_event(event)

print("Transaction committed successfully")