Memory
Raw SQL
Implement MemoryStore with a SQL client and hand-written queries.
This example uses PostgreSQL and the pg client. Store Anvia messages as JSONB and order them by an auto-incrementing id.
Schema
CREATE TABLE agent_memory_messages (
id BIGSERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
user_id TEXT,
run_id TEXT NOT NULL,
turn INTEGER NOT NULL,
message JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX agent_memory_messages_session_id_id_idx
ON agent_memory_messages (session_id, id);
CREATE TABLE agent_memory_errors (
id BIGSERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
user_id TEXT,
run_id TEXT NOT NULL,
error JSONB NOT NULL,
messages JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);Store
import type {
MemoryAppendInput,
MemoryContext,
MemoryErrorInput,
MemoryStore,
Message,
} from "@anvia/core";
import type { Pool } from "pg";
export class SqlMemoryStore implements MemoryStore {
constructor(private readonly pool: Pool) {}
async load(context: MemoryContext): Promise<Message[]> {
const result = await this.pool.query<{ message: Message }>(
`SELECT message
FROM agent_memory_messages
WHERE session_id = $1
ORDER BY id ASC`,
[context.sessionId],
);
return result.rows.map((row) => row.message);
}
async append(input: MemoryAppendInput): Promise<void> {
const client = await this.pool.connect();
try {
await client.query("BEGIN");
for (const message of input.messages) {
await client.query(
`INSERT INTO agent_memory_messages (
session_id, user_id, run_id, turn, message, metadata
) VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)`,
[
input.context.sessionId,
input.context.userId ?? null,
input.runId,
input.turn,
JSON.stringify(message),
JSON.stringify(input.context.metadata ?? null),
],
);
}
await client.query("COMMIT");
} catch (error) {
await client.query("ROLLBACK");
throw error;
} finally {
client.release();
}
}
async clear(context: MemoryContext): Promise<void> {
await this.pool.query(
"DELETE FROM agent_memory_messages WHERE session_id = $1",
[context.sessionId],
);
}
async recordError(input: MemoryErrorInput): Promise<void> {
await this.pool.query(
`INSERT INTO agent_memory_errors (
session_id, user_id, run_id, error, messages
) VALUES ($1, $2, $3, $4::jsonb, $5::jsonb)`,
[
input.context.sessionId,
input.context.userId ?? null,
input.runId,
JSON.stringify(serializeError(input.error)),
JSON.stringify(input.messages),
],
);
}
}
function serializeError(error: unknown): Record<string, unknown> {
if (error instanceof Error) {
return {
name: error.name,
message: error.message,
stack: error.stack,
};
}
return { message: String(error) };
}Use It
const memory = new SqlMemoryStore(pool);
const agent = new AgentBuilder("support", model)
.memory(memory)
.build();
await agent.session("thread_123", { userId: "user_456" }).prompt("Hello").send();