Memory

Raw SQL

Implement MemoryStore with a SQL client and hand-written queries.

This example uses PostgreSQL and the pg client. Store Anvia messages as JSONB and order them by an auto-incrementing id.

Schema

CREATE TABLE agent_memory_messages (
  id BIGSERIAL PRIMARY KEY,
  session_id TEXT NOT NULL,
  user_id TEXT,
  run_id TEXT NOT NULL,
  turn INTEGER NOT NULL,
  message JSONB NOT NULL,
  metadata JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX agent_memory_messages_session_id_id_idx
  ON agent_memory_messages (session_id, id);

CREATE TABLE agent_memory_errors (
  id BIGSERIAL PRIMARY KEY,
  session_id TEXT NOT NULL,
  user_id TEXT,
  run_id TEXT NOT NULL,
  error JSONB NOT NULL,
  messages JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Store

import type {
  MemoryAppendInput,
  MemoryContext,
  MemoryErrorInput,
  MemoryStore,
  Message,
} from "@anvia/core";
import type { Pool } from "pg";

export class SqlMemoryStore implements MemoryStore {
  constructor(private readonly pool: Pool) {}

  async load(context: MemoryContext): Promise<Message[]> {
    const result = await this.pool.query<{ message: Message }>(
      `SELECT message
       FROM agent_memory_messages
       WHERE session_id = $1
       ORDER BY id ASC`,
      [context.sessionId],
    );

    return result.rows.map((row) => row.message);
  }

  async append(input: MemoryAppendInput): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query("BEGIN");

      for (const message of input.messages) {
        await client.query(
          `INSERT INTO agent_memory_messages (
            session_id, user_id, run_id, turn, message, metadata
          ) VALUES ($1, $2, $3, $4, $5::jsonb, $6::jsonb)`,
          [
            input.context.sessionId,
            input.context.userId ?? null,
            input.runId,
            input.turn,
            JSON.stringify(message),
            JSON.stringify(input.context.metadata ?? null),
          ],
        );
      }

      await client.query("COMMIT");
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }

  async clear(context: MemoryContext): Promise<void> {
    await this.pool.query(
      "DELETE FROM agent_memory_messages WHERE session_id = $1",
      [context.sessionId],
    );
  }

  async recordError(input: MemoryErrorInput): Promise<void> {
    await this.pool.query(
      `INSERT INTO agent_memory_errors (
        session_id, user_id, run_id, error, messages
      ) VALUES ($1, $2, $3, $4::jsonb, $5::jsonb)`,
      [
        input.context.sessionId,
        input.context.userId ?? null,
        input.runId,
        JSON.stringify(serializeError(input.error)),
        JSON.stringify(input.messages),
      ],
    );
  }
}

function serializeError(error: unknown): Record<string, unknown> {
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
      stack: error.stack,
    };
  }
  return { message: String(error) };
}

Use It

const memory = new SqlMemoryStore(pool);

const agent = new AgentBuilder("support", model)
  .memory(memory)
  .build();

await agent.session("thread_123", { userId: "user_456" }).prompt("Hello").send();