DDR-003 — Protocol-based architecture for Engrama v1.0

Version: 0.2.0 | Date: 2026-04-12 | Status: Proposed | Supersedes: DDR-003 v0.1.0 (Neo4j-only hybrid search)

Context

The DAFO (SWOT) analysis identified seven features Engrama needs to compete. Implementing them independently creates rework: hybrid search (#1) hard-wired to Neo4j conflicts with database abstraction (#2); temporal reasoning (#3) touches every query path; security (#4) and multi-scope (#6) cut across all storage operations.

This DDR designs a unified protocol-based architecture where all seven features compose cleanly. The key insight: #2 (database abstraction) is the foundation, not #1 (hybrid search). Every other feature builds on the storage protocols. We define those protocols first, then each feature becomes a layer that plugs in without refactoring what came before.

The seven features and their dependencies

#5 Benchmarks      ← needs #1, #3
#6 Multi-scope     ← needs #2, #4
#4 Security        ← needs #2, #3
#1 Hybrid search   ← needs #2, #7
#3 Temporal        ← needs #2
#7 LLM-agnostic    ← independent
#2 DB abstraction  ← FOUNDATION

Implementation order: #2 + #7 → #1 + #3 → #4 → #6 → #5
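The implementation order can be checked against the dependency list mechanically. A minimal sketch: the dictionary below simply transcribes the list above, the stages are the groups in the implementation order, and the function name is illustrative.

```python
# Prerequisites per feature, transcribed from the dependency list
DEPENDS_ON: dict[int, set[int]] = {
    5: {1, 3},
    6: {2, 4},
    4: {2, 3},
    1: {2, 7},
    3: {2},
    7: set(),
    2: set(),
}

# Implementation order as stages; features in one stage are built together
STAGES: list[set[int]] = [{2, 7}, {1, 3}, {4}, {6}, {5}]


def order_violations(deps: dict[int, set[int]], stages: list[set[int]]) -> list[str]:
    """List every feature whose prerequisite is in the same or a later stage."""
    stage_of = {feature: i for i, stage in enumerate(stages) for feature in stage}
    problems = []
    for feature, prereqs in deps.items():
        for p in sorted(prereqs):
            if stage_of[p] >= stage_of[feature]:
                problems.append(f"#{feature} is not scheduled after its prerequisite #{p}")
    return problems


print(order_violations(DEPENDS_ON, STAGES))  # []
```

Pure topology would allow #5 in the third stage alongside #4; placing benchmarks last is a deliberate choice, since they should measure the finished system.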

Decision

Replace Engrama's direct Neo4j coupling with three protocol interfaces. Every skill, adapter, and MCP tool talks to these protocols — never to a specific database. Neo4j becomes the first (and default) implementation.


Part 1 — Storage protocols (#2)

1.1 GraphStore protocol

Covers: node CRUD, relationships, fulltext search, pattern queries.

from typing import Protocol, Any, runtime_checkable
from datetime import datetime

@runtime_checkable
class GraphStore(Protocol):
    """Abstract interface for graph storage backends."""

    # --- Node operations ---
    async def merge_node(
        self, label: str, key_field: str, key_value: str,
        properties: dict[str, Any],
        embedding: list[float] | None = None,
    ) -> dict[str, Any]:
        """Create or update a node. Always MERGE semantics."""
        ...

    async def get_node(
        self, label: str, key_field: str, key_value: str
    ) -> dict[str, Any] | None:
        """Retrieve a single node by its unique key."""
        ...

    async def delete_node(
        self, label: str, key_field: str, key_value: str,
        soft: bool = True,
    ) -> bool:
        """Delete or archive a node. soft=True sets status='archived'."""
        ...

    # --- Relationship operations ---
    async def merge_relation(
        self, from_label: str, from_key: str, from_value: str,
        rel_type: str,
        to_label: str, to_key: str, to_value: str,
    ) -> dict[str, Any]:
        """Create a relationship (idempotent)."""
        ...

    # --- Query operations ---
    async def get_neighbours(
        self, label: str, key_field: str, key_value: str,
        hops: int = 1, limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Traverse N hops from a node."""
        ...

    async def fulltext_search(
        self, query: str, limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Keyword search across all text properties.
        Returns: [{node, score, label, key}]"""
        ...

    async def run_cypher(
        self, query: str, params: dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        """Execute a raw query (backend-specific).
        For reflect patterns that need full query power.
        Backends that don't support Cypher raise NotImplementedError."""
        ...

    # --- Schema operations ---
    async def init_schema(self, schema: "SchemaDefinition") -> None:
        """Apply constraints, indexes, and seed data."""
        ...

    async def health_check(self) -> dict[str, Any]:
        """Return backend status and version info."""
        ...

    # --- Temporal operations (#3) ---
    async def get_node_history(
        self, label: str, key_field: str, key_value: str,
    ) -> list[dict[str, Any]]:
        """Return the temporal history of a node's property changes.
        Each entry: {properties, valid_from, valid_to, ingested_at}"""
        ...

    async def decay_scores(
        self, max_age_days: int = 90, decay_rate: float = 0.01,
    ) -> int:
        """Apply confidence decay to stale nodes.
        Returns count of nodes affected."""
        ...

    # --- Scope operations (#6) ---
    async def set_scope(
        self, scope: "MemoryScope",
    ) -> None:
        """Set the active scope for all subsequent operations.
        Scope filters are applied automatically to every query."""
        ...

1.2 VectorStore protocol

Covers: embedding storage, similarity search. May be the same backend as GraphStore (Neo4j) or a separate one (ChromaDB, pgvector).

@runtime_checkable
class VectorStore(Protocol):
    """Abstract interface for vector similarity search."""

    dimensions: int

    async def store_vectors(
        self, items: list[tuple[str, list[float]]],
    ) -> int:
        """Store embeddings for nodes. items: [(node_id, embedding)].
        Returns count stored."""
        ...

    async def search_vectors(
        self, query_embedding: list[float],
        limit: int = 10,
        scope: "MemoryScope | None" = None,
    ) -> list[dict[str, Any]]:
        """k-ANN similarity search.
        Returns: [{node_id, score, label, key}]"""
        ...

    async def delete_vectors(
        self, node_ids: list[str],
    ) -> int:
        """Remove embeddings for deleted/archived nodes."""
        ...

    async def count(self) -> int:
        """Total vectors stored."""
        ...
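To show how small a conforming backend can be, here is a minimal in-memory VectorStore sketch using brute-force cosine similarity. InMemoryVectorStore is hypothetical, not one of the planned backends. It makes two simplifications: it ignores scope, and it returns only node_id and score. The second is forced by the protocol as written: store_vectors() receives only (node_id, embedding) pairs, so a standalone store has no label or key to return unless it also keeps that metadata.

```python
import asyncio
import math
from typing import Any


class InMemoryVectorStore:
    """Brute-force cosine-similarity store with the VectorStore method set."""

    def __init__(self, dimensions: int):
        self.dimensions = dimensions
        self._vectors: dict[str, list[float]] = {}

    async def store_vectors(self, items: list[tuple[str, list[float]]]) -> int:
        for node_id, embedding in items:
            if len(embedding) != self.dimensions:
                raise ValueError(f"{node_id}: expected {self.dimensions} dims, got {len(embedding)}")
            self._vectors[node_id] = embedding  # upsert: re-storing replaces the old vector
        return len(items)

    async def search_vectors(self, query_embedding: list[float], limit: int = 10,
                             scope: Any = None) -> list[dict[str, Any]]:
        # scope is ignored in this sketch; a real backend would filter on it
        def cosine(a: list[float], b: list[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        scored = [{"node_id": nid, "score": cosine(query_embedding, vec)}
                  for nid, vec in self._vectors.items()]
        scored.sort(key=lambda r: r["score"], reverse=True)
        return scored[:limit]

    async def delete_vectors(self, node_ids: list[str]) -> int:
        removed = 0
        for nid in node_ids:
            if self._vectors.pop(nid, None) is not None:
                removed += 1
        return removed

    async def count(self) -> int:
        return len(self._vectors)


async def demo() -> None:
    store = InMemoryVectorStore(dimensions=2)
    await store.store_vectors([("a", [1.0, 0.0]), ("b", [0.0, 1.0])])
    hits = await store.search_vectors([0.9, 0.1], limit=1)
    print(hits[0]["node_id"])  # a


asyncio.run(demo())
```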

1.3 Why two protocols, not one

Some backends implement both (Neo4j has native graph + native vectors). Others don't (NetworkX has no vector index; ChromaDB has no graph). Keeping them separate lets us mix backends:

Combination        GraphStore          VectorStore        Use case
-----------------  ------------------  -----------------  --------------------
Neo4j only         Neo4jGraphStore     Neo4jVectorStore   Default, simplest
Neo4j + Chroma     Neo4jGraphStore     ChromaVectorStore  Better vector perf
Kuzu + Chroma      KuzuGraphStore      ChromaVectorStore  Embedded, no Docker
NetworkX + None    NetworkXGraphStore  NullVectorStore    Zero-dep prototyping
PG+AGE + pgvector  PgGraphStore        PgVectorStore      Single Postgres

1.4 Neo4j implementation (first backend)

The Neo4j adapter implements both protocols. It wraps the exact same Cypher queries that exist today in core/engine.py: zero logic change, just extraction behind the interface.

from neo4j import AsyncDriver

from engrama.core.scope import MemoryScope


class Neo4jBackend:
    """Implements both GraphStore and VectorStore using Neo4j."""

    def __init__(self, driver: AsyncDriver, config: dict):
        self.driver = driver
        self.config = config
        self._scope: MemoryScope | None = None  # set by set_scope()

    # --- GraphStore ---
    async def merge_node(self, label, key_field, key_value,
                         properties, embedding=None):
        # Same MERGE Cypher as current engine.py
        # + stores embedding property if provided
        ...

    async def fulltext_search(self, query, limit=10):
        # Same db.index.fulltext.queryNodes('memory_search', ...)
        ...

    # --- VectorStore ---
    async def search_vectors(self, query_embedding, limit=10, scope=None):
        # CALL db.index.vector.queryNodes('memory_vectors', $k, $emb)
        ...

    async def store_vectors(self, items):
        # SET n.embedding = $embedding (already on the same node)
        ...

1.5 Configuration

# Storage backend
GRAPH_BACKEND=neo4j          # neo4j | kuzu | networkx | postgres
VECTOR_BACKEND=neo4j         # neo4j | chroma | pgvector | none

# Neo4j (when GRAPH_BACKEND=neo4j or VECTOR_BACKEND=neo4j)
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=CHANGE_ME_BEFORE_FIRST_RUN

# ChromaDB (when VECTOR_BACKEND=chroma)
CHROMA_PATH=./chroma_data    # local persistent directory
CHROMA_COLLECTION=engrama

# Kuzu (when GRAPH_BACKEND=kuzu)
KUZU_PATH=./kuzu_data

The factory reads .env and returns the correct implementations:

def create_stores(config: dict) -> tuple[GraphStore, VectorStore]:
    graph = _create_graph_store(config)
    vector = _create_vector_store(config, graph)
    return graph, vector

When VECTOR_BACKEND matches GRAPH_BACKEND (e.g., both neo4j), the factory returns the same object for both — no wasted connections.
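The factory's shared-object rule can be sketched under a few assumptions: the registries, Neo4jBackendStub and NullVectorStore below are placeholders for the real backend classes, and only the neo4j and none entries are filled in.

```python
from typing import Any, Callable


class Neo4jBackendStub:
    """Stand-in for Neo4jBackend: one object that serves both protocols."""

    def __init__(self, config: dict[str, str]):
        self.uri = config.get("NEO4J_URI", "bolt://localhost:7687")


class NullVectorStore:
    """Stand-in for the null backend: stores nothing, finds nothing."""

    dimensions = 0


# Hypothetical registries mapping .env values to constructors
GRAPH_BACKENDS: dict[str, Callable[[dict[str, str]], Any]] = {
    "neo4j": Neo4jBackendStub,
}
VECTOR_BACKENDS: dict[str, Callable[[dict[str, str]], Any]] = {
    "neo4j": Neo4jBackendStub,
    "none": lambda config: NullVectorStore(),
}


def create_stores(config: dict[str, str]) -> tuple[Any, Any]:
    graph_name = config.get("GRAPH_BACKEND", "neo4j")
    vector_name = config.get("VECTOR_BACKEND", "neo4j")
    if graph_name not in GRAPH_BACKENDS:
        raise ValueError(f"unsupported GRAPH_BACKEND: {graph_name}")
    if vector_name not in VECTOR_BACKENDS:
        raise ValueError(f"unsupported VECTOR_BACKEND: {vector_name}")
    graph = GRAPH_BACKENDS[graph_name](config)
    # Same backend for both roles: hand back the one object (one driver, one pool)
    if vector_name == graph_name:
        return graph, graph
    return graph, VECTOR_BACKENDS[vector_name](config)


graph, vector = create_stores({"GRAPH_BACKEND": "neo4j", "VECTOR_BACKEND": "neo4j"})
print(graph is vector)  # True
```

In the server lifespan this would be called as create_stores(dict(os.environ)) once the .env file has been loaded (python-dotenv's load_dotenv() is one way to do that). Because the rule compares names, GRAPH_BACKEND=postgres with VECTOR_BACKEND=pgvector would build two objects unless the factory also knows that those two share a database.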


Part 2 — Embedding provider (#7)

Decoupled from any LLM vendor. Three implementations at launch.

@runtime_checkable
class EmbeddingProvider(Protocol):
    dimensions: int
    async def embed(self, text: str) -> list[float]: ...
    async def embed_batch(self, texts: list[str]) -> list[list[float]]: ...
    async def health_check(self) -> bool: ...

Provider                     Model                    Dims  Local  Cost
---------------------------  -----------------------  ----  -----  ------------------
OllamaProvider               nomic-embed-text         768   yes    free
OllamaProvider               nomic-embed-text-v2-moe  768   yes    free, multilingual
OpenAIProvider               text-embedding-3-small   1536  no     $0.02/1M tokens
SentenceTransformerProvider  all-MiniLM-L6-v2         384   yes    free, no Ollama
NullProvider                                          0

EMBEDDING_PROVIDER=ollama           # ollama | openai | sentence_transformer | none
EMBEDDING_MODEL=nomic-embed-text
EMBEDDING_DIMENSIONS=768
OLLAMA_URL=http://localhost:11434
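NullProvider has no model and zero dimensions in the table. One plausible reading, sketched here as an assumption, is that it returns an empty vector, which the search engine in Part 3 treats as "skip the vector leg". Because the protocol is @runtime_checkable, isinstance() can confirm conformance, though it only checks that the members exist, not their signatures.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class EmbeddingProvider(Protocol):
    dimensions: int
    async def embed(self, text: str) -> list[float]: ...
    async def embed_batch(self, texts: list[str]) -> list[list[float]]: ...
    async def health_check(self) -> bool: ...


class NullProvider:
    """EMBEDDING_PROVIDER=none: produces no vectors, so search uses fulltext only."""

    dimensions = 0

    async def embed(self, text: str) -> list[float]:
        return []  # empty (falsy) vector tells the search engine to skip the vector leg

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        return [[] for _ in texts]

    async def health_check(self) -> bool:
        return True  # nothing external that could be down


provider = NullProvider()
print(isinstance(provider, EmbeddingProvider))  # True
print(asyncio.run(provider.embed("hello")))     # []
```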

Text representation for embedding

Every node gets embedded from its concatenated text properties:

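def node_to_text(label: str, props: dict) -> str:
    """Concatenate a node's label and text properties into one embeddable string."""
    parts = [f"{label}:"]
    # Prefer the display name, fall back to the title; skip both if absent
    if heading := props.get("name") or props.get("title"):
        parts.append(str(heading))
    # Fixed field order keeps the text (and its embedding) stable across writes
    for field in ("description", "notes", "rationale",
                  "solution", "context", "body"):
        if value := props.get(field):
            parts.append(str(value))
    return " ".join(parts)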

Part 3 — Hybrid search engine (#1)

The search engine lives in core/search.py. It talks only to the protocols — zero database-specific code.

Algorithm

query ──► EmbeddingProvider.embed(query)
               │
     ┌─────────┴──────────┐
     ▼                    ▼
VectorStore          GraphStore
.search_vectors()    .fulltext_search()
     │                    │
     └─────────┬──────────┘
               ▼
         merge by node_id
         normalize scores to [0,1]
               │
               ▼
    final = α·v_score + (1-α)·f_score + β·graph_boost
               │
               ▼
         optional: GraphStore.get_neighbours()
         for top-K results (1-hop expansion)
               │
               ▼
         ranked results

Scoring

from __future__ import annotations

import asyncio
from dataclasses import dataclass

from engrama.core.protocols import EmbeddingProvider, GraphStore, VectorStore

# SearchResult (a mutable result record) and self._merge() are defined elsewhere


@dataclass
class HybridConfig:
    alpha: float = 0.6       # vector weight
    graph_beta: float = 0.15 # graph boost weight
    boost_cap: float = 0.3   # max graph boost per node
    vector_k: int = 20       # candidates from vector search
    fulltext_k: int = 20     # candidates from fulltext search


class HybridSearchEngine:
    def __init__(self, graph: GraphStore, vector: VectorStore,
                 embedder: EmbeddingProvider, config: HybridConfig):
        self.graph = graph
        self.vector = vector
        self.embedder = embedder
        self.config = config

    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        # 1. Embed query; an empty vector (e.g. from NullProvider) disables the vector leg
        query_vec = await self.embedder.embed(query)

        async def vector_leg() -> list[dict]:
            if not query_vec:
                return []
            return await self.vector.search_vectors(
                query_vec, limit=self.config.vector_k
            )

        # 2. Run both searches concurrently
        v_results, f_results = await asyncio.gather(
            vector_leg(),
            self.graph.fulltext_search(query, limit=self.config.fulltext_k),
        )

        # 3. Merge by node identity, normalising each leg's scores to [0, 1]
        merged = self._merge(v_results, f_results)

        # 4. Score; the graph boost is capped at boost_cap before it is weighted
        for r in merged:
            boost = min(r.graph_boost, self.config.boost_cap)
            r.final_score = (
                self.config.alpha * r.vector_score
                + (1 - self.config.alpha) * r.fulltext_score
                + self.config.graph_beta * boost
            )

        # 5. Rank and return
        merged.sort(key=lambda r: r.final_score, reverse=True)
        return merged[:limit]
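The scoring code leaves SearchResult and _merge() unspecified. A minimal sketch of both follows, assuming min-max normalisation per leg (one common choice; the DDR only says "normalize scores to [0,1]") and merging on (label, key). That pair appears in both the fulltext_search() and search_vectors() return shapes, while node_id appears only in the latter. All names here are illustrative.

```python
from dataclasses import dataclass


@dataclass
class SearchResult:
    """Hypothetical result record; field names mirror the scoring code."""
    label: str
    key: str
    vector_score: float = 0.0
    fulltext_score: float = 0.0
    graph_boost: float = 0.0
    final_score: float = 0.0


def min_max(scores: dict[tuple[str, str], float]) -> dict[tuple[str, str], float]:
    """Rescale one leg's raw scores to [0, 1]; if all are equal, each maps to 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {k: 1.0 for k in scores}
    return {k: (v - lo) / (hi - lo) for k, v in scores.items()}


def merge(v_results: list[dict], f_results: list[dict]) -> list[SearchResult]:
    """Merge both legs on (label, key); a node missing from a leg scores 0 there."""
    v = min_max({(r["label"], r["key"]): r["score"] for r in v_results})
    f = min_max({(r["label"], r["key"]): r["score"] for r in f_results})
    return [SearchResult(label=lbl, key=k,
                         vector_score=v.get((lbl, k), 0.0),
                         fulltext_score=f.get((lbl, k), 0.0))
            for lbl, k in v.keys() | f.keys()]


def rank(results: list[SearchResult], alpha: float = 0.6,
         graph_beta: float = 0.15, boost_cap: float = 0.3) -> list[SearchResult]:
    for r in results:
        boost = min(r.graph_boost, boost_cap)  # cap keeps hub nodes from dominating
        r.final_score = (alpha * r.vector_score
                         + (1 - alpha) * r.fulltext_score
                         + graph_beta * boost)
    return sorted(results, key=lambda r: r.final_score, reverse=True)


vec = [{"label": "Concept", "key": "rag", "score": 0.91},
       {"label": "Concept", "key": "bm25", "score": 0.55}]
ft = [{"label": "Concept", "key": "bm25", "score": 7.2},
      {"label": "Tool", "key": "lucene", "score": 3.1}]
top = rank(merge(vec, ft))[0]
print(top.key, round(top.final_score, 2))  # rag 0.6
```

Min-max has a quirk visible in the example: bm25 was a genuine vector hit, but as the weakest one it gets a vector score of 0.0, the same as a miss. Dividing each leg by its maximum score avoids that, at the cost of not stretching the range.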

Graceful degradation

Scenario                  Behaviour
------------------------  --------------------------------
EMBEDDING_PROVIDER=none   α forced to 0.0, fulltext only
VECTOR_BACKEND=none       Same as above
Ollama not running        Fallback to fulltext + warning
Node has no embedding     Appears in fulltext results only
No fulltext index (Kuzu)  α forced to 1.0, vector only
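The degradation table reduces to a small rule for the effective α. A sketch, assuming the engine knows at query time whether each leg can run (for example from EmbeddingProvider.health_check(), which is also where the warning for an unreachable Ollama would be logged); resolve_alpha is a hypothetical helper. The per-node case needs no rule: a node without an embedding simply receives a vector score of 0.

```python
def resolve_alpha(alpha: float, *, vector_available: bool,
                  fulltext_available: bool) -> float:
    """Effective vector weight given which search legs can actually run.

    vector_available: a provider is configured and healthy, and
    VECTOR_BACKEND is not 'none'.
    """
    if vector_available and fulltext_available:
        return alpha   # normal hybrid ranking
    if fulltext_available:
        return 0.0     # EMBEDDING_PROVIDER=none, VECTOR_BACKEND=none, or provider down
    if vector_available:
        return 1.0     # backend without a fulltext index
    # Assumption: the DDR does not say what happens when both legs are missing
    raise RuntimeError("neither vector nor fulltext search is available")


print(resolve_alpha(0.6, vector_available=False, fulltext_available=True))  # 0.0
```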

Zero breaking changes. Current behaviour (fulltext-only on Neo4j) is the default configuration.


Part 4 — Temporal reasoning (#3)

Bi-temporal model

Every node carries four timestamps:

Field       Meaning                                      Set by
----------  -------------------------------------------  ---------------------
created_at  When the node was first created              engine (exists today)
updated_at  Last modification time                       engine (exists today)
valid_from  When the fact became true in the real world  user/agent (new)
valid_to    When the fact stopped being true             user/agent (new)

valid_from and valid_to enable temporal queries: "What technologies was the project using in January?" — filter by valid_from <= date AND (valid_to IS NULL OR valid_to >= date).
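The as-of filter is sketched below as a Python predicate that mirrors the Cypher condition, followed by the same condition as a parameterised Cypher fragment. The function and sample nodes are illustrative.

```python
from datetime import datetime


def valid_at(props: dict, as_of: datetime) -> bool:
    """Mirror of: valid_from <= date AND (valid_to IS NULL OR valid_to >= date)."""
    start, end = props.get("valid_from"), props.get("valid_to")
    # In Cypher, NULL <= $date is NULL, so a node without valid_from is filtered out
    if start is None or start > as_of:
        return False
    return end is None or end >= as_of


# The same condition as a parameterised Cypher fragment (illustrative)
AS_OF_FILTER = ("n.valid_from <= $as_of "
                "AND (n.valid_to IS NULL OR n.valid_to >= $as_of)")

january = datetime(2026, 1, 15)
postgres = {"name": "PostgreSQL", "valid_from": datetime(2025, 6, 1), "valid_to": None}
mysql = {"name": "MySQL", "valid_from": datetime(2024, 1, 1),
         "valid_to": datetime(2025, 6, 1)}
print([t["name"] for t in (postgres, mysql) if valid_at(t, january)])  # ['PostgreSQL']
```

One consequence of the NULL rule: nodes written before valid_from existed never match an as-of query. Backfilling valid_from (for instance from created_at) would include them; this DDR doesn't specify either behaviour.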

Confidence decay

Nodes get a confidence property (float, 0.0–1.0, default 1.0). Decay runs on GraphStore.decay_scores():

confidence = initial × exp(-decay_rate × days_since_updated)

The reflect skill already uses confidence scores on Insight nodes. Extending this to all nodes lets search results prioritize recent, actively-maintained knowledge over stale entries.
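A worked sketch of the decay formula follows; decayed_confidence is a hypothetical helper. With the default rate of 0.01, a node untouched for 90 days (the DECAY_MAX_AGE_DAYS default) falls to about 0.41. The sketch recomputes from the initial value rather than from the stored, already-decayed confidence. Applying the formula to its own output on every scheduled run would compound the decay, whereas recomputing from the initial value gives the same answer however often the job runs.

```python
import math
from datetime import datetime, timedelta


def decayed_confidence(initial: float, updated_at: datetime, now: datetime,
                       decay_rate: float = 0.01) -> float:
    """confidence = initial * exp(-decay_rate * days_since_updated)."""
    days = (now - updated_at).total_seconds() / 86_400  # fractional days
    return initial * math.exp(-decay_rate * max(days, 0.0))  # clamp guards clock skew


t0 = datetime(2026, 1, 1)
for age in (0, 30, 90, 180):
    print(age, round(decayed_confidence(1.0, t0, t0 + timedelta(days=age)), 3))
# 0 1.0
# 30 0.741
# 90 0.407
# 180 0.165
```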

TTL and lifecycle

The existing ForgetSkill already supports forget_by_ttl() with soft-delete (archive) and hard-delete (purge). Temporal reasoning extends this with:

  • Auto-decay on a schedule (CLI: engrama decay --rate 0.01)
  • Conflict detection: when engrama_remember updates a node whose valid_to is already set, flag it for review rather than silently overwriting

Schema additions

# Added to all nodes via GraphStore.merge_node()
temporal_fields = {
    "valid_from": "datetime | None",   # when fact became true
    "valid_to": "datetime | None",     # when fact stopped being true
    "confidence": "float",             # 0.0–1.0, decays over time
}

No new indexes needed — valid_from and valid_to are filtered in queries, not searched. The existing updated_at range index covers the decay calculation.


Part 5 — Memory security (#4)

Threat model

The OWASP Top 10 for Agentic Applications (Dec 2025) classifies memory poisoning as ASI06. The MINJA attack achieves an injection success rate above 95%. For Engrama, which targets security professionals, this is reputationally critical.

Defence layers

Layer 1 — Input sanitisation (in the engine, above the stores):

class Sanitiser:
    """Validates and cleans all inputs before they reach storage."""

    def sanitise_properties(self, props: dict) -> dict:
        """Strip injection attempts from property values."""
        ...

    def validate_label(self, label: str) -> str:
        """Whitelist-check against schema labels."""
        ...

    def validate_relation(self, rel_type: str) -> str:
        """Whitelist-check against schema relation types."""
        ...
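A minimal Sanitiser sketch follows. The whitelists and the length cap are placeholders; in Engrama they would come from SchemaDefinition. The label and relation checks matter most: property values travel as query parameters, but labels and relationship types typically end up spliced into the Cypher text, so anything outside the schema must be rejected outright. Stripping control and format characters removes a common hiding place for injected instructions (zero-width and bidi-override characters). It cannot recognise a prompt injection written in plain text, which is what Layers 2 to 4 are for.

```python
import re
import unicodedata

# Placeholder whitelists; in Engrama they would come from SchemaDefinition
ALLOWED_LABELS = {"Project", "Concept", "Decision", "Insight"}
ALLOWED_RELATIONS = {"USES", "RELATES_TO", "INFORMED_BY"}
MAX_VALUE_CHARS = 10_000  # hypothetical per-value length cap
PROPERTY_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


class Sanitiser:
    """Sketch of Layer 1: whitelist identifiers, scrub string values."""

    def __init__(self, labels: set[str], relations: set[str]):
        self.labels = labels
        self.relations = relations

    def validate_label(self, label: str) -> str:
        # Labels are usually interpolated into Cypher text, so only exact schema names pass
        if label not in self.labels:
            raise ValueError(f"unknown label: {label!r}")
        return label

    def validate_relation(self, rel_type: str) -> str:
        if rel_type not in self.relations:
            raise ValueError(f"unknown relation type: {rel_type!r}")
        return rel_type

    def sanitise_properties(self, props: dict) -> dict:
        clean = {}
        for key, value in props.items():
            # Property names may also be interpolated, so restrict them to identifiers
            if not PROPERTY_NAME.fullmatch(key):
                raise ValueError(f"invalid property name: {key!r}")
            if isinstance(value, str):
                # Drop Unicode category C* (control, format, ...) except newline and tab
                value = "".join(ch for ch in value
                                if ch in "\n\t" or not unicodedata.category(ch).startswith("C"))
                value = value[:MAX_VALUE_CHARS]
            clean[key] = value
        return clean


s = Sanitiser(ALLOWED_LABELS, ALLOWED_RELATIONS)
print(s.sanitise_properties({"name": "Log4j\u200b CVE"}))  # {'name': 'Log4j CVE'}
```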

Layer 2 — Provenance tracking (metadata on every write):

provenance_fields = {
    "source": "str",         # "mcp" | "sdk" | "cli" | "sync"
    "source_agent": "str",   # which agent wrote this
    "source_session": "str", # session identifier
    "trust_level": "float",  # 0.0–1.0, based on source
}

Layer 3 — Trust-aware retrieval (in the search engine):

Search results are weighted by trust level. Nodes written by verified sources (vault sync, CLI) get higher trust than those from agent conversations. The hybrid score formula extends:

final = α·vector + (1-α)·fulltext + β·graph_boost + γ·trust_level

Where γ = 0.1 by default. This means a low-trust node needs higher semantic/keyword relevance to rank above a high-trust node.
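A worked example of the extended formula follows. The per-source trust values are hypothetical; the DDR only says vault sync and CLI writes rank above agent writes. With γ = 0.1, a trust gap of 0.5 is worth 0.05 of final score, so here the agent-written node needs +0.05 on both relevance legs just to tie.

```python
# Hypothetical default trust per provenance source
TRUST_BY_SOURCE = {"sync": 1.0, "cli": 1.0, "sdk": 0.8, "mcp": 0.5}


def final_score(v: float, f: float, boost: float, trust: float,
                alpha: float = 0.6, beta: float = 0.15, gamma: float = 0.1) -> float:
    """final = α·vector + (1-α)·fulltext + β·graph_boost + γ·trust_level"""
    return alpha * v + (1 - alpha) * f + beta * boost + gamma * trust


vault_note = final_score(v=0.70, f=0.60, boost=0.0, trust=TRUST_BY_SOURCE["sync"])
agent_note = final_score(v=0.75, f=0.65, boost=0.0, trust=TRUST_BY_SOURCE["mcp"])
print(round(vault_note, 3), round(agent_note, 3))  # 0.76 0.76
```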

Layer 4 — Scope isolation (see Part 6).


Part 6 — Multi-scope memory (#6)

Scope model

from dataclasses import dataclass


@dataclass
class MemoryScope:
    user_id: str | None = None      # whose memory
    agent_id: str | None = None     # which agent
    session_id: str | None = None   # which conversation
    org_id: str | None = None       # which organisation

When a scope is set via GraphStore.set_scope(), every query automatically filters by the scope fields. Nodes created in a scope carry those scope fields as properties.

Scope hierarchy

org_id (broadest)
  └── user_id
        └── agent_id
              └── session_id (narrowest)

A query scoped to user_id="alice" within org_id="acme" sees:

  • All nodes with user_id="alice" (her personal memory)
  • All nodes with org_id="acme" and no user_id (shared org memory)
  • All nodes with no scope fields (global/public memory)

Implementation

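For Neo4j, scopes become property filters on every MATCH clause. GraphStore.set_scope() stores the active scope, and merge_node() and fulltext_search() apply it automatically. search_vectors() does the same when one backend implements both protocols; a separate VectorStore receives the scope through its explicit scope argument.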

For v1, Engrama remains single-user (scope fields exist but default to None). Multi-user support is a configuration change, not a code change.


Part 7 — Benchmarks (#5)

Target benchmarks

Benchmark    What it measures                             Leader (2026)
-----------  -------------------------------------------  ----------------
LOCOMO       Long conversation memory (1,986 questions)   MemMachine 91.7%
LongMemEval  Long-term memory evaluation (500 questions)  Mem0 93.0%

What we need before benchmarking

  • Hybrid search (#1) — LOCOMO heavily tests semantic recall
  • Temporal reasoning (#3) — LongMemEval tests temporal questions
  • A benchmark harness that loads test data, runs queries, scores results
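Here is a skeleton for the harness bullet, assuming each benchmark question has been normalised into a question plus the keys of the memories that contain its answer. BenchQuestion and recall_at_k are hypothetical. The skeleton measures retrieval recall@k only, a useful diagnostic for tuning α, β and γ. The leaderboard figures in the table above grade the final answers, so a full harness would add an answer-generation and grading step on top.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass


@dataclass
class BenchQuestion:
    """One benchmark question after normalisation (hypothetical format)."""
    question: str
    evidence_keys: set[str]  # keys of the memories that contain the answer


# Any async search returning ranked node keys, e.g. a wrapper around HybridSearchEngine
SearchFn = Callable[[str, int], Awaitable[list[str]]]


async def recall_at_k(questions: list[BenchQuestion], search: SearchFn,
                      k: int = 10) -> float:
    """Fraction of questions with at least one evidence node in the top k."""
    if not questions:
        return 0.0
    hits = 0
    for q in questions:
        keys = await search(q.question, k)
        if q.evidence_keys & set(keys[:k]):
            hits += 1
    return hits / len(questions)


async def fake_search(query: str, limit: int) -> list[str]:
    # Stand-in for the real engine: always returns the same two keys
    return ["trip-paris", "job-change"][:limit]


questions = [
    BenchQuestion("When did Alice visit Paris?", {"trip-paris"}),
    BenchQuestion("What is Bob's dog called?", {"pet-bob"}),
]
print(asyncio.run(recall_at_k(questions, fake_search, k=5)))  # 0.5
```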

Realistic targets

With hybrid search (graph + vector + fulltext), Engrama should target:

  • LOCOMO: 70–80% (competitive, not leading)
  • LongMemEval: 75–85% (graph boost helps temporal questions)

Even modest scores, published transparently, establish credibility. The graph boost term is Engrama's structural advantage: graph topology feeds directly into the ranking score, which few competitors do.


Revised directory structure

engrama/
├── core/
│   ├── protocols.py       # GraphStore, VectorStore, EmbeddingProvider
│   ├── search.py          # HybridSearchEngine (protocol-based)
│   ├── security.py        # Sanitiser, provenance, trust
│   ├── scope.py           # MemoryScope dataclass + filtering
│   ├── temporal.py        # Decay, bi-temporal queries
│   ├── engine.py          # Orchestrator (uses protocols)
│   ├── client.py          # (deprecated, kept for backward compat)
│   └── schema.py          # SchemaDefinition, node dataclasses
│
├── backends/
│   ├── __init__.py        # create_stores() factory
│   ├── neo4j/
│   │   ├── graph.py       # Neo4jGraphStore
│   │   ├── vector.py      # Neo4jVectorStore
│   │   └── backend.py     # Neo4jBackend (unified, implements both)
│   ├── kuzu/              # future
│   ├── networkx/          # future
│   └── null.py            # NullGraphStore, NullVectorStore
│
├── embeddings/
│   ├── __init__.py        # create_provider() factory
│   ├── ollama.py          # OllamaProvider
│   ├── openai.py          # OpenAIProvider
│   ├── sentence_transformer.py
│   └── null.py            # NullProvider
│
├── skills/                # unchanged — use protocols via engine
├── adapters/              # unchanged — use protocols via engine
└── ...

Migration from current code

The refactoring extracts, not rewrites:

Current                         Becomes                              Change
------------------------------  -----------------------------------  -------
core/engine.py merge logic      backends/neo4j/graph.py              Extract
core/engine.py fulltext search  backends/neo4j/graph.py              Extract
core/engine.py orchestration    core/engine.py (now uses protocols)  Thin
adapters/mcp/server.py          Same file, uses engine               Minimal
skills/*.py                     Same files, use engine               None

Skills don't change at all, and the MCP tools keep their logic: both call engine.* methods, and the engine delegates to the protocols. Apart from the MCP server's lifespan switching to the factory, the only code that moves is the Neo4j-specific Cypher, from engine.py into backends/neo4j/.


Implementation phases

Phase A — Protocols + Neo4j extraction (foundation)

Estimated: 4–6h | No new features, no regressions

  1. Create core/protocols.py with GraphStore, VectorStore, EmbeddingProvider
  2. Create backends/neo4j/backend.py — extract existing Cypher from engine.py
  3. Create backends/null.py — NullGraphStore, NullVectorStore
  4. Create embeddings/null.py — NullProvider
  5. Create backends/__init__.py — factory that reads .env
  6. Refactor core/engine.py to accept protocols via constructor
  7. Update MCP server lifespan to use factory
  8. Run all 100 existing tests — must pass unchanged

Definition of done: All existing tests pass. MCP tools work identically. Zero user-visible change.

Phase B — Embedding providers (#7)

Estimated: 2–3h | Enables vector search

  1. Create embeddings/ollama.py — OllamaProvider
  2. Create embeddings/openai.py — OpenAIProvider (optional)
  3. Create embeddings/sentence_transformer.py (optional)
  4. Create embeddings/__init__.py — factory
  5. Add .env variables: EMBEDDING_PROVIDER, EMBEDDING_MODEL, etc.
  6. Tests: mock Ollama API, verify embed/embed_batch

Phase C — Vector storage + hybrid search (#1)

Estimated: 4–5h | The big feature

  1. Add vector index creation to Neo4j schema init
  2. Implement Neo4jVectorStore.store_vectors() and search_vectors()
  3. Modify engine.merge_node() to embed + store in one call
  4. Create core/search.py — HybridSearchEngine
  5. Update engrama_search MCP tool to use hybrid engine
  6. Update engrama_remember to embed on write
  7. CLI: engrama reindex — batch re-embed all nodes
  8. Tests: hybrid scoring, graceful degradation, fulltext fallback
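Step 1 can be sketched in Cypher terms with a small Python builder. In Neo4j 5, a vector index covers a single label and property, so the one memory_vectors index queried in the Neo4jBackend sketch only spans all node types if every embedded node also carries a shared label; "Embedded" below is a hypothetical name for it. vector.dimensions must equal EMBEDDING_DIMENSIONS, and Neo4j accepts 'cosine' or 'euclidean' as the similarity function.

```python
def vector_index_cypher(dimensions: int, label: str = "Embedded",
                        name: str = "memory_vectors",
                        similarity: str = "cosine") -> str:
    """Build Neo4j 5 DDL for the vector index (the shared label is hypothetical)."""
    if similarity not in ("cosine", "euclidean"):
        raise ValueError(f"unsupported similarity function: {similarity!r}")
    if dimensions <= 0:
        raise ValueError("dimensions must be positive")
    return (
        f"CREATE VECTOR INDEX {name} IF NOT EXISTS "
        f"FOR (n:{label}) ON (n.embedding) "
        "OPTIONS {indexConfig: {"
        f"`vector.dimensions`: {int(dimensions)}, "
        f"`vector.similarity_function`: '{similarity}'"
        "}}"
    )


print(vector_index_cypher(768))
```

Because the dimension is fixed when the index is created, switching embedding models means dropping and recreating this index before running engrama reindex.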

Phase D — Temporal reasoning (#3)

Estimated: 3–4h

  1. Add valid_from, valid_to, confidence to merge_node
  2. Implement decay_scores() in Neo4j backend
  3. CLI: engrama decay --rate 0.01 --max-age 90
  4. Modify recall/search to factor confidence into scoring
  5. Conflict detection in remember (flag when valid_to is set)
  6. Tests: decay calculation, temporal filtering

Phase E — Security hardening (#4)

Estimated: 3–4h

  1. Create core/security.py — Sanitiser class
  2. Add provenance fields to merge_node
  3. Add trust_level to scoring formula
  4. Input validation on all MCP tool inputs
  5. Tests: injection attempts, provenance tracking

Phase F — Multi-scope (#6)

Estimated: 2–3h

  1. Create core/scope.py — MemoryScope dataclass
  2. Add scope fields to merge_node
  3. Add scope filtering to all query methods
  4. MCP: optional scope parameters on tools
  5. Tests: scope isolation, hierarchy resolution

Phase G — Benchmarks (#5)

Estimated: 3–4h

  1. Benchmark harness: load LOCOMO/LongMemEval data
  2. Run queries through hybrid search engine
  3. Score and publish results in docs/benchmarks/
  4. Iterate on α, β, γ parameters based on results

Total estimated: 21–29 hours across all phases


.env reference (complete)

# === Storage backends ===
GRAPH_BACKEND=neo4j
VECTOR_BACKEND=neo4j

# === Neo4j ===
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=CHANGE_ME_BEFORE_FIRST_RUN

# === Embeddings ===
EMBEDDING_PROVIDER=none
EMBEDDING_MODEL=nomic-embed-text
EMBEDDING_DIMENSIONS=768
OLLAMA_URL=http://localhost:11434

# === Hybrid search ===
HYBRID_ALPHA=0.6
HYBRID_GRAPH_BETA=0.15
HYBRID_TRUST_GAMMA=0.1

# === Obsidian ===
VAULT_PATH=

# === Temporal ===
DECAY_RATE=0.01
DECAY_MAX_AGE_DAYS=90

# === Scope (v1: single user, leave empty) ===
DEFAULT_USER_ID=
DEFAULT_ORG_ID=

Consequences

Positive

  • Build once, extend forever. New backends (Kuzu, PostgreSQL+AGE) implement the protocols without touching skills, adapters, or MCP tools.
  • No rework. Hybrid search, temporal, security, and multi-scope all compose on the same protocol layer. Each phase adds, never refactors.
  • Neo4j risk mitigated. If Neo4j's licensing changes or a lighter alternative is needed, swap the backend — the rest of Engrama is unaffected.
  • Zero breaking changes. Default configuration reproduces today's exact behaviour. Every new feature is opt-in via .env.
  • Testable in isolation. Each protocol implementation can be tested independently, and the null backends (NullGraphStore, NullVectorStore) enable pure unit tests without any database.

Negative

  • Abstraction tax. One extra layer of indirection between skills and storage. Mitigated: the protocols are thin (~200 lines total), and the Neo4j implementation wraps the exact same Cypher we have today.
  • Phase A produces zero new features. The extraction is invisible to users. Necessary investment, but delivers no immediate user value.
  • Not all backends will be equal. NetworkX can't run Cypher, and Kuzu's Cypher dialect differs from Neo4j's. The run_cypher() method is backend-specific and may raise NotImplementedError. Reflect patterns that depend on complex Cypher will need per-backend translations, or will only work on Cypher-capable backends.
  • Embedding model lock-in within a graph. Changing the embedding model requires re-indexing all nodes. Mitigated by engrama reindex.

References

  • DDR-001: Faceted classification system
  • DDR-002: Bidirectional sync and vault portability
  • Neo4j vector indexes: https://neo4j.com/docs/cypher-manual/5/indexes/semantic-indexes/vector-indexes/
  • nomic-embed-text: https://ollama.com/library/nomic-embed-text
  • OWASP Agentic AI Top 10: https://genaisecurityproject.com
  • Mem0 LOCOMO benchmark: https://mem0.ai/research
  • Zep temporal architecture: https://arxiv.org/abs/2501.13956