查询
使用 Drizzle ORM 执行数据库查询。
Raypx 使用 Drizzle ORM 进行所有数据库操作。本页展示常用的查询模式和实际示例。
创建数据库实例
在 RPC Procedure 中,通过 createDb() 获取数据库实例:
import { createDb, schema } from "@raypx/database"
export const profileRouter = {
get: protectedProcedure.handler(async ({ context }) => {
const db = createDb()
const [profile] = await db
.select()
.from(schema.profiles)
.where(eq(schema.profiles.userId, context.session.user.id))
.limit(1)
return profile ?? null
}),
}createDb() 使用单例模式,多次调用返回同一个数据库连接。
导入 Schema 和操作符
Drizzle ORM 的查询操作符从 @raypx/database 包导出:
import {
createDb,
schema, // 所有表定义
eq, // 等于
and, // AND 条件
desc, // 降序排列
gte, // 大于等于
sql, // 原生 SQL
count, // 计数
sum, // 求和
} from "@raypx/database"SELECT 查询
基本查询
const db = createDb()
const [profile] = await db
.select()
.from(schema.profiles)
.where(eq(schema.profiles.userId, userId))
.limit(1)选择特定列
const docs = await db
.select({
id: schema.documents.id,
title: schema.documents.title,
contentType: schema.documents.contentType,
sizeBytes: schema.documents.sizeBytes,
status: schema.documents.status,
createdAt: schema.documents.createdAt,
})
.from(schema.documents)
.where(eq(schema.documents.userId, userId))多条件查询
import { and, eq } from "@raypx/database"
const [doc] = await db
.select()
.from(schema.documents)
.where(
and(
eq(schema.documents.id, input.id),
eq(schema.documents.userId, context.session.user.id),
),
)
.limit(1)排序和分页
import { desc } from "@raypx/database"
const events = await db
.select()
.from(schema.usageEvents)
.where(eq(schema.usageEvents.userId, userId))
.orderBy(desc(schema.usageEvents.createdAt))
.limit(50)聚合查询
import { sql } from "@raypx/database"
const rows = await db
.select({
aiTokens: sql<number>`COALESCE(SUM(CASE WHEN metric = 'ai_tokens' THEN quantity ELSE 0 END), 0)`,
aiRequests: sql<number>`COALESCE(SUM(CASE WHEN metric = 'ai_requests' THEN quantity ELSE 0 END), 0)`,
costUsd: sql<string>`COALESCE(SUM(cost_usd), 0)`,
})
.from(schema.usageEvents)
.where(and(eq(schema.usageEvents.userId, userId), sql`${schema.usageEvents.createdAt} >= ${periodStart}`))INSERT 插入
插入并返回
const [convo] = await db
.insert(schema.conversations)
.values({
userId: context.session.user.id,
title: input.title,
task: input.task ?? "summarize",
})
.returning()批量插入
await db.insert(schema.messages).values([
{
conversationId: input.conversationId,
userId,
role: "user",
content: input.content,
},
{
conversationId: input.conversationId,
userId,
role: "assistant",
content: "This is a response.",
model: "mock:raypx-1",
},
])UPDATE 更新
基本更新
const [updated] = await db
.update(schema.profiles)
.set({ ...input, updatedAt: new Date() })
.where(eq(schema.profiles.userId, userId))
.returning()条件更新
await db
.update(schema.conversations)
.set({ updatedAt: new Date() })
.where(eq(schema.conversations.id, input.conversationId))UPSERT(插入或更新)
Drizzle ORM 的 onConflictDoUpdate 实现 UPSERT:
await db
.insert(schema.profiles)
.values({ userId, ...input })
.onConflictDoUpdate({
target: schema.profiles.userId,
set: input,
})
.returning()在实际项目中,Raypx 使用查询 + 条件插入的方式实现类似效果:
const [existing] = await db
.select()
.from(schema.profiles)
.where(eq(schema.profiles.userId, userId))
.limit(1)
if (existing) {
// 更新
const [updated] = await db
.update(schema.profiles)
.set({ ...input, updatedAt: new Date() })
.where(eq(schema.profiles.userId, userId))
.returning()
return updated
}
// 插入
const [created] = await db
.insert(schema.profiles)
.values({ userId, ...input })
.returning()
return createdDELETE 删除
const [deleted] = await db
.delete(schema.documents)
.where(and(eq(schema.documents.id, input.id), eq(schema.documents.userId, userId)))
.returning()原生 SQL
当 Drizzle ORM 的查询构建器无法满足需求时,可以使用原生 SQL:
import { sql } from "@raypx/database"
// 健康检查
await db.execute(sql`SELECT 1`)
// 复杂聚合
const result = await db.execute(sql`
SELECT
metric,
COUNT(*) as count,
SUM(quantity) as total
FROM usage_event
WHERE user_id = ${userId}
GROUP BY metric
`)使用原生 SQL 时,务必通过参数化查询(${variable} 插值)传递用户输入,不要使用字符串拼接,以防止 SQL 注入。