DDR-003: Protocol-based architecture for Engrama v1.0
Version: 0.2.0 | Date: 2026-04-12 | Status: Proposed | Supersedes: DDR-003 v0.1.0 (Neo4j-only hybrid search)
Context
The SWOT (DAFO) analysis identified seven features Engrama needs to compete. Implementing them independently creates rework: hybrid search (#1) hard-wired to Neo4j conflicts with database abstraction (#2); temporal reasoning (#3) touches every query path; security (#4) and multi-scope (#6) cut across all storage operations.
This DDR designs a unified protocol-based architecture where all seven features compose cleanly. The key insight: #2 (database abstraction) is the foundation, not #1 (hybrid search). Every other feature builds on the storage protocols. We define those protocols first, then each feature becomes a layer that plugs in without refactoring what came before.
The seven features and their dependencies
#5 Benchmarks ← needs #1, #3
#6 Multi-scope ← needs #2, #4
#4 Security ← needs #2, #3
#1 Hybrid search ← needs #2, #7
#3 Temporal ← needs #2
#7 LLM-agnostic ← independent
#2 DB abstraction ← FOUNDATION
Implementation order: #2 + #7 → #1 + #3 → #4 → #6 → #5
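The implementation order can be checked mechanically against the dependency list. The following is a minimal sketch using the standard-library `graphlib` module; `DEPENDS_ON` and `implementation_waves` are hypothetical names introduced here, and the dict is simply a transcription of the list above.

```python
from graphlib import TopologicalSorter

# Feature number -> features it needs (transcribed from the dependency list).
DEPENDS_ON: dict[int, set[int]] = {
    5: {1, 3},   # Benchmarks
    6: {2, 4},   # Multi-scope
    4: {2, 3},   # Security
    1: {2, 7},   # Hybrid search
    3: {2},      # Temporal
    7: set(),    # LLM-agnostic
    2: set(),    # DB abstraction
}


def implementation_waves(deps: dict[int, set[int]]) -> list[list[int]]:
    """Group features into waves; each wave depends only on earlier waves."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the dependencies are circular
    waves: list[list[int]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(implementation_waves(DEPENDS_ON))
# [[2, 7], [1, 3], [4, 5], [6]]
```

By dependencies alone, #5 is unblocked as soon as #1 and #3 land. The order above defers it until after #6, which is still a valid linearisation of the same graph.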
Decision
Replace Engrama's direct Neo4j coupling with three protocol interfaces. Every skill, adapter, and MCP tool talks to these protocols — never to a specific database. Neo4j becomes the first (and default) implementation.
Part 1: Storage protocols (#2)
1.1 GraphStore protocol
Covers: node CRUD, relationships, fulltext search, pattern queries.
```python
from typing import Protocol, Any, runtime_checkable
from datetime import datetime


@runtime_checkable
class GraphStore(Protocol):
    """Abstract interface for graph storage backends."""

    # --- Node operations ---

    async def merge_node(
        self, label: str, key_field: str, key_value: str,
        properties: dict[str, Any],
        embedding: list[float] | None = None,
    ) -> dict[str, Any]:
        """Create or update a node. Always MERGE semantics."""
        ...

    async def get_node(
        self, label: str, key_field: str, key_value: str
    ) -> dict[str, Any] | None:
        """Retrieve a single node by its unique key."""
        ...

    async def delete_node(
        self, label: str, key_field: str, key_value: str,
        soft: bool = True,
    ) -> bool:
        """Delete or archive a node. soft=True sets status='archived'."""
        ...

    # --- Relationship operations ---

    async def merge_relation(
        self, from_label: str, from_key: str, from_value: str,
        rel_type: str,
        to_label: str, to_key: str, to_value: str,
    ) -> dict[str, Any]:
        """Create a relationship (idempotent)."""
        ...

    # --- Query operations ---

    async def get_neighbours(
        self, label: str, key_field: str, key_value: str,
        hops: int = 1, limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Traverse N hops from a node."""
        ...

    async def fulltext_search(
        self, query: str, limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Keyword search across all text properties.
        Returns: [{node, score, label, key}]"""
        ...

    async def run_cypher(
        self, query: str, params: dict[str, Any] | None = None,
    ) -> list[dict[str, Any]]:
        """Execute a raw query (backend-specific).
        For reflect patterns that need full query power.
        Backends that don't support Cypher raise NotImplementedError."""
        ...

    # --- Schema operations ---

    async def init_schema(self, schema: "SchemaDefinition") -> None:
        """Apply constraints, indexes, and seed data."""
        ...

    async def health_check(self) -> dict[str, Any]:
        """Return backend status and version info."""
        ...

    # --- Temporal operations (#3) ---

    async def get_node_history(
        self, label: str, key_field: str, key_value: str,
    ) -> list[dict[str, Any]]:
        """Return the temporal history of a node's property changes.
        Each entry: {properties, valid_from, valid_to, ingested_at}"""
        ...

    async def decay_scores(
        self, max_age_days: int = 90, decay_rate: float = 0.01,
    ) -> int:
        """Apply confidence decay to stale nodes.
        Returns count of nodes affected."""
        ...

    # --- Scope operations (#6) ---

    async def set_scope(
        self, scope: "MemoryScope",
    ) -> None:
        """Set the active scope for all subsequent operations.
        Scope filters are applied automatically to every query."""
        ...
```
1.2 VectorStore protocol
Covers: embedding storage, similarity search. May be the same backend as GraphStore (Neo4j) or a separate one (ChromaDB, pgvector).
```python
@runtime_checkable
class VectorStore(Protocol):
    """Abstract interface for vector similarity search."""

    dimensions: int

    async def store_vectors(
        self, items: list[tuple[str, list[float]]],
    ) -> int:
        """Store embeddings for nodes. items: [(node_id, embedding)].
        Returns count stored."""
        ...

    async def search_vectors(
        self, query_embedding: list[float],
        limit: int = 10,
        scope: "MemoryScope | None" = None,
    ) -> list[dict[str, Any]]:
        """k-ANN similarity search.
        Returns: [{node_id, score, label, key}]"""
        ...

    async def delete_vectors(
        self, node_ids: list[str],
    ) -> int:
        """Remove embeddings for deleted/archived nodes."""
        ...

    async def count(self) -> int:
        """Total vectors stored."""
        ...
```
1.3 Why two protocols, not one
Some backends implement both (Neo4j has native graph + native vectors). Others don't (NetworkX has no vector index; ChromaDB has no graph). Keeping them separate lets us mix backends:
| Combination | GraphStore | VectorStore | Use case |
|---|---|---|---|
| Neo4j only | Neo4jGraphStore | Neo4jVectorStore | Default, simplest |
| Neo4j + Chroma | Neo4jGraphStore | ChromaVectorStore | Better vector perf |
| Kuzu + Chroma | KuzuGraphStore | ChromaVectorStore | Embedded, no Docker |
| NetworkX + None | NetworkXGraphStore | NullVectorStore | Zero-dep prototyping |
| PG+AGE + pgvector | PgGraphStore | PgVectorStore | Single Postgres |
1.4 Neo4j implementation (first backend)
The Neo4j adapter implements both protocols. It wraps the exact same
Cypher queries that exist today in engine.py: zero logic change,
just extraction behind the interface.
```python
from neo4j import AsyncDriver


class Neo4jBackend:
    """Implements both GraphStore and VectorStore using Neo4j."""

    def __init__(self, driver: AsyncDriver, config: dict):
        self.driver = driver
        self.config = config
        self._scope: MemoryScope | None = None

    # --- GraphStore ---

    async def merge_node(self, label, key_field, key_value,
                         properties, embedding=None):
        # Same MERGE Cypher as current engine.py
        # + stores embedding property if provided
        ...

    async def fulltext_search(self, query, limit=10):
        # Same db.index.fulltext.queryNodes('memory_search', ...)
        ...

    # --- VectorStore ---

    async def search_vectors(self, query_embedding, limit=10, scope=None):
        # CALL db.index.vector.queryNodes('memory_vectors', $k, $emb)
        ...

    async def store_vectors(self, items):
        # SET n.embedding = $embedding (already on the same node)
        ...
```
1.5 Configuration
# Storage backend
GRAPH_BACKEND=neo4j # neo4j | kuzu | networkx | postgres
VECTOR_BACKEND=neo4j # neo4j | chroma | pgvector | none
# Neo4j (when GRAPH_BACKEND=neo4j or VECTOR_BACKEND=neo4j)
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=changeme
# ChromaDB (when VECTOR_BACKEND=chroma)
CHROMA_PATH=./chroma_data # local persistent directory
CHROMA_COLLECTION=engrama
# Kuzu (when GRAPH_BACKEND=kuzu)
KUZU_PATH=./kuzu_data
The factory reads .env and returns the correct implementations:
```python
def create_stores(config: dict) -> tuple[GraphStore, VectorStore]:
    graph = _create_graph_store(config)
    vector = _create_vector_store(config, graph)
    return graph, vector
```
When VECTOR_BACKEND matches GRAPH_BACKEND (e.g., both neo4j),
the factory returns the same object for both — no wasted connections.
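A minimal sketch of how the shared-object rule could look, assuming stand-in classes in place of the real backends. Every `Stub*` class here is hypothetical, and only the neo4j graph backend is wired up to keep the example short.

```python
from typing import Any


class StubNeo4jBackend:
    """Stand-in for a backend that implements both protocols."""

    def __init__(self, config: dict[str, str]) -> None:
        self.uri = config.get("NEO4J_URI", "bolt://localhost:7687")


class StubChromaVectorStore:
    """Stand-in for a vector-only backend."""

    def __init__(self, config: dict[str, str]) -> None:
        self.path = config.get("CHROMA_PATH", "./chroma_data")


class StubNullVectorStore:
    """Stand-in for VECTOR_BACKEND=none."""


def create_stores(config: dict[str, str]) -> tuple[Any, Any]:
    graph_name = config.get("GRAPH_BACKEND", "neo4j")
    vector_name = config.get("VECTOR_BACKEND", "neo4j")

    if graph_name != "neo4j":
        raise ValueError(f"GRAPH_BACKEND not covered by this sketch: {graph_name}")
    graph = StubNeo4jBackend(config)

    if vector_name == graph_name:
        # Same backend for both roles: reuse the object and its connection.
        return graph, graph
    if vector_name == "chroma":
        return graph, StubChromaVectorStore(config)
    if vector_name == "none":
        return graph, StubNullVectorStore()
    raise ValueError(f"unsupported VECTOR_BACKEND: {vector_name}")


graph, vector = create_stores({"GRAPH_BACKEND": "neo4j", "VECTOR_BACKEND": "neo4j"})
print(graph is vector)  # True: one object serves both protocols
```

Callers can use an identity check (`graph is vector`) to decide whether the two searches may share a session or should run concurrently against separate backends.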
Part 2: Embedding provider (#7)
Decoupled from any LLM vendor. Three implementations at launch.
```python
@runtime_checkable
class EmbeddingProvider(Protocol):
    dimensions: int

    async def embed(self, text: str) -> list[float]: ...
    async def embed_batch(self, texts: list[str]) -> list[list[float]]: ...
    async def health_check(self) -> bool: ...
```
| Provider | Model | Dims | Local | Cost |
|---|---|---|---|---|
| OllamaProvider | nomic-embed-text | 768 | yes | free |
| OllamaProvider | nomic-embed-text-v2-moe | 768 | yes | free, multilingual |
| OpenAIProvider | text-embedding-3-small | 1536 | no | $0.02/1M tokens |
| SentenceTransformerProvider | all-MiniLM-L6-v2 | 384 | yes | free, no Ollama |
| NullProvider | n/a | 0 | n/a | n/a |
EMBEDDING_PROVIDER=ollama # ollama | openai | sentence_transformer | none
EMBEDDING_MODEL=nomic-embed-text
EMBEDDING_DIMENSIONS=768
OLLAMA_URL=http://localhost:11434
Text representation for embedding
Every node gets embedded from its concatenated text properties:
```python
def node_to_text(label: str, props: dict) -> str:
    parts = [f"{label}:"]
    parts.append(props.get("name") or props.get("title", ""))
    for field in ("description", "notes", "rationale",
                  "solution", "context", "body"):
        if value := props.get(field):
            parts.append(value)
    return " ".join(parts)
```
Part 3: Hybrid search engine (#1)
The search engine lives in core/search.py. It talks only to the
protocols — zero database-specific code.
Algorithm
```text
query ──► EmbeddingProvider.embed(query)
                    │
          ┌─────────┴──────────┐
          ▼                    ▼
     VectorStore          GraphStore
  .search_vectors()    .fulltext_search()
          │                    │
          └─────────┬──────────┘
                    ▼
            merge by node_id
        normalize scores to [0,1]
                    │
                    ▼
  final = α·v_score + (1-α)·f_score + β·graph_boost
                    │
                    ▼
     optional: GraphStore.get_neighbours()
      for top-K results (1-hop expansion)
                    │
                    ▼
              ranked results
```
Scoring
```python
from dataclasses import dataclass

# GraphStore, VectorStore and EmbeddingProvider come from core/protocols.py.
# SearchResult and HybridSearchEngine._merge() are not shown here.


@dataclass
class HybridConfig:
    alpha: float = 0.6        # vector weight
    graph_beta: float = 0.15  # graph boost weight
    boost_cap: float = 0.3    # max graph boost per node
    vector_k: int = 20        # candidates from vector search
    fulltext_k: int = 20      # candidates from fulltext search


class HybridSearchEngine:
    def __init__(self, graph: GraphStore, vector: VectorStore,
                 embedder: EmbeddingProvider, config: HybridConfig):
        self.graph = graph
        self.vector = vector
        self.embedder = embedder
        self.config = config

    async def search(self, query: str, limit: int = 10) -> list[SearchResult]:
        # 1. Embed query
        query_vec = await self.embedder.embed(query)

        # 2. Run both searches. Shown sequentially; the calls are independent
        #    and can run in parallel when the backends differ.
        v_results = await self.vector.search_vectors(
            query_vec, limit=self.config.vector_k
        ) if query_vec else []
        f_results = await self.graph.fulltext_search(
            query, limit=self.config.fulltext_k
        )

        # 3. Merge by node identity
        merged = self._merge(v_results, f_results)

        # 4. Score
        for r in merged:
            r.final_score = (
                self.config.alpha * r.vector_score
                + (1 - self.config.alpha) * r.fulltext_score
                + self.config.graph_beta * r.graph_boost
            )

        # 5. Rank and return
        merged.sort(key=lambda r: r.final_score, reverse=True)
        return merged[:limit]
```
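The `_merge` step and the score normalisation are not spelled out above. One possible shape is sketched below under two assumptions that the source does not confirm: both result lists carry a `node_id` key, and normalisation divides each score by the best score in its list. The `SearchResult` fields mirror the attributes used in `search()`; the helper names `normalise`, `merge` and `rank` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class SearchResult:
    node_id: str
    vector_score: float = 0.0
    fulltext_score: float = 0.0
    graph_boost: float = 0.0
    final_score: float = 0.0


def normalise(hits: list[dict]) -> dict[str, float]:
    """Scale raw scores to [0, 1] by dividing by the best score in the list.

    Assumes non-negative scores (true of Lucene and cosine similarity).
    """
    top = max((h["score"] for h in hits), default=0.0)
    if top <= 0.0:
        return {h["node_id"]: 0.0 for h in hits}
    return {h["node_id"]: h["score"] / top for h in hits}


def merge(v_results: list[dict], f_results: list[dict]) -> list[SearchResult]:
    """Union both lists by node_id; a node missing from one side scores 0.0."""
    v_scores = normalise(v_results)
    f_scores = normalise(f_results)
    return [
        SearchResult(
            node_id=node_id,
            vector_score=v_scores.get(node_id, 0.0),
            fulltext_score=f_scores.get(node_id, 0.0),
        )
        for node_id in sorted(v_scores.keys() | f_scores.keys())
    ]


def rank(results: list[SearchResult], alpha: float = 0.6,
         beta: float = 0.15) -> list[SearchResult]:
    """Apply the hybrid formula and sort best-first."""
    for r in results:
        r.final_score = (
            alpha * r.vector_score
            + (1 - alpha) * r.fulltext_score
            + beta * r.graph_boost
        )
    return sorted(results, key=lambda r: r.final_score, reverse=True)


vector_hits = [{"node_id": "a", "score": 0.9}, {"node_id": "b", "score": 0.45}]
fulltext_hits = [{"node_id": "b", "score": 4.0}, {"node_id": "c", "score": 2.0}]
ranked = rank(merge(vector_hits, fulltext_hits))
print([(r.node_id, round(r.final_score, 2)) for r in ranked])
# [('b', 0.7), ('a', 0.6), ('c', 0.2)]
```

Node `b` wins because it appears in both lists, even though `a` is the top vector hit. Since `fulltext_search()` is documented as returning `{node, score, label, key}` without a `node_id`, a real implementation would likely need to derive the identity from `label` and `key` instead.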
Graceful degradation
| Scenario | Behaviour |
|---|---|
| EMBEDDING_PROVIDER=none | α forced to 0.0, fulltext only |
| VECTOR_BACKEND=none | Same as above |
| Ollama not running | Fallback to fulltext + warning |
| Node has no embedding | Appears in fulltext results only |
| No fulltext index (Kuzu) | α forced to 1.0, vector only |
Zero breaking changes. Current behaviour (fulltext-only on Neo4j) is the default configuration.
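The degradation table reduces to a small rule for choosing α at query time. A minimal sketch follows; `effective_alpha` and its keyword arguments are hypothetical names, and raising when neither path is available is an assumption, since the table does not cover that case.

```python
def effective_alpha(alpha: float, *, vectors_available: bool,
                    fulltext_available: bool) -> float:
    """Return the α to use given which search paths are usable right now."""
    if not vectors_available and not fulltext_available:
        # Not covered by the table; failing loudly is an assumption.
        raise RuntimeError("no search path available")
    if not vectors_available:
        return 0.0  # fulltext only (no provider, no vector store, Ollama down)
    if not fulltext_available:
        return 1.0  # vector only (backend without a fulltext index)
    return alpha


print(effective_alpha(0.6, vectors_available=False, fulltext_available=True))  # 0.0
print(effective_alpha(0.6, vectors_available=True, fulltext_available=False))  # 1.0
print(effective_alpha(0.6, vectors_available=True, fulltext_available=True))   # 0.6
```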
Part 4: Temporal reasoning (#3)
Bi-temporal model
Every node carries four timestamps:
| Field | Meaning | Set by |
|---|---|---|
| created_at | When the node was first created | engine (exists today) |
| updated_at | Last modification time | engine (exists today) |
| valid_from | When the fact became true in the real world | user/agent (new) |
| valid_to | When the fact stopped being true | user/agent (new) |
valid_from and valid_to enable temporal queries: "What technologies
was the project using in January?" — filter by valid_from <= date AND
(valid_to IS NULL OR valid_to >= date).
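The same filter can be expressed as a plain predicate, which is handy for unit tests and for backends without a query language. This is a sketch: `valid_at` is a hypothetical helper, and treating a missing `valid_from` as "always true" is an assumption. The Cypher-style filter as written would exclude such nodes, because a comparison against NULL does not evaluate to true.

```python
from datetime import datetime


def valid_at(node: dict, when: datetime) -> bool:
    """True if the fact recorded on `node` held at the instant `when`."""
    valid_from = node.get("valid_from")
    valid_to = node.get("valid_to")
    # Assumption: no valid_from means the fact has no known start.
    started = valid_from is None or valid_from <= when
    # valid_to of None means the fact is still true.
    not_ended = valid_to is None or valid_to >= when
    return started and not_ended


january = datetime(2026, 1, 15)
retired = {"name": "Flask", "valid_from": datetime(2025, 6, 1),
           "valid_to": datetime(2026, 2, 15)}
adopted_later = {"name": "FastAPI", "valid_from": datetime(2026, 2, 1),
                 "valid_to": None}

print(valid_at(retired, january))        # True: still in use in January
print(valid_at(adopted_later, january))  # False: adopted in February
```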
Confidence decay
Nodes get a confidence property (float, 0.0–1.0, default 1.0).
Decay runs on GraphStore.decay_scores():
confidence = initial × exp(-decay_rate × days_since_updated)
The reflect skill already uses confidence scores on Insight nodes. Extending this to all nodes lets search results prioritize recent, actively-maintained knowledge over stale entries.
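To get a feel for the default rate, the decay formula can be evaluated directly. This is a worked sketch; `decayed_confidence` and `half_life_days` are hypothetical helper names.

```python
import math


def decayed_confidence(initial: float, decay_rate: float,
                       days_since_updated: float) -> float:
    """confidence = initial × exp(-decay_rate × days_since_updated)"""
    return initial * math.exp(-decay_rate * days_since_updated)


def half_life_days(decay_rate: float) -> float:
    """Days until confidence drops to half its initial value."""
    return math.log(2) / decay_rate


for days in (30, 90, 365):
    print(days, round(decayed_confidence(1.0, 0.01, days), 3))
# 30 0.741
# 90 0.407
# 365 0.026

print(round(half_life_days(0.01), 1))  # 69.3
```

With the default rate of 0.01, a node untouched for 90 days keeps about 41% of its confidence, and the half-life is roughly 69 days.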
TTL and lifecycle
The existing ForgetSkill already supports forget_by_ttl() with
soft-delete (archive) and hard-delete (purge). Temporal reasoning
extends this with:
- Auto-decay on a schedule (CLI: engrama decay --rate 0.01)
- Conflict detection: when engrama_remember updates a node whose valid_to is already set, flag it for review rather than silently overwriting
Schema additions
```python
# Added to all nodes via GraphStore.merge_node()
temporal_fields = {
    "valid_from": "datetime | None",  # when fact became true
    "valid_to": "datetime | None",    # when fact stopped being true
    "confidence": "float",            # 0.0–1.0, decays over time
}
```
No new indexes needed — valid_from and valid_to are filtered in
queries, not searched. The existing updated_at range index covers
the decay calculation.
Part 5: Memory security (#4)
Threat model
The OWASP Top 10 for Agentic Applications (Dec 2025) classifies memory poisoning as ASI06. The MINJA attack achieves >95% injection success rate. For Engrama — targeting security professionals — this is reputationally critical.
Defence layers
Layer 1 — Input sanitisation (in the engine, above the stores):
```python
class Sanitiser:
    """Validates and cleans all inputs before they reach storage."""

    def sanitise_properties(self, props: dict) -> dict:
        """Strip injection attempts from property values."""
        ...

    def validate_label(self, label: str) -> str:
        """Whitelist-check against schema labels."""
        ...

    def validate_relation(self, rel_type: str) -> str:
        """Whitelist-check against schema relation types."""
        ...
```
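A minimal sketch of how the whitelist checks could be filled in. The constructor arguments, the control-character stripping and the length cap are all assumptions made for illustration; the source does not define what "injection attempts" covers, and the labels `Project` and `USES` are example values only.

```python
import re

# Control characters other than tab, newline and carriage return.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")


class Sanitiser:
    """Validates and cleans inputs before they reach storage (sketch)."""

    def __init__(self, labels: set[str], relations: set[str],
                 max_len: int = 10_000) -> None:
        self._labels = frozenset(labels)
        self._relations = frozenset(relations)
        self._max_len = max_len  # hypothetical cap on string property size

    def validate_label(self, label: str) -> str:
        if label not in self._labels:
            raise ValueError(f"unknown label: {label!r}")
        return label

    def validate_relation(self, rel_type: str) -> str:
        if rel_type not in self._relations:
            raise ValueError(f"unknown relation type: {rel_type!r}")
        return rel_type

    def sanitise_properties(self, props: dict) -> dict:
        clean = {}
        for key, value in props.items():
            if isinstance(value, str):
                # Drop control characters, then truncate oversized values.
                value = _CONTROL_CHARS.sub("", value)[: self._max_len]
            clean[key] = value
        return clean


sanitiser = Sanitiser(labels={"Project"}, relations={"USES"})
print(sanitiser.validate_label("Project"))
print(sanitiser.sanitise_properties({"name": "engrama\x00", "stars": 3}))
```

The whitelist matters most for labels and relation types, because those are commonly interpolated into the query text rather than passed as parameters. Property values can travel as query parameters, so their sanitisation is about content hygiene rather than query injection.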
Layer 2 — Provenance tracking (metadata on every write):
```python
provenance_fields = {
    "source": "str",          # "mcp" | "sdk" | "cli" | "sync"
    "source_agent": "str",    # which agent wrote this
    "source_session": "str",  # session identifier
    "trust_level": "float",   # 0.0–1.0, based on source
}
```
Layer 3 — Trust-aware retrieval (in the search engine):
Search results are weighted by trust level. Nodes written by verified sources (vault sync, CLI) get higher trust than those from agent conversations. The hybrid score formula extends:
final = α·vector + (1-α)·fulltext + β·graph_boost + γ·trust_level
Where γ = 0.1 by default. This means a low-trust node needs higher
semantic/keyword relevance to rank above a high-trust node.
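A short worked example of the extended formula, using the default weights from this document. The trust levels 1.0 and 0.2 are illustrative values, not defined anywhere in the source, and `final_score` is a hypothetical helper.

```python
def final_score(vector: float, fulltext: float, graph_boost: float,
                trust_level: float, alpha: float = 0.6,
                beta: float = 0.15, gamma: float = 0.1) -> float:
    """final = α·vector + (1-α)·fulltext + β·graph_boost + γ·trust_level"""
    return (alpha * vector
            + (1 - alpha) * fulltext
            + beta * graph_boost
            + gamma * trust_level)


# Two nodes with identical relevance, differing only in trust.
high_trust = final_score(0.70, 0.70, 0.0, trust_level=1.0)
low_trust = final_score(0.70, 0.70, 0.0, trust_level=0.2)
print(round(high_trust, 2), round(low_trust, 2))  # 0.8 0.72

# The low-trust node ties once its relevance is higher by γ·Δtrust = 0.08.
tie = final_score(0.78, 0.78, 0.0, trust_level=0.2)
print(round(tie, 2))  # 0.8
```

At γ = 0.1 the largest possible trust advantage is 0.1 of score, so trust acts as a tie-breaker among similarly relevant nodes rather than overriding relevance.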
Layer 4 — Scope isolation (see Part 6).
Part 6: Multi-scope memory (#6)
Scope model
```python
from dataclasses import dataclass


@dataclass
class MemoryScope:
    user_id: str | None = None     # whose memory
    agent_id: str | None = None    # which agent
    session_id: str | None = None  # which conversation
    org_id: str | None = None      # which organisation
```
When a scope is set via GraphStore.set_scope(), every query
automatically filters by the scope fields. Nodes created in a scope
carry those scope fields as properties.
Scope hierarchy
```text
org_id (broadest)
└── user_id
    └── agent_id
        └── session_id (narrowest)
```
A query with user_id="alice" (where alice belongs to org_id="acme") sees:
- All nodes with user_id="alice" (her personal memory)
- All nodes with org_id="acme" and no user_id (shared org memory)
- All nodes with no scope fields (global/public memory)
Implementation
For Neo4j, scopes become property filters on every MATCH clause.
The GraphStore.set_scope() method stores the active scope, and
merge_node() / fulltext_search() / search_vectors() all
apply it automatically.
For v1, Engrama remains single-user (scope fields exist but default to None). Multi-user support is a configuration change, not a code change.
Part 7: Benchmarks (#5)
Target benchmarks
| Benchmark | What it measures | Leader (2026) |
|---|---|---|
| LOCOMO | Long conversation memory (1,986 questions) | MemMachine 91.7% |
| LongMemEval | Long-term memory evaluation (500 questions) | Mem0 93.0% |
What we need before benchmarking
- Hybrid search (#1) — LOCOMO heavily tests semantic recall
- Temporal reasoning (#3) — LongMemEval tests temporal questions
- A benchmark harness that loads test data, runs queries, scores results
Realistic targets
With hybrid search (graph+vector+fulltext), Engrama should target:
- LOCOMO: 70–80% (competitive, not leading)
- LongMemEval: 75–85% (graph boost helps temporal questions)
Even modest scores published transparently establish credibility. The graph boost term is Engrama's structural advantage: to our knowledge, no competitor uses graph topology as a ranking signal.
Revised directory structure
engrama/
├── core/
│ ├── protocols.py # GraphStore, VectorStore, EmbeddingProvider
│ ├── search.py # HybridSearchEngine (protocol-based)
│ ├── security.py # Sanitiser, provenance, trust
│ ├── scope.py # MemoryScope dataclass + filtering
│ ├── temporal.py # Decay, bi-temporal queries
│ ├── engine.py # Orchestrator (uses protocols)
│ ├── client.py # (deprecated, kept for backward compat)
│ └── schema.py # SchemaDefinition, node dataclasses
│
├── backends/
│ ├── __init__.py # create_stores() factory
│ ├── neo4j/
│ │ ├── graph.py # Neo4jGraphStore
│ │ ├── vector.py # Neo4jVectorStore
│ │ └── backend.py # Neo4jBackend (unified, implements both)
│ ├── kuzu/ # future
│ ├── networkx/ # future
│ └── null.py # NullGraphStore, NullVectorStore
│
├── embeddings/
│ ├── __init__.py # create_provider() factory
│ ├── ollama.py # OllamaProvider
│ ├── openai.py # OpenAIProvider
│ ├── sentence_transformer.py
│ └── null.py # NullProvider
│
├── skills/ # unchanged — use protocols via engine
├── adapters/ # unchanged — use protocols via engine
└── ...
Migration from current code
The refactoring extracts code rather than rewriting it:
| Current | Becomes | Change |
|---|---|---|
| core/engine.py merge logic | backends/neo4j/graph.py | Extract |
| core/engine.py fulltext search | backends/neo4j/graph.py | Extract |
| core/engine.py orchestration | core/engine.py (now uses protocols) | Thin |
| adapters/mcp/server.py | Same file, uses engine | Minimal |
| skills/*.py | Same files, use engine | None |
The MCP tools and skills don't change at all — they call engine.*
methods, and the engine delegates to the protocols. The only code that
moves is the Neo4j-specific Cypher, from engine.py into
backends/neo4j/.
Implementation phases
Phase A: Protocols + Neo4j extraction (foundation)
Estimated: 4–6h | No new features, no regressions
- Create core/protocols.py with GraphStore, VectorStore, EmbeddingProvider
- Create backends/neo4j/backend.py: extract existing Cypher from engine.py
- Create backends/null.py: NullGraphStore, NullVectorStore
- Create embeddings/null.py: NullProvider
- Create backends/__init__.py: factory that reads .env
- Refactor core/engine.py to accept protocols via constructor
- Update MCP server lifespan to use factory
- Run all 100 existing tests; they must pass unchanged
Definition of done: All existing tests pass. MCP tools work identically. Zero user-visible change.
Phase B: Embedding providers (#7)
Estimated: 2–3h | Enables vector search
- Create embeddings/ollama.py: OllamaProvider
- Create embeddings/openai.py: OpenAIProvider (optional)
- Create embeddings/sentence_transformer.py (optional)
- Create embeddings/__init__.py: factory
- Add .env variables: EMBEDDING_PROVIDER, EMBEDDING_MODEL, etc.
- Tests: mock Ollama API, verify embed/embed_batch
Phase C: Vector storage + hybrid search (#1)
Estimated: 4–5h | The big feature
- Add vector index creation to Neo4j schema init
- Implement Neo4jVectorStore.store_vectors() and search_vectors()
- Modify engine.merge_node() to embed + store in one call
- Create core/search.py: HybridSearchEngine
- Update engrama_search MCP tool to use hybrid engine
- Update engrama_remember to embed on write
- CLI: engrama reindex (batch re-embed all nodes)
- Tests: hybrid scoring, graceful degradation, fulltext fallback
Phase D: Temporal reasoning (#3)
Estimated: 3–4h
- Add valid_from, valid_to, confidence to merge_node
- Implement decay_scores() in Neo4j backend
- CLI: engrama decay --rate 0.01 --max-age 90
- Modify recall/search to factor confidence into scoring
- Conflict detection in remember (flag when valid_to is set)
- Tests: decay calculation, temporal filtering
Phase E: Security hardening (#4)
Estimated: 3–4h
- Create core/security.py: Sanitiser class
- Add provenance fields to merge_node
- Add trust_level to scoring formula
- Input validation on all MCP tool inputs
- Tests: injection attempts, provenance tracking
Phase F: Multi-scope (#6)
Estimated: 2–3h
- Create core/scope.py: MemoryScope dataclass
- Add scope fields to merge_node
- Add scope filtering to all query methods
- MCP: optional scope parameters on tools
- Tests: scope isolation, hierarchy resolution
Phase G: Benchmarks (#5)
Estimated: 3–4h
- Benchmark harness: load LOCOMO/LongMemEval data
- Run queries through hybrid search engine
- Score and publish results in docs/benchmarks/
- Iterate on α, β, γ parameters based on results
Total estimated: 21–29 hours across all phases
.env reference (complete)
# === Storage backends ===
GRAPH_BACKEND=neo4j
VECTOR_BACKEND=neo4j
# === Neo4j ===
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=CHANGE_ME_BEFORE_FIRST_RUN
# === Embeddings ===
EMBEDDING_PROVIDER=none
EMBEDDING_MODEL=nomic-embed-text
EMBEDDING_DIMENSIONS=768
OLLAMA_URL=http://localhost:11434
# === Hybrid search ===
HYBRID_ALPHA=0.6
HYBRID_GRAPH_BETA=0.15
HYBRID_TRUST_GAMMA=0.1
# === Obsidian ===
VAULT_PATH=
# === Temporal ===
DECAY_RATE=0.01
DECAY_MAX_AGE_DAYS=90
# === Scope (v1: single user, leave empty) ===
DEFAULT_USER_ID=
DEFAULT_ORG_ID=
Consequences
Positive
- Build once, extend forever. New backends (Kuzu, PostgreSQL+AGE) implement the protocols without touching skills, adapters, or MCP tools.
- No rework. Hybrid search, temporal, security, and multi-scope all compose on the same protocol layer. Each phase adds, never refactors.
- Neo4j risk mitigated. If Neo4j's licensing changes or a lighter alternative is needed, swap the backend — the rest of Engrama is unaffected.
- Zero breaking changes. Default configuration reproduces today's exact behaviour. Every new feature is opt-in via .env.
- Testable in isolation. Each protocol implementation can be tested independently. NullStore enables pure unit tests without any database.
Negative
- Abstraction tax. One extra layer of indirection between skills and storage. Mitigated: the protocols are thin (~200 lines total), and the Neo4j implementation wraps the exact same Cypher we have today.
- Phase A produces zero new features. The extraction is invisible to users. Necessary investment, but delivers no immediate user value.
- Not all backends will be equal. NetworkX can't run Cypher; Kuzu's Cypher dialect differs from Neo4j's. The run_cypher() method is backend-specific and may raise NotImplementedError. Reflect patterns that depend on complex Cypher will need per-backend translations, or will only work on Cypher-capable backends.
- Embedding model lock-in within a graph. Changing the embedding model requires re-indexing all nodes. Mitigated by engrama reindex.
References
- DDR-001: Faceted classification system
- DDR-002: Bidirectional sync and vault portability
- Neo4j vector indexes: https://neo4j.com/docs/cypher-manual/5/indexes/semantic-indexes/vector-indexes/
- nomic-embed-text: https://ollama.com/library/nomic-embed-text
- OWASP Agentic AI Top 10: https://genaisecurityproject.com
- Mem0 LOCOMO benchmark: https://mem0.ai/research
- Zep temporal architecture: https://arxiv.org/abs/2501.13956