Raypx

查询

使用 Drizzle ORM 执行数据库查询。

Raypx 使用 Drizzle ORM 进行所有数据库操作。本页展示常用的查询模式和实际示例。

创建数据库实例

在 RPC Procedure 中,通过 createDb() 获取数据库实例:

import { createDb, schema } from "@raypx/database"

export const profileRouter = {
  get: protectedProcedure.handler(async ({ context }) => {
    const db = createDb()
    const [profile] = await db
      .select()
      .from(schema.profiles)
      .where(eq(schema.profiles.userId, context.session.user.id))
      .limit(1)
    return profile ?? null
  }),
}

createDb() 使用单例模式,多次调用返回同一个数据库连接。

导入 Schema 和操作符

Drizzle ORM 的查询操作符从 @raypx/database 包导出:

import {
  createDb,
  schema,   // 所有表定义
  eq,       // 等于
  and,      // AND 条件
  desc,     // 降序排列
  gte,      // 大于等于
  sql,      // 原生 SQL
  count,    // 计数
  sum,      // 求和
} from "@raypx/database"

SELECT 查询

基本查询

const db = createDb()
const [profile] = await db
  .select()
  .from(schema.profiles)
  .where(eq(schema.profiles.userId, userId))
  .limit(1)

选择特定列

const docs = await db
  .select({
    id: schema.documents.id,
    title: schema.documents.title,
    contentType: schema.documents.contentType,
    sizeBytes: schema.documents.sizeBytes,
    status: schema.documents.status,
    createdAt: schema.documents.createdAt,
  })
  .from(schema.documents)
  .where(eq(schema.documents.userId, userId))

多条件查询

import { and, eq } from "@raypx/database"

const [doc] = await db
  .select()
  .from(schema.documents)
  .where(
    and(
      eq(schema.documents.id, input.id),
      eq(schema.documents.userId, context.session.user.id),
    ),
  )
  .limit(1)

排序和分页

import { desc } from "@raypx/database"

const events = await db
  .select()
  .from(schema.usageEvents)
  .where(eq(schema.usageEvents.userId, userId))
  .orderBy(desc(schema.usageEvents.createdAt))
  .limit(50)

聚合查询

import { sql } from "@raypx/database"

const rows = await db
  .select({
    aiTokens: sql<number>`COALESCE(SUM(CASE WHEN metric = 'ai_tokens' THEN quantity ELSE 0 END), 0)`,
    aiRequests: sql<number>`COALESCE(SUM(CASE WHEN metric = 'ai_requests' THEN quantity ELSE 0 END), 0)`,
    costUsd: sql<string>`COALESCE(SUM(cost_usd), 0)`,
  })
  .from(schema.usageEvents)
  .where(and(eq(schema.usageEvents.userId, userId), sql`${schema.usageEvents.createdAt} >= ${periodStart}`))

INSERT 插入

插入并返回

const [convo] = await db
  .insert(schema.conversations)
  .values({
    userId: context.session.user.id,
    title: input.title,
    task: input.task ?? "summarize",
  })
  .returning()

批量插入

await db.insert(schema.messages).values([
  {
    conversationId: input.conversationId,
    userId,
    role: "user",
    content: input.content,
  },
  {
    conversationId: input.conversationId,
    userId,
    role: "assistant",
    content: "This is a response.",
    model: "mock:raypx-1",
  },
])

UPDATE 更新

基本更新

const [updated] = await db
  .update(schema.profiles)
  .set({ ...input, updatedAt: new Date() })
  .where(eq(schema.profiles.userId, userId))
  .returning()

条件更新

await db
  .update(schema.conversations)
  .set({ updatedAt: new Date() })
  .where(eq(schema.conversations.id, input.conversationId))

UPSERT(插入或更新)

Drizzle ORM 的 onConflictDoUpdate 实现 UPSERT:

await db
  .insert(schema.profiles)
  .values({ userId, ...input })
  .onConflictDoUpdate({
    target: schema.profiles.userId,
    set: input,
  })
  .returning()

在实际项目中,Raypx 使用查询 + 条件插入的方式实现类似效果:

const [existing] = await db
  .select()
  .from(schema.profiles)
  .where(eq(schema.profiles.userId, userId))
  .limit(1)

if (existing) {
  // 更新
  const [updated] = await db
    .update(schema.profiles)
    .set({ ...input, updatedAt: new Date() })
    .where(eq(schema.profiles.userId, userId))
    .returning()
  return updated
}

// 插入
const [created] = await db
  .insert(schema.profiles)
  .values({ userId, ...input })
  .returning()
return created

DELETE 删除

const [deleted] = await db
  .delete(schema.documents)
  .where(and(eq(schema.documents.id, input.id), eq(schema.documents.userId, userId)))
  .returning()

原生 SQL

当 Drizzle ORM 的查询构建器无法满足需求时,可以使用原生 SQL:

import { sql } from "@raypx/database"

// 健康检查
await db.execute(sql`SELECT 1`)

// 复杂聚合
const result = await db.execute(sql`
  SELECT
    metric,
    COUNT(*) as count,
    SUM(quantity) as total
  FROM usage_event
  WHERE user_id = ${userId}
  GROUP BY metric
`)

使用原生 SQL 时,务必通过参数化查询(${variable} 插值)传递用户输入,不要使用字符串拼接,以防止 SQL 注入。

On this page