Building RAG with Drizzle ORM and pgvector
January 25, 2025 · Eden Stack Team
Retrieval-Augmented Generation (RAG) is the secret sauce that lets your AI answer questions about your data. In Eden Stack, we've implemented a clean RAG pipeline using Drizzle ORM, Neon PostgreSQL with pgvector, and the Vercel AI SDK.
This post walks through the architecture and code.
The RAG Pipeline
Here's how data flows through our RAG system: a file is uploaded, its text is extracted and split into chunks, each chunk is embedded and stored in Postgres, and at query time the user's question is embedded and matched against stored chunks by vector similarity.
Setting Up the Schema
First, we need a table to store our document chunks with their embeddings. Drizzle ORM has first-class support for pgvector:
// packages/db/src/schema.ts
import {
pgTable,
serial,
text,
integer,
timestamp,
jsonb,
index,
vector
} from "drizzle-orm/pg-core";
export const projectFileChunks = pgTable(
"project_file_chunks",
{
id: serial("id").primaryKey(),
fileId: text("file_id")
.notNull()
.references(() => projectFiles.id, { onDelete: "cascade" }),
projectId: text("project_id")
.notNull()
.references(() => projects.id, { onDelete: "cascade" }),
// Chunk content
content: text("content").notNull(),
chunkIndex: integer("chunk_index").notNull(),
// Embedding (1536 dimensions for text-embedding-3-small)
embedding: vector("embedding", { dimensions: 1536 }),
// Metadata (source file, page number, etc.)
metadata: jsonb("metadata"),
createdAt: timestamp("created_at").notNull().defaultNow(),
},
(table) => [
// HNSW index for fast similarity search
index("project_file_chunks_embedding_idx").using(
"hnsw",
table.embedding.op("vector_cosine_ops")
),
// Index for filtering by project
index("project_file_chunks_project_id_idx").on(table.projectId),
]
);

Key decisions:
- 1536 dimensions: This matches OpenAI's text-embedding-3-small model
- HNSW index: Hierarchical Navigable Small World algorithm, faster than IVFFlat for most workloads
- Cosine distance: vector_cosine_ops works well for text similarity
- Cascade delete: When a file is deleted, its chunks go too
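For intuition, here's what the cosine distance behind vector_cosine_ops computes. This is an illustrative sketch of the math, not how pgvector implements it internally:

```typescript
// Cosine distance as returned by pgvector's <=> operator:
// distance = 1 - cosine similarity. Range is [0, 2]; lower = more similar.
function cosineDistance(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Identical vectors score 0, orthogonal vectors score 1, and opposite vectors score 2, which is why the query later in this post orders by the distance ascending.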
Generating Embeddings
We use the Vercel AI SDK for embedding generation. It provides a clean API and handles batching:
// packages/jobs/src/lib/embeddings.ts
import { openai } from "@ai-sdk/openai";
import { embed, embedMany } from "ai";
const EMBEDDING_MODEL = openai.embedding("text-embedding-3-small");
export async function generateEmbedding(text: string): Promise<number[]> {
const { embedding } = await embed({
model: EMBEDDING_MODEL,
value: text,
});
return embedding;
}
export async function generateEmbeddings(
texts: string[]
): Promise<number[][]> {
if (texts.length === 0) {
return [];
}
const { embeddings } = await embedMany({
model: EMBEDDING_MODEL,
values: texts,
});
return embeddings;
}

Why text-embedding-3-small?
- Cost-effective: $0.02 per 1M tokens (vs $0.13 for text-embedding-3-large)
- Fast: Lower latency for real-time use
- Good enough: For most RAG use cases, the quality difference is negligible
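To put the pricing in perspective, here's a back-of-the-envelope cost estimate. The 4-characters-per-token ratio is a rough heuristic, and estimateEmbeddingCostUSD is a hypothetical helper, not part of the codebase:

```typescript
// Rough embedding cost estimate for text-embedding-3-small at
// $0.02 per 1M tokens, assuming ~4 characters per token on average.
function estimateEmbeddingCostUSD(totalChars: number): number {
  const approxTokens = totalChars / 4;
  return (approxTokens / 1_000_000) * 0.02;
}

// A ~400-page document at ~2,000 chars/page is ~800k characters,
// which works out to roughly $0.004 — less than a cent to embed.
```

At these prices, re-embedding an entire corpus after a chunking change is usually cheap enough to do without hesitation.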
The Ingestion Pipeline
Document processing happens in the background via Inngest. This ensures reliability; if a step fails, it retries automatically:
// packages/jobs/src/functions/file-processing.ts
import { inngest } from "../client";
import { db, projectFileChunks, projectFiles } from "@eden/db";
import { eq } from "drizzle-orm";
import { generateEmbeddings } from "../lib/embeddings";
import { extractText } from "../lib/unstructured";
import { chunkText } from "../lib/chunking";
export const processProjectFile = inngest.createFunction(
{ id: "process-project-file" },
{ event: "project-file/uploaded" },
async ({ event, step }) => {
const { fileId, projectId, fileUrl } = event.data;
// Step 1: Extract text from document
const rawText = await step.run("extract-text", async () => {
return extractText(fileUrl);
});
// Step 2: Chunk the text
const chunks = await step.run("chunk-text", async () => {
return chunkText(rawText, {
maxChunkSize: 1000,
overlap: 200,
});
});
// Step 3: Generate embeddings for all chunks
const embeddings = await step.run("generate-embeddings", async () => {
return generateEmbeddings(chunks.map((c) => c.content));
});
// Step 4: Store in database
await step.run("store-chunks", async () => {
const values = chunks.map((chunk, i) => ({
fileId,
projectId,
content: chunk.content,
chunkIndex: i,
embedding: embeddings[i],
metadata: chunk.metadata,
}));
await db.insert(projectFileChunks).values(values);
});
// Step 5: Mark file as processed
await step.run("update-file-status", async () => {
await db
.update(projectFiles)
.set({ status: "processed" })
.where(eq(projectFiles.id, fileId));
});
return { chunksCreated: chunks.length };
}
);

Each step.run() is checkpointed: if the function crashes, it resumes from the last completed step.
Querying with Vector Similarity
When a user asks a question, we find relevant context using vector similarity search:
// apps/api/src/routes/chat.ts
import { db, projectFileChunks } from "@eden/db";
import { eq, sql } from "drizzle-orm";
// generateEmbedding is the helper from packages/jobs/src/lib/embeddings.ts shown above
import { generateEmbedding } from "@eden/jobs";
async function getRelevantContext(
projectId: string,
query: string,
limit = 5
): Promise<string[]> {
// Generate embedding for the query
const queryEmbedding = await generateEmbedding(query);
// Find most similar chunks using cosine distance
const results = await db
.select({
content: projectFileChunks.content,
metadata: projectFileChunks.metadata,
distance: sql<number>`${projectFileChunks.embedding} <=> ${JSON.stringify(queryEmbedding)}::vector`,
})
.from(projectFileChunks)
.where(eq(projectFileChunks.projectId, projectId))
.orderBy(
sql`${projectFileChunks.embedding} <=> ${JSON.stringify(queryEmbedding)}::vector`
)
.limit(limit);
return results.map((r) => r.content);
}

The <=> operator is pgvector's cosine distance operator. Lower values = more similar.
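The raw distance is also useful for gating low-quality matches before they reach the prompt. A sketch of that idea follows; the 0.5 cutoff is an arbitrary assumption to tune per corpus, and ScoredChunk/filterByDistance are hypothetical names, not part of the codebase above:

```typescript
interface ScoredChunk {
  content: string;
  distance: number; // pgvector cosine distance: 0 = identical, 2 = opposite
}

// Drop chunks whose cosine distance exceeds a cutoff, so clearly
// irrelevant context never gets stuffed into the system prompt.
function filterByDistance(chunks: ScoredChunk[], maxDistance = 0.5): string[] {
  return chunks
    .filter((c) => c.distance <= maxDistance)
    .map((c) => c.content);
}
```

Without a cutoff, a query with no good matches still returns the top-N nearest chunks, and the model may confidently answer from unrelated context.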
Putting It All Together
Here's how we use the context in a chat endpoint:
// apps/api/src/routes/chat.ts
import { Elysia } from "elysia";
import { anthropic } from "@ai-sdk/anthropic";
import { streamText } from "ai";

export const chatRoute = new Elysia({ prefix: "/chat" })
.post("/", async ({ body, user }) => {
const { message, projectId, conversationId } = body;
// Get relevant context from user's documents
const context = await getRelevantContext(projectId, message, 5);
// Build the system prompt with context
const systemPrompt = `You are a helpful assistant.
Use the following context to answer questions:
${context.map((c, i) => `[${i + 1}] ${c}`).join("\n\n")}
If the answer isn't in the context, say so.`;
// Stream the response
const result = streamText({
model: anthropic("claude-sonnet-4-20250514"),
system: systemPrompt,
messages: [{ role: "user", content: message }],
});
return result.toDataStreamResponse();
});

Chunking Strategies
How you chunk text significantly impacts retrieval quality. Here's our approach:
// packages/jobs/src/lib/chunking.ts
interface ChunkOptions {
maxChunkSize: number; // Target size in characters
overlap: number; // Characters to overlap between chunks
}
export function chunkText(
text: string,
options: ChunkOptions
): Array<{ content: string; metadata: object }> {
const { maxChunkSize, overlap } = options;
const chunks: Array<{ content: string; metadata: object }> = [];
// Split by paragraphs first
const paragraphs = text.split(/\n\n+/);
let currentChunk = "";
let currentStart = 0;
for (const para of paragraphs) {
if (currentChunk.length + para.length > maxChunkSize && currentChunk) {
chunks.push({
content: currentChunk.trim(),
metadata: { startChar: currentStart },
});
// Keep overlap from end of current chunk
const prevLength = currentChunk.length;
const overlapText = currentChunk.slice(-overlap);
currentChunk = overlapText + para;
// The new chunk starts `overlap` characters before the end of the previous one
currentStart = currentStart + prevLength - overlap;
} else {
currentChunk += (currentChunk ? "\n\n" : "") + para;
}
}
if (currentChunk.trim()) {
chunks.push({
content: currentChunk.trim(),
metadata: { startChar: currentStart },
});
}
return chunks;
}

Key principles:
- Respect semantic boundaries: Split on paragraphs, not mid-sentence
- Use overlap: Helps when relevant info spans chunk boundaries
- Store metadata: Track source location for citations
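The paragraph-aware logic above can obscure the basic overlap arithmetic. A deliberately simplified character-window chunker (windowChunks is hypothetical, not part of the codebase) shows it directly:

```typescript
// Simplified sliding-window chunker: fixed-size character windows that
// advance by (size - overlap), so consecutive chunks share `overlap`
// characters. Unlike the paragraph-aware version, this can split
// mid-sentence — it exists only to make the overlap mechanics visible.
// Assumes size > overlap so the window always advances.
function windowChunks(text: string, size: number, overlap: number): string[] {
  const step = size - overlap;
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + size));
    if (start + size >= text.length) break; // last window reached the end
  }
  return chunks;
}
```

Each chunk begins with the tail of its predecessor, which is exactly what rescues retrieval when a fact straddles a chunk boundary.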
Performance Considerations
1. Batch Embeddings
Always embed multiple texts in one call:
// Slow: N API calls
for (const chunk of chunks) {
const embedding = await generateEmbedding(chunk);
}
// Fast: 1 API call
const embeddings = await generateEmbeddings(chunks);

2. Index Choice
For most workloads (< 1M vectors), HNSW is ideal:
CREATE INDEX ON project_file_chunks
USING hnsw (embedding vector_cosine_ops);

For larger datasets, consider IVFFlat with an appropriate lists parameter.
3. Filter Then Search
If you're filtering by project/user, let PostgreSQL optimize the query:
// Good: Filter in WHERE clause
.where(eq(projectFileChunks.projectId, projectId))
.orderBy(sql`embedding <=> ${queryVector}::vector`)
// Bad: Filter in application after vector search

Testing RAG Locally
1. Enable pgvector in your local Postgres:

CREATE EXTENSION IF NOT EXISTS vector;

2. Seed some test data:

bun run db:seed

3. Run the ingestion:

await inngest.send({
name: "project-file/uploaded",
data: { fileId: "...", projectId: "...", fileUrl: "..." },
});

4. Test retrieval:

curl -X POST http://localhost:3001/chat \
-H "Content-Type: application/json" \
-d '{"message": "What does the document say about X?", "projectId": "..."}'
Next Steps
- Add hybrid search (combine vector + keyword search with RRF)
- Implement reranking with a cross-encoder model
- Add citations linking answers back to source documents
- Try HyDE (Hypothetical Document Embeddings) for query expansion
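Of these, the fusion step of hybrid search is small enough to sketch here. Reciprocal Rank Fusion merges ranked lists from different retrievers; reciprocalRankFusion is a hypothetical helper, and k = 60 is the constant commonly used for RRF:

```typescript
// Reciprocal Rank Fusion: each document scores 1 / (k + rank) in every
// ranked list it appears in (rank is 1-based), and documents are re-sorted
// by total score. Appearing high in multiple lists beats appearing high
// in just one.
function reciprocalRankFusion(
  rankings: string[][], // each array is a ranked list of document IDs
  k = 60
): string[] {
  const scores = new Map<string, number>();
  for (const ranking of rankings) {
    ranking.forEach((id, index) => {
      const rank = index + 1; // convert 0-based index to 1-based rank
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + rank));
    });
  }
  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id]) => id);
}
```

In a hybrid setup, one list would come from the vector query shown earlier and the other from a Postgres full-text search over the same chunks.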
Wrapping Up
The combination of Drizzle ORM + pgvector + Vercel AI SDK makes building RAG surprisingly pleasant. You get:
- Type-safe queries with Drizzle
- Reliable ingestion with Inngest
- Simple embedding API with Vercel AI SDK
- Production-grade vector search with Neon's pgvector
Check out the AI Features guide for more implementation details.