Building the AI Memory Stack: Layered Storage, Async Extraction and Atomic Persistence

Every AI agent you build today can hold a conversation. It can reason, use tools, and chain together complex workflows. But the moment a session ends, everything disappears. The agent forgets who you are, what you were working on, and every preference it learned during the conversation.

This is not a minor inconvenience. It is the single biggest gap between demo agents and production agents. A customer support agent that forgets your previous tickets. A coding assistant that re-asks your framework preferences every session. A financial analyst agent that cannot recall the portfolio it analyzed yesterday. Without memory, agents are perpetual strangers.

In this article, you will build a complete production memory system for AI agents. Not a toy demo that stores a few strings in a dictionary, but a real architecture with layered storage, async processing, confidence scoring, token management, and crash-safe writes. By the end, you will have a memory stack that is fast, reliable, cost efficient, and useful across sessions.

Here is how the system works at a high level:

  1. Separate memory into three distinct layers: user context, conversation history, and discrete facts
  2. Extract memories asynchronously in a background thread so the user never waits
  3. Batch memory updates using a debounce queue to reduce LLM calls
  4. Filter extracted facts by confidence score to keep only reliable information
  5. Cap memory injection at 2,000 tokens so the agent prompt never gets overloaded
  6. Write all memory files atomically so concurrent reads never see corrupted state

Table of Contents

Phase 1: Layered Memory Architecture — Designing the three-tier data model for user context, history, and facts

Phase 2: Async Background Extraction — Running memory updates in a background thread without blocking conversations

Phase 3: Debounce Queue — Batching multiple messages into a single LLM extraction call

Phase 4: Confidence-Based Filtering — Scoring and pruning facts to keep memory clean

Phase 5: Token-Capped Prompt Injection — Fitting the most reliable context into a fixed prompt budget

Phase 6: Atomic File Writes — Ensuring crash-safe persistence with rename-based atomicity

Phase 1: Layered Memory Architecture

The foundation of a production memory system is separating what you store into distinct layers. If you dump everything into a single flat list, your agent drowns in noise. The key insight is that different types of information serve different purposes and change at different rates.

LangChain’s LangMem framework identifies three core memory types in their conceptual guide: semantic memory (facts and knowledge), episodic memory (past experiences), and procedural memory (behavioral rules). Our architecture maps to these categories with a practical, implementation-focused twist.

Before we build anything, let’s understand the three memory layers:

  1. User Context: A short, structured summary of who the user is and what they care about right now. This is semantic memory, storing current facts about the user.
  2. Conversation History: A tiered record of recent and past interactions. This is episodic memory, capturing what happened and when.
  3. Discrete Facts: Individual pieces of information extracted from conversations, each tagged with metadata. This is a granular extension of semantic memory with confidence scoring.

Defining the User Context Layer

The user context layer stores three types of summaries, each one to three sentences long. These summaries get updated over time as the agent learns more about the user.

from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

@dataclass
class UserContext:
    # What the user does professionally and what projects they work on
    work_context: str = ""
    # Personal preferences, communication style, background
    personal_context: str = ""
    # Whatever the user is currently focused on right now
    top_of_mind: str = ""
    # When this context was last updated
    last_updated: Optional[datetime] = None

In the code above, we define a simple UserContext dataclass with three string fields. The work_context captures professional details like "Senior engineer at a fintech startup, building a RAG pipeline for SEC filings." The personal_context stores preferences like "Prefers concise answers, uses Python 3.11, familiar with LangChain." The top_of_mind tracks the user's current focus, like "Debugging a retrieval accuracy issue in the embedding pipeline." Each field is a one-to-three sentence summary, not a full history.

Why three separate context fields? Mixing work context with personal preferences creates messy, unfocused prompts. By separating them, the agent can selectively inject only what is relevant. A coding question needs work context. A tone adjustment needs personal context. A follow-up on yesterday’s task needs top of mind.
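
To make that concrete, here is a small sketch of selective injection. It is not part of the pipeline we build below, and the query_type labels are purely illustrative:

def select_context(ctx: UserContext, query_type: str) -> str:
    """Return only the context field relevant to this kind of query."""
    if query_type == "coding":
        return ctx.work_context      # frameworks, current project
    if query_type == "tone":
        return ctx.personal_context  # communication preferences
    return ctx.top_of_mind           # default: the user's current focus

print(select_context(UserContext(work_context="Builds RAG pipelines"), "coding"))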

Defining the History Layer

The history layer organizes past interactions into time-based tiers. Recent history gets more detail. Older history gets compressed.

@dataclass
class ConversationHistory:
    # Detailed context from the last 30 days
    recent_months: str = ""
    # Compressed summary of interactions from 1-6 months ago
    earlier_context: str = ""
    # High-level background from more than 6 months ago
    long_term_background: str = ""
    last_updated: Optional[datetime] = None

This three-tier structure mirrors how human memory works. You remember yesterday’s conversation in detail, last month’s work in broad strokes, and last year’s projects as general themes. The recent_months field might contain "User has been debugging retrieval accuracy for two weeks. Tried BM25, switched to hybrid search, saw 15% improvement." The long_term_background compresses to "User has been building RAG systems since early 2025."

Defining the Facts Layer

The facts layer is where things get interesting. Instead of free-form text, each fact is a structured object with metadata.

@dataclass
class MemoryFact:
    # The actual piece of information
    content: str = ""
    # Category tag: "preference", "project", "technical", "personal"
    category: str = ""
    # How confident we are this fact is accurate (0.0 to 1.0)
    confidence: float = 0.0
    # When this fact was extracted from conversation
    extracted_at: Optional[datetime] = None

@dataclass
class FactStore:
    facts: list = field(default_factory=list)
    # Hard cap on total facts stored
    max_facts: int = 100

Each MemoryFact has a confidence score between 0.0 and 1.0. A fact like "User's name is Sarah" extracted from "Hi, I'm Sarah" gets a high confidence of 0.95. A fact like "User might prefer PostgreSQL over MySQL" inferred from a vague comment gets a lower confidence of 0.6. The category field enables filtering, so if the agent needs technical context it can prioritize "technical" facts over "personal" ones.
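
As a quick illustration (not code from the pipeline itself), filtering by category becomes a one-liner once each fact carries metadata:

facts = [
    MemoryFact("Uses Python 3.11", "technical", 0.9),
    MemoryFact("Prefers concise answers", "preference", 0.85),
]
technical_only = [f for f in facts if f.category == "technical"]
print([f.content for f in technical_only])  # ['Uses Python 3.11']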

Key insight: The confidence score is what separates a production memory system from a toy one. Without it, your agent treats a guess the same as a confirmed fact. OpenAI’s context engineering cookbook calls this “belief updates instead of fact accumulation,” where memory evolves based on evidence quality rather than just appending everything.

Assembling the Complete Memory Model

Now let’s combine all three layers into a single memory container.

import json
from datetime import datetime

@dataclass
class AgentMemory:
    # Layer 1: Who the user is right now
    user_context: UserContext = field(default_factory=UserContext)
    # Layer 2: What has happened over time
    history: ConversationHistory = field(default_factory=ConversationHistory)
    # Layer 3: Individual facts with confidence scores
    facts: FactStore = field(default_factory=FactStore)

    def to_dict(self) -> dict:
        """Serialize the entire memory state to a dictionary."""
        return {
            "user_context": {
                "work_context": self.user_context.work_context,
                "personal_context": self.user_context.personal_context,
                "top_of_mind": self.user_context.top_of_mind,
                "last_updated": self.user_context.last_updated.isoformat()
                if self.user_context.last_updated else None,
            },
            "history": {
                "recent_months": self.history.recent_months,
                "earlier_context": self.history.earlier_context,
                "long_term_background": self.history.long_term_background,
                "last_updated": self.history.last_updated.isoformat()
                if self.history.last_updated else None,
            },
            "facts": [
                {
                    "content": f.content,
                    "category": f.category,
                    "confidence": f.confidence,
                    "extracted_at": f.extracted_at.isoformat()
                    if f.extracted_at else None,
                }
                for f in self.facts.facts
            ],
        }

    @classmethod
    def from_dict(cls, data: dict) -> "AgentMemory":
        """Deserialize memory from a dictionary."""
        memory = cls()
        uc = data.get("user_context", {})
        memory.user_context = UserContext(
            work_context=uc.get("work_context", ""),
            personal_context=uc.get("personal_context", ""),
            top_of_mind=uc.get("top_of_mind", ""),
            last_updated=datetime.fromisoformat(uc["last_updated"])
            if uc.get("last_updated") else None,
        )
        hist = data.get("history", {})
        memory.history = ConversationHistory(
            recent_months=hist.get("recent_months", ""),
            earlier_context=hist.get("earlier_context", ""),
            long_term_background=hist.get("long_term_background", ""),
            last_updated=datetime.fromisoformat(hist["last_updated"])
            if hist.get("last_updated") else None,
        )
        for f in data.get("facts", []):
            memory.facts.facts.append(
                MemoryFact(
                    content=f["content"],
                    category=f["category"],
                    confidence=f["confidence"],
                    extracted_at=datetime.fromisoformat(f["extracted_at"])
                    if f.get("extracted_at") else None,
                )
            )
        return memory

The AgentMemory class combines all three layers and provides serialization methods. The to_dict() method converts the entire memory state to a JSON-compatible dictionary for storage. The from_dict() class method reconstructs memory from stored data. This pattern of separating the in-memory model from the storage format is standard practice in production systems.

Expected output (after creating and serializing a memory object):

{
  "user_context": {
    "work_context": "Senior engineer building RAG pipelines at a fintech startup",
    "personal_context": "Prefers Python, familiar with LangChain, likes concise answers",
    "top_of_mind": "Debugging embedding accuracy for SEC filing retrieval",
    "last_updated": "2026-04-07T10:30:00"
  },
  "history": {
    "recent_months": "User spent two weeks on retrieval tuning. Tried BM25, moved to hybrid.",
    "earlier_context": "Built initial RAG prototype in January 2026.",
    "long_term_background": "Has been working with LLMs since mid-2024."
  },
  "facts": [
    {
      "content": "Uses Python 3.11 with LangChain 0.3",
      "category": "technical",
      "confidence": 0.92,
      "extracted_at": "2026-04-06T14:22:00"
    }
  ]
}

Let’s test this by creating a memory object and verifying the round-trip serialization.

# Create a memory instance and populate it
memory = AgentMemory()
memory.user_context.work_context = "Senior engineer building RAG pipelines"
memory.user_context.top_of_mind = "Debugging embedding accuracy"
memory.user_context.last_updated = datetime.now()

# Add a fact
memory.facts.facts.append(
    MemoryFact(
        content="Uses Python 3.11 with LangChain 0.3",
        category="technical",
        confidence=0.92,
        extracted_at=datetime.now(),
    )
)

# Serialize and deserialize
data = memory.to_dict()
restored = AgentMemory.from_dict(data)
print(f"Work context preserved: {restored.user_context.work_context}")
print(f"Facts preserved: {len(restored.facts.facts)}")
print(f"Confidence preserved: {restored.facts.facts[0].confidence}")

Expected output:

Work context preserved: Senior engineer building RAG pipelines
Facts preserved: 1
Confidence preserved: 0.92

How this maps to LangGraph: In LangGraph, this memory model would live as fields in your TypedDict state. The user_context and history fields map to what LangGraph’s Store calls “profiles,” which are single documents with strict schemas that get updated in place. The facts list maps to what LangGraph calls “collections,” which are individual documents searchable at runtime. LangGraph’s built-in MemorySaver handles thread-level conversation state, while the Store interface handles cross-thread persistent memory like what we are building here.
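
If you want to see roughly what that looks like in code, here is a hedged sketch. It assumes a recent langgraph release with the BaseStore interface, and the ("memories", user_id) namespace layout is illustrative rather than anything the article prescribes:

from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
user_id = "user-123"

# Profile-style record: the user_context layer as one document updated in place
store.put(("memories", user_id), "user_context", memory.to_dict()["user_context"])

# Collection-style records: one document per discrete fact
for i, fact in enumerate(memory.facts.facts):
    store.put(("memories", user_id, "facts"), f"fact-{i}", {"content": fact.content})

print(store.get(("memories", user_id), "user_context").value)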

This concludes Phase 1. We now have a structured three-layer memory model that separates user context, conversation history, and confidence-scored facts. In Phase 2, we will give this memory a heartbeat by building the extraction engine that fills it, running quietly in the background while the conversation flows uninterrupted.

Phase 2: Async Background Memory Extraction

With our memory model defined, we need a way to populate it. Memory extraction requires an LLM call to analyze conversation content and pull out relevant facts, context updates, and history summaries. This is the most expensive operation in the entire memory pipeline.

The critical design decision here is: never block the main conversation. If the user sends a message and your agent pauses for two extra seconds while it updates memory, the experience feels sluggish. The user does not care about memory updates. They care about getting a response.

Anthropic’s research on long-running Claude agents demonstrates this principle. Their agents use a CHANGELOG.md file as portable long-term memory that tracks progress, completed tasks, and failed approaches. Critically, this memory update happens as part of the commit workflow, not during the active problem-solving loop. The conversation flow stays fast while memory catches up asynchronously.

LangChain’s LangMem framework formalizes this as two memory formation patterns:

  1. Hot path (conscious formation): Active memory extraction during conversations. Enables immediate updates when critical context emerges, but adds perceptible latency.
  2. Background path (subconscious formation): An LLM reflects on a conversation after it occurs, finding patterns and extracting insights without slowing down immediate interaction.

We are going to build the background path. It is the only viable option for production.

Building the Extraction Prompt

First, we need a prompt that instructs the LLM to extract structured memory from a conversation.

EXTRACTION_PROMPT = """Analyze the following conversation and extract memory updates.

Return a JSON object with these fields:

1. "user_context_updates": object with optional fields:
- "work_context": one sentence about the user's professional context (if mentioned)
- "personal_context": one sentence about preferences or background (if mentioned)
- "top_of_mind": one sentence about their current focus (if apparent)

2. "facts": list of objects, each with:
- "content": the fact as a clear, standalone sentence
- "category": one of "preference", "project", "technical", "personal"
- "confidence": float between 0.0 and 1.0 indicating how certain this fact is

Only extract information that is clearly stated or strongly implied.
Do not infer facts that require significant speculation.
If nothing new is learned, return empty updates.

Conversation:
{conversation}
"""

This prompt is deliberately conservative. It asks the LLM to only extract what is “clearly stated or strongly implied,” which helps keep confidence scores meaningful. The structured JSON output format makes parsing reliable.

Why structured JSON output? Free-form text extraction is unpredictable. One run might say “The user likes Python” and another might say “User preference: Python programming language.” Structured output with explicit fields ensures consistent, parseable results every time.

The Async Memory Extractor

Now we build the core extraction engine that runs in a background thread.

import threading
import json
import logging
from typing import Optional
from openai import OpenAI

logger = logging.getLogger(__name__)

class AsyncMemoryExtractor:
    def __init__(self, memory: AgentMemory, model: str = "gpt-4o-mini"):
        # The shared memory object that gets updated
        self.memory = memory
        # LLM client for extraction calls
        self.client = OpenAI()
        self.model = model
        # Lock to prevent concurrent memory writes
        self._lock = threading.Lock()

    def extract_in_background(self, conversation: str) -> None:
        """Launch a background thread to extract and store memories."""
        # Create a daemon thread so it does not block program exit
        thread = threading.Thread(
            target=self._extract_and_update,
            args=(conversation,),
            daemon=True,
        )
        thread.start()
        logger.info("Memory extraction started in background thread")

    def _extract_and_update(self, conversation: str) -> None:
        """The actual extraction logic that runs in the background."""
        try:
            # Call the LLM to extract memory from the conversation
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You extract structured memory from conversations. Return valid JSON only."},
                    {"role": "user", "content": EXTRACTION_PROMPT.format(conversation=conversation)},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )

            # Parse the extraction result
            result = json.loads(response.choices[0].message.content)

            # Acquire the lock before updating shared memory
            with self._lock:
                self._apply_updates(result)

            logger.info("Memory extraction completed successfully")

        except Exception as e:
            # Memory extraction failures should never crash the agent
            logger.error(f"Memory extraction failed: {e}")

    def _apply_updates(self, result: dict) -> None:
        """Apply extracted updates to the memory object."""
        # Update user context if new information was found
        context_updates = result.get("user_context_updates", {})
        if context_updates.get("work_context"):
            self.memory.user_context.work_context = context_updates["work_context"]
        if context_updates.get("personal_context"):
            self.memory.user_context.personal_context = context_updates["personal_context"]
        if context_updates.get("top_of_mind"):
            self.memory.user_context.top_of_mind = context_updates["top_of_mind"]
        self.memory.user_context.last_updated = datetime.now()

        # Add new facts
        for fact_data in result.get("facts", []):
            fact = MemoryFact(
                content=fact_data["content"],
                category=fact_data["category"],
                confidence=fact_data.get("confidence", 0.5),
                extracted_at=datetime.now(),
            )
            self.memory.facts.facts.append(fact)
Let’s walk through this code carefully. The AsyncMemoryExtractor wraps our AgentMemory and provides a single public method: extract_in_background(). When called, it spawns a daemon thread that makes an LLM call to analyze the conversation, parses the JSON result, and applies updates to the shared memory object.

The _lock (a threading.Lock) prevents two background threads from writing to memory simultaneously. Without it, concurrent updates could corrupt the facts list or overwrite context fields mid-update. The daemon=True flag ensures the background thread does not prevent the program from exiting if the main thread finishes.

Notice the temperature=0.1 in the LLM call. We want deterministic, conservative extraction, not creative interpretation. The response_format={"type": "json_object"} forces the LLM to return valid JSON, eliminating parsing failures.

Production vs demo distinction: In this demo, we use a simple threading.Thread. In production, you would use a proper task queue like Celery, or an async framework like asyncio with background tasks. The principle is identical: never block the main conversation loop. LangGraph handles this natively through its background processing support in Store managers, where memory extraction runs as a separate node in the graph.
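
As a rough illustration of the asyncio variant mentioned above (a sketch under assumptions, not the article's implementation), the same "respond first, extract later" principle looks like this:

import asyncio

async def extract_memory_async(conversation: str) -> None:
    # Reuse the extractor's blocking method by running it in a worker thread
    await asyncio.to_thread(extractor._extract_and_update, conversation)

async def handle_turn(user_message: str) -> str:
    reply = "...assistant reply..."  # generate the agent's response here (placeholder)
    # Fire-and-forget background task; in real code keep a reference to the task
    # so it is not garbage-collected before it finishes
    asyncio.create_task(extract_memory_async(user_message))
    return reply  # the user never waits on memory extraction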

Expected output (logging from a background extraction):

INFO: Memory extraction started in background thread
INFO: Memory extraction completed successfully

Let’s test the extractor with a sample conversation.

# Create memory and extractor
memory = AgentMemory()
extractor = AsyncMemoryExtractor(memory)

# Simulate a conversation
sample_conversation = """
User: I'm working on a RAG pipeline for our SEC filing analysis tool.
Assistant: I can help with that. What retrieval strategy are you currently using?
User: We started with basic chunk-and-embed using Chroma, but accuracy is only around 60%.
I'm a senior engineer at FinTech Corp. We need to hit 85% before the Q2 launch.
Assistant: Have you considered hybrid retrieval combining BM25 with dense embeddings?
User: Not yet, but that sounds promising. I prefer Python and we are using LangChain 0.3.
"""

# This returns immediately, extraction happens in background
extractor.extract_in_background(sample_conversation)

# Wait briefly for the background thread to finish (only for demo purposes)
import time
time.sleep(3)

# Check what was extracted
print(f"Work context: {memory.user_context.work_context}")
print(f"Top of mind: {memory.user_context.top_of_mind}")
print(f"Facts extracted: {len(memory.facts.facts)}")
for fact in memory.facts.facts:
    print(f" [{fact.confidence:.2f}] {fact.content} ({fact.category})")

Expected output:

Work context: Senior engineer at FinTech Corp building a RAG pipeline for SEC filing analysis
Top of mind: Improving retrieval accuracy from 60% to 85% before Q2 launch
Facts extracted: 4
[0.95] User is a senior engineer at FinTech Corp (project)
[0.90] Current RAG accuracy is 60% using basic chunk-and-embed with Chroma (technical)
[0.85] Target accuracy is 85% before Q2 launch (project)
[0.92] User prefers Python and uses LangChain 0.3 (preference)

This concludes Phase 2. We now have an extraction engine that populates our three-layer memory model without interrupting the conversation flow. But right now, it fires on every single message. In Phase 3, we will add a debounce queue that batches multiple messages into a single extraction call, dramatically cutting costs.

Phase 3: Debounce Queue

In a real conversation, users often send multiple messages in quick succession. A question, a follow-up, a correction. If we trigger a memory extraction after every single message, we waste LLM calls on partial context. The second message might contradict the first. The third might clarify both.

The solution is a debounce queue. Instead of extracting immediately, we wait for a quiet period (30 seconds after the last message) before processing. This batches an entire conversational exchange into one extraction call. Much cheaper. Much cleaner.

How the Debounce Pattern Works

Before we build the implementation, let’s understand the three key behaviors:

  1. Message arrives: Add it to the queue and reset the 30-second timer
  2. Another message arrives within 30 seconds: Add it to the queue and reset the timer again
  3. 30 seconds pass with no new messages: Process the entire queue in one batch

This means a rapid-fire exchange of 5 messages results in just one LLM extraction call instead of five.

Implementing the Debounce Queue

import threading
import time
from typing import List, Optional

class DebounceMemoryQueue:
    def __init__(
        self,
        extractor: AsyncMemoryExtractor,
        debounce_seconds: float = 30.0,
    ):
        # The extractor that will process batched conversations
        self.extractor = extractor
        # How long to wait after the last message before processing
        self.debounce_seconds = debounce_seconds
        # Queue of messages waiting to be processed
        self._queue: List[str] = []
        # Lock for thread-safe queue access
        self._lock = threading.Lock()
        # The current timer that will trigger processing
        self._timer: Optional[threading.Timer] = None

    def add_message(self, role: str, content: str) -> None:
        """Add a message to the queue and reset the debounce timer."""
        with self._lock:
            # Format and append the message to the queue
            self._queue.append(f"{role}: {content}")

            # Cancel the existing timer if one is running
            if self._timer is not None:
                self._timer.cancel()

            # Start a new timer that will fire after the debounce period
            self._timer = threading.Timer(
                self.debounce_seconds,
                self._process_queue,
            )
            self._timer.daemon = True
            self._timer.start()

    def _process_queue(self) -> None:
        """Process all queued messages as a single batch."""
        with self._lock:
            if not self._queue:
                return

            # Join all queued messages into one conversation string
            conversation = "\n".join(self._queue)
            # Clear the queue
            self._queue.clear()
            self._timer = None

        # Send the batched conversation to the async extractor
        self.extractor.extract_in_background(conversation)

    def flush(self) -> None:
        """Force immediate processing of the queue (useful for session end)."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
        # Process outside the lock because _process_queue acquires it again
        self._process_queue()

In the code above, the DebounceMemoryQueue wraps our AsyncMemoryExtractor with a time-based batching layer. Every call to add_message() appends the message to an internal queue and resets a 30-second timer. If no new messages arrive within that window, _process_queue() fires and sends the entire batch to the extractor as one conversation string.

The flush() method is important for session boundaries. When a user disconnects or a session ends, you call flush() to immediately process any remaining queued messages instead of losing them.

Why 30 seconds? This value balances responsiveness with cost efficiency. In testing, most multi-message exchanges complete within 20 seconds. A 30-second window catches nearly all related messages while keeping the feedback loop tight enough that memory feels current. Adjust this based on your application’s conversation patterns.

Let’s test the debounce behavior.

# Create the full pipeline
memory = AgentMemory()
extractor = AsyncMemoryExtractor(memory)
queue = DebounceMemoryQueue(extractor, debounce_seconds=2.0) # 2s for demo

# Simulate rapid-fire messages (would normally trigger 4 extractions)
queue.add_message("User", "I need help with my RAG pipeline")
queue.add_message("Assistant", "Sure, what framework are you using?")
queue.add_message("User", "LangChain with Chroma for vector storage")
queue.add_message("User", "Actually we just switched to Pinecone yesterday")

print(f"Messages queued: 4")
print(f"Extraction calls so far: 0 (waiting for debounce)")

# Wait for the debounce period to pass
time.sleep(3)

print(f"Messages queued after debounce: 0")
print(f"Extraction calls: 1 (single batch)")

Expected output:

Messages queued: 4
Extraction calls so far: 0 (waiting for debounce)
Messages queued after debounce: 0
Extraction calls: 1 (single batch)

Cost impact: If a typical conversation has 10 user messages and 10 assistant messages, the naive approach makes 20 extraction calls. With debounce, you make 2 to 3 calls for the same conversation. At approximately $0.01 per extraction call using gpt-4o-mini, that is the difference between $0.20 and $0.03 per conversation. Across thousands of users, this adds up fast.

How this maps to LangGraph: LangGraph’s background memory processing achieves a similar effect through its graph execution model. Instead of debouncing with timers, you would add a “memory extraction” node that runs after the conversation node completes its batch of interactions. The Store manager in LangMem handles batching automatically when configured for background processing.

This concludes Phase 3. We now have a cost-efficient pipeline that batches conversation messages and extracts memory in a single background call. But we are still storing every fact the LLM extracts, regardless of quality. In Phase 4, we will add a confidence filter that ensures only reliable facts make it into memory.

Phase 4: Confidence-Based Filtering

Not every piece of information extracted from a conversation deserves to be remembered. The user says “I might try PostgreSQL” and your LLM extracts “User’s preferred database is PostgreSQL” with a confidence of 0.4. That is a guess, not a fact. If you store it, the next session might confidently recommend PostgreSQL when the user was just thinking out loud.

The fix is a confidence threshold. Set it at 0.7, and anything below that score gets discarded. Additionally, cap the total number of stored facts at 100 and use an eviction policy that removes the oldest, lowest-confidence facts first.

OpenAI’s context engineering cookbook describes this philosophy as “belief updates instead of fact accumulation.” The system should not blindly append everything. It should maintain a curated, high-confidence view of the user.

Implementing the Confidence Filter

class ConfidenceFilter:
    def __init__(
        self,
        min_confidence: float = 0.7,
        max_facts: int = 100,
    ):
        # Facts below this score get discarded
        self.min_confidence = min_confidence
        # Maximum number of facts to retain
        self.max_facts = max_facts

    def filter_new_facts(self, new_facts: list[MemoryFact]) -> list[MemoryFact]:
        """Remove facts that fall below the confidence threshold."""
        accepted = []
        rejected = []
        for fact in new_facts:
            if fact.confidence >= self.min_confidence:
                accepted.append(fact)
            else:
                rejected.append(fact)

        if rejected:
            logger.info(
                f"Filtered out {len(rejected)} low-confidence facts "
                f"(threshold: {self.min_confidence})"
            )
        return accepted

    def enforce_cap(self, facts: list[MemoryFact]) -> list[MemoryFact]:
        """Trim the fact list to stay under the maximum, removing
        the oldest lowest-confidence facts first."""
        if len(facts) <= self.max_facts:
            return facts

        # Sort by confidence (ascending), then by age (oldest first)
        # This puts the least valuable facts at the front
        facts_sorted = sorted(
            facts,
            key=lambda f: (f.confidence, f.extracted_at or datetime.min),
        )

        # Remove from the front (lowest confidence, oldest) until under cap
        excess = len(facts_sorted) - self.max_facts
        removed = facts_sorted[:excess]
        kept = facts_sorted[excess:]

        logger.info(
            f"Evicted {len(removed)} facts to enforce cap of {self.max_facts}. "
            f"Lowest evicted confidence: {removed[0].confidence:.2f}"
        )

        return kept

The ConfidenceFilter class has two responsibilities. The filter_new_facts() method gates incoming facts, rejecting anything below the 0.7 threshold before it enters memory. The enforce_cap() method trims the total fact list when it exceeds 100, using a composite sort that prioritizes removing old, low-confidence facts first.

The sort key (f.confidence, f.extracted_at) creates a two-level ordering. Facts with confidence 0.7 sort before facts with confidence 0.9. Among facts with equal confidence, older ones sort first. This means the eviction policy removes stale, uncertain facts before recent, confident ones.

Why 0.7 as the threshold? In practice, LLM-extracted confidence scores tend to cluster around 0.5 (uncertain) and 0.9 (confident). A threshold of 0.7 sits right above the uncertain cluster, catching most genuine facts while filtering out speculation. You can tune this based on your extraction model’s calibration.

Integrating the Filter into the Extractor

Let’s update the AsyncMemoryExtractor to use the confidence filter.

class FilteredMemoryExtractor(AsyncMemoryExtractor):
    def __init__(self, memory: AgentMemory, model: str = "gpt-4o-mini"):
        super().__init__(memory, model)
        # Add the confidence filter
        self.filter = ConfidenceFilter(min_confidence=0.7, max_facts=100)

    def _apply_updates(self, result: dict) -> None:
        """Apply extracted updates with confidence filtering."""
        # Update user context (same as before)
        context_updates = result.get("user_context_updates", {})
        if context_updates.get("work_context"):
            self.memory.user_context.work_context = context_updates["work_context"]
        if context_updates.get("personal_context"):
            self.memory.user_context.personal_context = context_updates["personal_context"]
        if context_updates.get("top_of_mind"):
            self.memory.user_context.top_of_mind = context_updates["top_of_mind"]
        self.memory.user_context.last_updated = datetime.now()

        # Parse new facts
        new_facts = []
        for fact_data in result.get("facts", []):
            new_facts.append(
                MemoryFact(
                    content=fact_data["content"],
                    category=fact_data["category"],
                    confidence=fact_data.get("confidence", 0.5),
                    extracted_at=datetime.now(),
                )
            )

        # Filter by confidence threshold
        accepted_facts = self.filter.filter_new_facts(new_facts)

        # Add accepted facts to memory
        self.memory.facts.facts.extend(accepted_facts)

        # Enforce the total fact cap
        self.memory.facts.facts = self.filter.enforce_cap(
            self.memory.facts.facts
        )

The FilteredMemoryExtractor extends our base extractor by adding the confidence filter at two points. New facts get filtered immediately after extraction (removing low-confidence ones). Then the total fact list gets trimmed if it exceeds the cap. This two-step approach ensures both quality and quantity are controlled.

Let’s test the filter with a mix of high and low confidence facts.

# Create test facts with varying confidence
test_facts = [
    MemoryFact("User's name is Sarah Chen", "personal", 0.95, datetime.now()),
    MemoryFact("Uses Python 3.11", "technical", 0.90, datetime.now()),
    MemoryFact("Might be interested in Rust", "preference", 0.4, datetime.now()),
    MemoryFact("Could be using Docker", "technical", 0.55, datetime.now()),
    MemoryFact("Works at FinTech Corp", "project", 0.88, datetime.now()),
    MemoryFact("Seems to prefer dark mode", "preference", 0.3, datetime.now()),
]

confidence_filter = ConfidenceFilter(min_confidence=0.7, max_facts=100)
accepted = confidence_filter.filter_new_facts(test_facts)

print(f"Input facts: {len(test_facts)}")
print(f"Accepted facts: {len(accepted)}")
print(f"Rejected facts: {len(test_facts) - len(accepted)}")
print("\nAccepted:")
for f in accepted:
    print(f" [{f.confidence:.2f}] {f.content}")

Expected output:

Input facts: 6
Accepted facts: 3
Rejected facts: 3

Accepted:
[0.95] User's name is Sarah Chen
[0.90] Uses Python 3.11
[0.88] Works at FinTech Corp

Before and After: Without the confidence filter, our memory would contain 6 facts including “Might be interested in Rust” (0.4) and “Seems to prefer dark mode” (0.3). These uncertain facts could mislead the agent. With the filter, only 3 high-confidence facts survive, each one reliable enough to inform future responses.

This concludes Phase 4. Our memory pipeline now only stores facts it is confident about, and automatically evicts stale, uncertain facts when capacity is reached. But even with clean facts, we still need to manage how much memory gets injected into the agent’s prompt. In Phase 5, we will cap the token budget and ensure the agent always gets the most reliable context first.

Phase 5: Token-Capped Prompt Injection

You now have a clean, confidence-filtered set of memories. The next challenge is injecting them into the agent’s system prompt without blowing up the context window. If memory grows to 5,000 tokens and your system prompt is already 3,000 tokens, you are consuming 8,000 tokens before the user even says anything.

The rule is simple: cap memory injection at 2,000 tokens. If memory exceeds that limit, drop the lowest confidence facts first until it fits. The agent always gets the most reliable context, never an overloaded prompt.

Building the Token-Aware Injector

import tiktoken

class MemoryInjector:
    def __init__(
        self,
        max_tokens: int = 2000,
        encoding_name: str = "cl100k_base",
    ):
        # Maximum tokens allowed for memory in the system prompt
        self.max_tokens = max_tokens
        # Tokenizer for counting tokens accurately
        self.encoder = tiktoken.get_encoding(encoding_name)

    def count_tokens(self, text: str) -> int:
        """Count the number of tokens in a string."""
        return len(self.encoder.encode(text))

    def build_memory_prompt(self, memory: AgentMemory) -> str:
        """Build the memory section of the system prompt,
        staying within the token budget."""

        # Step 1: Always include user context (highest priority)
        sections = []
        context_section = self._format_user_context(memory.user_context)
        sections.append(context_section)

        # Step 2: Include history summaries
        history_section = self._format_history(memory.history)
        sections.append(history_section)

        # Step 3: Include facts sorted by confidence (highest first)
        sorted_facts = sorted(
            memory.facts.facts,
            key=lambda f: f.confidence,
            reverse=True,  # Highest confidence first
        )

        # Step 4: Add facts one by one until we hit the token limit
        facts_lines = []
        current_text = "\n\n".join(sections)
        current_tokens = self.count_tokens(current_text)

        for fact in sorted_facts:
            fact_line = f"- [{fact.category}] {fact.content}"
            line_tokens = self.count_tokens(fact_line)

            if current_tokens + line_tokens + 10 > self.max_tokens:
                # Adding this fact would exceed the budget
                break

            facts_lines.append(fact_line)
            current_tokens += line_tokens

        # Assemble the final memory prompt
        if facts_lines:
            facts_section = "Known facts about this user:\n" + "\n".join(facts_lines)
            sections.append(facts_section)

        memory_prompt = "\n\n".join(sections)
        final_tokens = self.count_tokens(memory_prompt)

        logger.info(
            f"Memory prompt: {final_tokens} tokens "
            f"({len(facts_lines)}/{len(sorted_facts)} facts included)"
        )

        return memory_prompt

    def _format_user_context(self, ctx: UserContext) -> str:
        """Format user context into a readable prompt section."""
        lines = ["User context:"]
        if ctx.work_context:
            lines.append(f" Work: {ctx.work_context}")
        if ctx.personal_context:
            lines.append(f" Preferences: {ctx.personal_context}")
        if ctx.top_of_mind:
            lines.append(f" Current focus: {ctx.top_of_mind}")
        return "\n".join(lines)

    def _format_history(self, history: ConversationHistory) -> str:
        """Format conversation history into a readable prompt section."""
        lines = ["Conversation history:"]
        if history.recent_months:
            lines.append(f" Recent: {history.recent_months}")
        if history.earlier_context:
            lines.append(f" Earlier: {history.earlier_context}")
        if history.long_term_background:
            lines.append(f" Background: {history.long_term_background}")
        return "\n".join(lines)

The MemoryInjector class builds the memory section of the system prompt with a strict token budget. The strategy follows a priority order: user context first (always included), then history summaries, then individual facts sorted by confidence from highest to lowest.

The key logic is in build_memory_prompt(). It assembles the context and history sections first, measures their token count, then adds facts one at a time from highest to lowest confidence until the budget is exhausted. Facts that do not fit simply get dropped. This guarantees the agent always sees the most reliable information.

We use tiktoken with the cl100k_base encoding, which is the tokenizer for GPT-4 and most modern models. This gives us accurate token counts instead of rough character-based estimates.

Why cap at 2,000 tokens? A typical system prompt for an AI agent runs 1,000 to 3,000 tokens. Adding 2,000 tokens of memory keeps the total under 5,000, leaving plenty of room for the conversation itself. If you use a model with a 128K context window, you have more headroom, but keeping memory compact forces it to stay relevant. Bloated memory prompts dilute the agent’s attention.

Let’s test the injector with a memory object that has more facts than can fit.

# Build a memory with many facts
memory = AgentMemory()
memory.user_context.work_context = "Senior ML engineer at FinTech Corp"
memory.user_context.personal_context = "Prefers Python, concise answers"
memory.user_context.top_of_mind = "Optimizing RAG retrieval accuracy"
memory.history.recent_months = "Working on SEC filing analysis for two weeks"

# Add 20 facts with varying confidence
for i in range(20):
    memory.facts.facts.append(
        MemoryFact(
            content=f"Technical fact number {i + 1} about the user's setup",
            category="technical",
            confidence=0.7 + (i * 0.015),
            extracted_at=datetime.now(),
        )
    )

injector = MemoryInjector(max_tokens=500) # Low limit for demo
prompt = injector.build_memory_prompt(memory)
print(prompt)

Expected output:

User context:
Work: Senior ML engineer at FinTech Corp
Preferences: Prefers Python, concise answers
Current focus: Optimizing RAG retrieval accuracy

Conversation history:
Recent: Working on SEC filing analysis for two weeks

Known facts about this user:
- [technical] Technical fact number 20 about the user's setup
- [technical] Technical fact number 19 about the user's setup
- [technical] Technical fact number 18 about the user's setup
- [technical] Technical fact number 17 about the user's setup
- [technical] Technical fact number 16 about the user's setup

Notice how only 5 of the 20 facts fit within the 500-token budget. The injector automatically selected the highest-confidence facts and dropped the rest. In production with a 2,000-token budget, you can typically fit 30 to 50 facts alongside the context and history sections.

How this maps to LangGraph: In LangGraph, this injection pattern maps to the system prompt construction in your graph’s entry node. You would build the memory prompt as part of the SystemMessage content, using the Store to retrieve facts and the graph's state to access user context. LangGraph's MemorySaver and Store provide the retrieval layer, and you would add the token-capping logic as a utility function called during prompt assembly.

This concludes Phase 5. Our memory pipeline now respects a strict token budget, always prioritizing the most confident facts. The final piece is making sure all of this data gets written to disk safely. In Phase 6, we will implement atomic file writes that guarantee crash-safe persistence.

Phase 6: Atomic File Writes

Everything we have built so far lives in memory. If the process crashes, all memories are lost. We need to persist the memory state to disk. But naive file writes are dangerous in a concurrent system. If the agent is reading memory while a background thread is writing it, the reader can see a half-written, corrupted file.

The solution is atomic writes using the rename pattern. You write to a temporary file first, then rename it to the final path. On any Unix system (Linux, macOS), os.rename() is an atomic operation. Readers either see the old complete file or the new complete file, never a partial state.

Implementing Atomic Persistence

import os
import tempfile
import json

class AtomicMemoryStore:
    def __init__(self, file_path: str):
        # The final file path where memory is persisted
        self.file_path = file_path
        # Directory for temporary files (same filesystem as final path)
        self._dir = os.path.dirname(file_path) or "."

    def save(self, memory: AgentMemory) -> None:
        """Atomically write memory state to disk."""
        # Serialize the memory to JSON
        data = json.dumps(memory.to_dict(), indent=2, default=str)

        # Write to a temporary file in the same directory
        # (same filesystem ensures rename is atomic)
        fd, tmp_path = tempfile.mkstemp(
            dir=self._dir,
            prefix=".memory_",
            suffix=".tmp",
        )
        try:
            # Write the full content to the temp file
            with os.fdopen(fd, "w") as f:
                f.write(data)
                f.flush()
                # Force the OS to write to disk (not just the buffer)
                os.fsync(f.fileno())

            # Atomic rename: readers never see a partial file
            os.replace(tmp_path, self.file_path)
            logger.info(f"Memory saved atomically to {self.file_path}")

        except Exception:
            # Clean up the temp file if anything goes wrong
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise

    def load(self) -> AgentMemory:
        """Load memory state from disk."""
        if not os.path.exists(self.file_path):
            logger.info("No existing memory file found, starting fresh")
            return AgentMemory()

        with open(self.file_path, "r") as f:
            data = json.loads(f.read())

        memory = AgentMemory.from_dict(data)
        logger.info(
            f"Memory loaded: {len(memory.facts.facts)} facts"
        )
        return memory

Let’s break down why each step matters. The tempfile.mkstemp() call creates a temporary file in the same directory as the final file. This is critical because os.replace() is only guaranteed atomic when source and destination are on the same filesystem. If you create the temp file in /tmp and the final file is on /data, the rename might not be atomic.

The os.fsync() call forces the OS to flush the file data from its buffer to the physical disk. Without it, a power failure could lose the data even though f.write() succeeded (the data might still be in the OS page cache). The os.replace() function (preferred over os.rename() because it works cross-platform and handles existing files) performs the atomic swap.

If anything fails during the write, the except block cleans up the temporary file. The final file is never touched until the new data is fully written and synced to disk.

Why not just write directly to the file? Consider what happens if the process crashes mid-write. With a direct write, the file might contain half of the old data and half of the new data, creating a corrupted JSON file that fails to parse on the next load. With atomic rename, the worst case is that the rename never happened and you still have the previous valid version.

Production considerations: For high-throughput systems where many agents write memory concurrently, consider using SQLite with WAL mode or a dedicated database instead of JSON files. SQLite provides ACID transactions out of the box. For this architecture, the file-based approach works well for single-agent-per-user deployments.
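
For reference, here is a minimal sketch of that SQLite alternative (the table name and schema are illustrative, not something the article specifies). WAL mode lets readers keep reading the previous snapshot while a write is in progress:

import sqlite3
import json

conn = sqlite3.connect("agent_memory.db")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(
    "CREATE TABLE IF NOT EXISTS memory (user_id TEXT PRIMARY KEY, state TEXT)"
)

def save_memory_sqlite(user_id: str, memory: AgentMemory) -> None:
    # One transaction atomically replaces the previous snapshot for this user
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO memory (user_id, state) VALUES (?, ?)",
            (user_id, json.dumps(memory.to_dict(), default=str)),
        )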

Let’s test the atomic save and load cycle.

# Create and populate memory
memory = AgentMemory()
memory.user_context.work_context = "ML engineer building production agents"
memory.user_context.last_updated = datetime.now()
memory.facts.facts.append(
MemoryFact("Uses LangGraph for orchestration", "technical", 0.91, datetime.now())
)

# Save atomically
store = AtomicMemoryStore("/tmp/agent_memory.json")
store.save(memory)

# Load it back
loaded = store.load()
print(f"Work context: {loaded.user_context.work_context}")
print(f"Facts: {len(loaded.facts.facts)}")
print(f"First fact: {loaded.facts.facts[0].content}")

Expected output:

Memory saved atomically to /tmp/agent_memory.json
Memory loaded: 1 facts
Work context: ML engineer building production agents
Facts: 1
First fact: Uses LangGraph for orchestration

Wiring Persistence into the Pipeline

Now let’s connect the atomic store to our extraction pipeline so memory gets saved after every update.

class PersistentMemoryExtractor(FilteredMemoryExtractor):
    def __init__(
        self,
        memory: AgentMemory,
        store: AtomicMemoryStore,
        model: str = "gpt-4o-mini",
    ):
        super().__init__(memory, model)
        # The atomic store for persisting to disk
        self.store = store

    def _apply_updates(self, result: dict) -> None:
        """Apply updates and persist to disk atomically."""
        # Apply the filtered updates to memory
        super()._apply_updates(result)
        # Save the updated memory state to disk
        self.store.save(self.memory)

This small subclass adds one line: after every memory update, the state gets atomically persisted to disk. The background thread extracts, filters, updates memory, and saves, all without the main conversation thread knowing or caring.

This concludes Phase 6. Our memory pipeline is now fully persistent with crash-safe atomic writes. Let’s put the entire system together and see it work end to end.

Putting It All Together: End-to-End Demo

Here is the complete pipeline wired together, from message input to persisted memory.

def create_memory_pipeline(memory_file: str = "agent_memory.json"):
    """Create the full memory pipeline with all six practices."""

    # Load existing memory or start fresh
    store = AtomicMemoryStore(memory_file)
    memory = store.load()

    # Create the filtered, persistent extractor
    extractor = PersistentMemoryExtractor(
        memory=memory,
        store=store,
        model="gpt-4o-mini",
    )

    # Wrap in a debounce queue
    queue = DebounceMemoryQueue(
        extractor=extractor,
        debounce_seconds=30.0,
    )

    # Create the memory injector for prompt building
    injector = MemoryInjector(max_tokens=2000)

    return memory, queue, injector


def handle_message(
    queue: DebounceMemoryQueue,
    injector: MemoryInjector,
    memory: AgentMemory,
    user_message: str,
) -> str:
    """Handle an incoming user message with memory support."""

    # Step 1: Build the system prompt with current memory
    memory_context = injector.build_memory_prompt(memory)
    system_prompt = f"""You are a helpful AI assistant.

{memory_context}

Use the above context to personalize your responses.
Reference known facts about the user when relevant.
Do not mention that you are reading from a memory system."""

    # Step 2: Call the LLM for the response
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
    )
    assistant_reply = response.choices[0].message.content

    # Step 3: Queue both messages for background memory extraction
    queue.add_message("User", user_message)
    queue.add_message("Assistant", assistant_reply)

    # The response returns immediately
    # Memory extraction happens in the background after debounce
    return assistant_reply

Let’s observe what happens in a typical session:

  1. The user sends a message. The agent builds a system prompt that includes existing memory (token-capped, highest confidence facts first)
  2. The LLM generates a response using that personalized context
  3. Both the user message and assistant reply get queued for memory extraction
  4. The response returns to the user immediately (no waiting for memory)
  5. After 30 seconds of quiet, the debounce timer fires
  6. A background thread extracts facts from the batched conversation
  7. New facts get filtered by the 0.7 confidence threshold
  8. The fact list gets capped at 100 entries
  9. Updated memory gets atomically written to disk
  10. Next session, the agent loads this file and remembers everything

Key insight: The user experiences none of steps 5 through 9. From their perspective, the agent just responds and remembers. The entire memory pipeline is invisible, which is exactly how it should be.
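
A short usage sketch, wiring the helpers above together (the message text is just an example):

memory, queue, injector = create_memory_pipeline("agent_memory.json")

reply = handle_message(
    queue, injector, memory,
    "Yesterday's hybrid search change helped. What should I tune next?",
)
print(reply)

# On session end, flush any messages still sitting in the debounce queue
queue.flush()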

How to Improve It Further

  1. Semantic deduplication: Before adding a new fact, check if a semantically similar fact already exists. Use embedding similarity (cosine distance above 0.9) to detect duplicates and merge them, keeping the higher confidence score. LangChain’s LangMem framework calls this “memory consolidation” and their MemoryManager component handles it automatically.
  2. Fact decay over time: Facts should lose confidence as they age. A preference stated six months ago may no longer be accurate. Implement a decay function that reduces confidence by a small amount (0.01 per week) so stale facts naturally get evicted as new ones arrive (see the sketch after this list).
  3. Category-aware injection: Instead of sorting facts purely by confidence, allocate token budgets per category. Reserve 500 tokens for “technical” facts, 300 for “preference” facts, and so on. This ensures the agent gets a balanced view of the user rather than overloading on one category.
  4. Multi-user memory isolation: Wrap the entire pipeline in a user-scoped namespace. Each user gets their own memory file, their own debounce queue, and their own fact store. LangGraph’s Store interface supports this natively through its namespace parameter, allowing you to scope memories by user, organization, or any arbitrary hierarchy.
  5. Memory conflict resolution: When a new fact contradicts an existing one (the user switches from Python to Rust), the old fact should be invalidated rather than kept alongside the new one. Implement a contradiction detector that checks new facts against existing ones and removes conflicts. This is what OpenAI’s context engineering framework calls “belief updates,” where memory evolves to reflect the user’s current state rather than accumulating a history of every past state.
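
As an example, here is a minimal sketch of improvement 2, fact decay. The 0.01-per-week rate comes from the list above; the helper name is mine:

from datetime import datetime, timedelta

def effective_confidence(fact: MemoryFact, now: Optional[datetime] = None) -> float:
    """Confidence after decaying 0.01 per week since extraction."""
    now = now or datetime.now()
    if fact.extracted_at is None:
        return fact.confidence
    weeks_old = (now - fact.extracted_at) / timedelta(weeks=1)
    return max(0.0, fact.confidence - 0.01 * weeks_old)

# Re-apply the existing 0.7 threshold using decayed scores
fresh_facts = [f for f in memory.facts.facts if effective_confidence(f) >= 0.7]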
