
Overview

The GraphStore provides unified storage for entities, timepoints, graphs, dialogs, and all simulation artifacts. It is built on SQLModel (Pydantic + SQLAlchemy) over a SQLite backend, with WAL mode enabled for concurrent access.

Module: storage.py
Database: timepoint.db (SQLite with WAL mode)

Initialization

Signature:
class GraphStore:
    def __init__(self, db_url: str = "sqlite:///timepoint.db")
Example:
from storage import GraphStore

store = GraphStore("sqlite:///timepoint.db")
# Or custom location:
store = GraphStore("sqlite:////path/to/my_simulation.db")
Features:
  • Automatic table creation from SQLModel schemas
  • WAL mode enabled for concurrent reads/writes
  • 30-second busy timeout for write locks
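A minimal sketch of what these two settings look like at the SQLite level, using the standard library's sqlite3 directly (illustrative only; GraphStore applies them through SQLAlchemy, and the exact mechanism is internal to storage.py):

```python
import sqlite3

# Open (or create) the database file and apply the same settings
# GraphStore relies on: WAL journaling and a 30-second busy timeout.
conn = sqlite3.connect("timepoint.db")
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]  # "wal" once active
conn.execute("PRAGMA busy_timeout=30000")  # milliseconds

print(mode)
conn.close()
```

With WAL enabled, readers are not blocked by the single writer, and the busy timeout makes a second writer wait up to 30 seconds for the lock instead of failing immediately.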

Transactions

transaction()

Atomic database operations with automatic commit/rollback. Signature:
@contextmanager
def transaction(self) -> Generator[TransactionContext, None, None]
Example:
with store.transaction() as tx:
    tx.save_entity(entity)
    tx.save_timepoint(timepoint)
    tx.save_exposure_event(event)
    # All succeed or all rollback
TransactionContext Methods:
  • save_entity(entity: Entity) -> Entity
  • save_timepoint(timepoint: Timepoint) -> Timepoint
  • save_exposure_event(event: ExposureEvent) -> ExposureEvent
  • save_exposure_events(events: list[ExposureEvent])
  • save_dialog(dialog: Dialog) -> Dialog
  • save_relationship_trajectory(trajectory: RelationshipTrajectory)
  • save_timeline(timeline: Timeline) -> Timeline
  • save_query_history(query_history: QueryHistory)
  • save_prospective_state(prospective_state: ProspectiveState)
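The commit-or-rollback semantics can be sketched with a contextmanager over a raw sqlite3 connection (a simplified stand-in for the SQLAlchemy session that GraphStore actually manages):

```python
import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entity (entity_id TEXT PRIMARY KEY)")

@contextmanager
def transaction(connection):
    """Commit on success, roll back on any exception."""
    try:
        yield connection
        connection.commit()
    except Exception:
        connection.rollback()
        raise

# Both inserts commit together; an exception inside the block
# would roll both back, leaving the table unchanged.
with transaction(conn) as tx:
    tx.execute("INSERT INTO entity VALUES ('hamilton')")
    tx.execute("INSERT INTO entity VALUES ('madison')")

print(conn.execute("SELECT COUNT(*) FROM entity").fetchone()[0])  # 2
```

The real TransactionContext exposes the typed save_* methods listed above instead of raw SQL, but the all-or-nothing guarantee is the same.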

Entity Operations

save_entity()

Save or update an entity. Signature:
def save_entity(self, entity: Entity) -> Entity
Behavior:
  • If entity_id exists: Updates all fields
  • If new: Inserts entity
  • Marks entity_metadata as modified (JSON column)
Example:
from schemas import Entity, ResolutionLevel

entity = Entity(
    entity_id="alexander_hamilton",
    entity_type="human",
    resolution_level=ResolutionLevel.DIALOG,
    entity_metadata={
        "knowledge_state": ["Founded First Bank"],
        "energy_budget": 85.0,
        "personality_traits": [0.8, -0.3, 0.6, 0.9, -0.2]
    }
)

saved = store.save_entity(entity)
print(f"Saved: {saved.entity_id} (id={saved.id})")

get_entity()

Retrieve entity by ID. Signature:
def get_entity(
    self,
    entity_id: str,
    timepoint: str | None = None
) -> Entity | None
Example:
entity = store.get_entity("alexander_hamilton")
if entity:
    print(f"Resolution: {entity.resolution_level}")
    print(f"Centrality: {entity.eigenvector_centrality}")

get_all_entities()

Retrieve all entities. Signature:
def get_all_entities(self) -> list[Entity]
Example:
entities = store.get_all_entities()
print(f"Total entities: {len(entities)}")

for entity in entities:
    print(f"{entity.entity_id}: {entity.resolution_level}")

Exposure Events (M3)

save_exposure_event()

Save a single exposure event. Signature:
def save_exposure_event(self, event: ExposureEvent) -> ExposureEvent
Example:
from schemas import ExposureEvent
from datetime import datetime

event = ExposureEvent(
    entity_id="alexander_hamilton",
    event_type="learned",
    information="Madison supports bicameral legislature",
    source="james_madison",
    timestamp=datetime(1787, 5, 25, 14, 0),
    confidence=0.9,
    timepoint_id="tp_001",
    run_id="run_123"
)

saved = store.save_exposure_event(event)

save_exposure_events()

Batch save exposure events. Signature:
def save_exposure_events(self, exposure_events: list[ExposureEvent])
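Batching amortizes transaction overhead: one commit for N rows instead of N commits. The effect can be sketched directly in SQLite (the two-column table here is an illustrative stand-in, not the real exposureevent schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE exposureevent (entity_id TEXT, information TEXT)")

events = [
    ("hamilton", "Madison supports bicameral legislature"),
    ("hamilton", "Convention moved to Philadelphia"),
    ("madison", "Hamilton favors a strong executive"),
]

# One executemany + one commit, rather than a commit per event.
conn.executemany("INSERT INTO exposureevent VALUES (?, ?)", events)
conn.commit()

print(conn.execute("SELECT COUNT(*) FROM exposureevent").fetchone()[0])  # 3
```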

get_exposure_events()

Get exposure events for an entity. Signature:
def get_exposure_events(
    self,
    entity_id: str,
    limit: int | None = None
) -> list[ExposureEvent]
Parameters:
  • entity_id: Entity identifier
  • limit: Maximum events to return (most recent first)
Example:
events = store.get_exposure_events(
    "alexander_hamilton",
    limit=10
)

for event in events:
    print(f"{event.timestamp}: {event.information}")
    print(f"  Source: {event.source}, Confidence: {event.confidence}")

get_exposure_events_by_run()

Get all exposure events for a simulation run. Signature:
def get_exposure_events_by_run(self, run_id: str) -> list[ExposureEvent]

get_entity_knowledge_at_timepoint()

Get what an entity knew at a specific timepoint. Signature:
def get_entity_knowledge_at_timepoint(
    self,
    entity_id: str,
    timepoint_id: str
) -> list[str]
Example:
knowledge = store.get_entity_knowledge_at_timepoint(
    "alexander_hamilton",
    "tp_005"
)

print("Hamilton knew at tp_005:")
for item in knowledge:
    print(f"- {item}")

Timepoint Operations

save_timepoint()

Save a timepoint. Signature:
def save_timepoint(self, timepoint: Timepoint) -> Timepoint
Example:
from schemas import Timepoint, ResolutionLevel
from datetime import datetime

timepoint = Timepoint(
    timepoint_id="tp_001",
    timeline_id="timeline_001",
    timestamp=datetime(1787, 5, 25, 9, 0),
    event_description="Constitutional Convention begins",
    entities_present=["hamilton", "madison", "washington"],
    causal_parent=None,  # First timepoint
    resolution_level=ResolutionLevel.DIALOG,
    run_id="run_123"
)

saved = store.save_timepoint(timepoint)

get_timepoint()

Get timepoint by ID (with LRU caching). Signature:
@lru_cache(maxsize=500)
def get_timepoint(self, timepoint_id: str) -> Timepoint | None
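Because the decorator memoizes on its arguments, repeated lookups of the same timepoint_id skip the database entirely. A minimal sketch of the caching behavior with a stand-in lookup function:

```python
from functools import lru_cache

calls = 0

@lru_cache(maxsize=500)
def get_timepoint(timepoint_id: str):
    """Stand-in for the database fetch; counts how often it really runs."""
    global calls
    calls += 1
    return {"timepoint_id": timepoint_id}

get_timepoint("tp_001")
get_timepoint("tp_001")  # served from the cache, no second fetch
get_timepoint("tp_002")

print(calls)                            # 2
print(get_timepoint.cache_info().hits)  # 1
```

Note that lru_cache on a bound method also keys on the instance, so the cache lives as long as the GraphStore object; if a timepoint row is updated after being cached, a stale copy can be returned until the cache is cleared.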

get_all_timepoints()

Get all timepoints ordered by timestamp. Signature:
def get_all_timepoints(self) -> list[Timepoint]

get_timepoints_by_run()

Get all timepoints for a simulation run. Signature:
def get_timepoints_by_run(self, run_id: str) -> list[Timepoint]

get_timepoints_in_range()

Get timepoints within a time range. Signature:
def get_timepoints_in_range(
    self,
    start_time=None,
    end_time=None
) -> list[Timepoint]
Example:
from datetime import datetime

start = datetime(1787, 5, 25)
end = datetime(1787, 9, 17)

timepoints = store.get_timepoints_in_range(start, end)
print(f"Found {len(timepoints)} timepoints in range")

get_successor_timepoints()

Get timepoints that have this timepoint as causal parent. Signature:
def get_successor_timepoints(self, timepoint_id: str) -> list[Timepoint]

get_predecessor_timepoints()

Get causal parent(s) of a timepoint. Signature:
def get_predecessor_timepoints(self, timepoint_id: str) -> list[Timepoint]
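Together these two methods let you walk the causal DAG in either direction. A sketch of walking back to the root via causal_parent links, using a plain dict in place of the database (illustrative):

```python
# causal_parent links, as stored on each Timepoint row (None = root).
parents = {"tp_003": "tp_002", "tp_002": "tp_001", "tp_001": None}

def causal_chain(timepoint_id: str) -> list[str]:
    """Follow causal_parent links back to the root timepoint."""
    chain = []
    current = timepoint_id
    while current is not None:
        chain.append(current)
        current = parents[current]
    return chain

print(causal_chain("tp_003"))  # ['tp_003', 'tp_002', 'tp_001']
```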

Graph Operations

save_graph()

Serialize NetworkX graph to database. Signature:
def save_graph(self, graph: nx.Graph, timepoint_id: str)
Example:
import networkx as nx

graph = nx.Graph()
graph.add_node("hamilton", role="primary")
graph.add_node("madison", role="primary")
graph.add_edge("hamilton", "madison", relationship="ally", weight=0.9)

store.save_graph(graph, "tp_001")

load_graph()

Deserialize NetworkX graph from database. Signature:
def load_graph(self, timepoint_id: str) -> nx.Graph | None
Example:
graph = store.load_graph("tp_001")
if graph:
    print(f"Nodes: {graph.number_of_nodes()}")
    print(f"Edges: {graph.number_of_edges()}")
    
    for u, v, data in graph.edges(data=True):
        print(f"{u} <-{data['relationship']}-> {v}")

Dialog Storage (M11)

save_dialog()

Save a dialog conversation. Signature:
def save_dialog(self, dialog: Dialog) -> Dialog
Example:
import json
from schemas import Dialog
from datetime import datetime

turns = [
    {
        "speaker": "hamilton",
        "content": "We need a strong central bank.",
        "timestamp": "1791-01-15T10:00:00",
        "emotional_tone": "confident",
        "knowledge_references": ["banking_expertise"]
    },
    {
        "speaker": "jefferson",
        "content": "That concentrates too much power.",
        "timestamp": "1791-01-15T10:01:00",
        "emotional_tone": "skeptical",
        "knowledge_references": ["states_rights"]
    }
]

dialog = Dialog(
    dialog_id="dialog_001",
    timepoint_id="tp_010",
    participants=json.dumps(["hamilton", "jefferson"]),
    turns=json.dumps(turns),
    context_used=json.dumps({"location": "cabinet_room"}),
    duration_seconds=120,
    information_transfer_count=2,
    run_id="run_123"
)

saved = store.save_dialog(dialog)

get_dialog()

Get dialog by ID. Signature:
def get_dialog(self, dialog_id: str) -> Dialog | None

get_dialogs_at_timepoint()

Get all dialogs at a timepoint. Signature:
def get_dialogs_at_timepoint(self, timepoint_id: str) -> list[Dialog]

load_all_dialogs()

Load all dialogs (for narrative export). Signature:
def load_all_dialogs(self) -> list[Dialog]
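Since participants, turns, and context_used are stored as JSON strings (see save_dialog above), decode them after loading before iterating:

```python
import json

# Fields come back from a Dialog row as JSON-encoded strings.
participants_json = '["hamilton", "jefferson"]'
turns_json = '[{"speaker": "hamilton", "content": "We need a strong central bank."}]'

participants = json.loads(participants_json)
turns = json.loads(turns_json)

for turn in turns:
    print(f'{turn["speaker"]}: {turn["content"]}')
```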

Relationship Trajectories (M13)

save_relationship_trajectory()

Save relationship evolution over time. Signature:
def save_relationship_trajectory(
    self,
    trajectory: RelationshipTrajectory
) -> RelationshipTrajectory

get_relationship_trajectory_between()

Get most recent trajectory between two entities. Signature:
def get_relationship_trajectory_between(
    self,
    entity_a: str,
    entity_b: str
) -> RelationshipTrajectory | None

get_entity_relationships()

Get all relationships involving an entity. Signature:
def get_entity_relationships(
    self,
    entity_id: str
) -> list[RelationshipTrajectory]
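Relationships are unordered pairs, so a lookup between two entities typically normalizes the pair before matching. A sketch of the idea (whether storage.py normalizes exactly this way is an assumption):

```python
def pair_key(entity_a: str, entity_b: str) -> tuple[str, str]:
    """Order-independent key so (a, b) and (b, a) hit the same row."""
    return tuple(sorted((entity_a, entity_b)))

trajectories = {
    pair_key("hamilton", "jefferson"): "rivalry, cooling over 1791",
}

# Same trajectory regardless of argument order.
print(trajectories[pair_key("jefferson", "hamilton")])
```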

Timeline Management (M12)

save_timeline()

Save a timeline (for counterfactual branching). Signature:
def save_timeline(self, timeline: Timeline) -> Timeline

get_timeline()

Get timeline by ID. Signature:
def get_timeline(self, timeline_id: str) -> Timeline | None

get_child_timelines()

Get all child timelines of a parent. Signature:
def get_child_timelines(self, parent_timeline_id: str) -> list[Timeline]
Example:
baseline = store.get_timeline("timeline_baseline")
children = store.get_child_timelines("timeline_baseline")

for child in children:
    print(f"Branch: {child.timeline_id}")
    print(f"  Point: {child.branch_point}")
    print(f"  Intervention: {child.intervention_description}")

Query History (M5)

save_query_history()

Save query history for resolution tracking. Signature:
def save_query_history(self, query_history: QueryHistory) -> QueryHistory

get_query_history_for_entity()

Get query history for an entity. Signature:
def get_query_history_for_entity(
    self,
    entity_id: str,
    limit: int = 100
) -> list[QueryHistory]

get_entity_query_count()

Get total queries for an entity. Signature:
def get_entity_query_count(self, entity_id: str) -> int

get_entity_elevation_count()

Get number of resolution elevations. Signature:
def get_entity_elevation_count(self, entity_id: str) -> int

Prospective State (M15)

save_prospective_state()

Save entity’s expectations and forecasts. Signature:
def save_prospective_state(self, prospective_state) -> ProspectiveState

get_prospective_states_for_entity()

Get all prospective states for an entity. Signature:
def get_prospective_states_for_entity(
    self,
    entity_id: str
) -> list[ProspectiveState]

Convergence Evaluation

save_convergence_set()

Save convergence analysis results. Signature:
def save_convergence_set(
    self,
    convergence_set: ConvergenceSet
) -> ConvergenceSet

get_convergence_sets()

Get convergence sets with filtering. Signature:
def get_convergence_sets(
    self,
    template_id: str | None = None,
    min_score: float | None = None,
    limit: int = 100
) -> list[ConvergenceSet]

get_convergence_stats()

Get aggregate convergence statistics. Signature:
def get_convergence_stats(self) -> dict
Returns: Dictionary with:
  • total_sets: Number of convergence sets
  • average_score: Mean convergence score
  • min_score: Minimum score
  • max_score: Maximum score
  • grade_distribution: Count by grade (A/B/C/D/F)
  • template_coverage: Count by template
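A sketch of how such aggregates fall out of a list of convergence scores (the A-F grade cutoffs below are illustrative assumptions; the real thresholds live in the evaluator):

```python
from statistics import mean

scores = [0.92, 0.81, 0.75, 0.60, 0.45]

def grade(score: float) -> str:
    """Illustrative A-F cutoffs at 0.9 / 0.8 / 0.7 / 0.6."""
    cutoffs = [(0.9, "A"), (0.8, "B"), (0.7, "C"), (0.6, "D")]
    return next((g for c, g in cutoffs if score >= c), "F")

stats = {
    "total_sets": len(scores),
    "average_score": round(mean(scores), 3),
    "min_score": min(scores),
    "max_score": max(scores),
    "grade_distribution": {},
}
for s in scores:
    g = grade(s)
    stats["grade_distribution"][g] = stats["grade_distribution"].get(g, 0) + 1

print(stats["average_score"])       # 0.706
print(stats["grade_distribution"])  # {'A': 1, 'B': 1, 'C': 1, 'D': 1, 'F': 1}
```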

SQLite Schema

Tables:
  • entity: Entities with resolution levels and metadata
  • timepoint: Temporal events with causal chains
  • exposureevent: Knowledge exposure tracking
  • dialog: Dialog conversations
  • relationshiptrajectory: Relationship evolution
  • timeline: Timeline branching (counterfactuals)
  • queryhistory: Query pattern tracking
  • prospectivestate: Entity expectations
  • convergenceset: Cross-run causal analysis
  • systemprompt: Prompt templates
  • validationrule: Validation configurations
  • environmententity: Scene environments
  • atmosphereentity: Scene atmosphere
  • crowdentity: Crowd dynamics
Indexes:
  • entity_id (unique on entity)
  • timepoint_id (unique on timepoint)
  • timeline_id (on timepoint)
  • causal_parent (on timepoint)
  • run_id (on timepoint, exposureevent, dialog)
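These indexes correspond to SQL along the following lines (index names are illustrative; SQLModel generates its own). The EXPLAIN QUERY PLAN check shows a run-scoped query being satisfied through the index rather than a full table scan:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE timepoint "
    "(timepoint_id TEXT, timeline_id TEXT, causal_parent TEXT, run_id TEXT)"
)
conn.execute("CREATE UNIQUE INDEX ix_timepoint_id ON timepoint (timepoint_id)")
conn.execute("CREATE INDEX ix_timepoint_run_id ON timepoint (run_id)")

# The query planner reports a SEARCH using the run_id index.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM timepoint WHERE run_id = ?", ("run_123",)
).fetchone()
print(plan[3])
```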

Performance

WAL Mode Benefits:
  • Multiple readers + one writer simultaneously
  • Faster writes (no lock blocking)
  • Better concurrency
LRU Caching:
  • get_timepoint() cached (500 entries)
  • Speeds up temporal traversal
Batch Operations:
  • save_exposure_events() for bulk inserts
  • save_dialogs() for batch dialog storage
  • Transaction contexts for atomic multi-table writes

Best Practices

  1. Use transactions for multi-table operations
  2. Batch exposure events instead of one-by-one
  3. Set run_id for convergence analysis
  4. Check entity existence before updates
  5. Prefer get_timepoint() for repeated lookups; results are LRU-cached
  6. Clean old data periodically with _clear_database()
  7. Monitor database size (WAL can grow)

Example Workflow

from storage import GraphStore
from schemas import Entity, Timepoint, ExposureEvent, ResolutionLevel
from datetime import datetime
import uuid

store = GraphStore()
run_id = str(uuid.uuid4())

# Atomic multi-table write
with store.transaction() as tx:
    # Save entity
    entity = Entity(
        entity_id="hamilton",
        entity_type="human",
        resolution_level=ResolutionLevel.DIALOG
    )
    tx.save_entity(entity)
    
    # Save timepoint
    timepoint = Timepoint(
        timepoint_id="tp_001",
        timestamp=datetime.now(),
        event_description="Scene begins",
        entities_present=["hamilton"],
        run_id=run_id
    )
    tx.save_timepoint(timepoint)
    
    # Save exposure event
    event = ExposureEvent(
        entity_id="hamilton",
        event_type="witnessed",
        information="Convention begins",
        source="scene",
        timestamp=datetime.now(),
        timepoint_id="tp_001",
        run_id=run_id
    )
    tx.save_exposure_event(event)

print("Transaction committed successfully")