Supabase 适配器
@aiao/rxdb-adapter-supabase 提供了与 Supabase 后端的集成能力,实现本地数据与云端数据库的双向同步、实时订阅和事务性批量操作。
安装
- npm
- Yarn
- pnpm
- Bun
npm install @aiao/rxdb @aiao/rxdb-adapter-supabase @supabase/supabase-js
yarn add @aiao/rxdb @aiao/rxdb-adapter-supabase @supabase/supabase-js
pnpm add @aiao/rxdb @aiao/rxdb-adapter-supabase @supabase/supabase-js
bun add @aiao/rxdb @aiao/rxdb-adapter-supabase @supabase/supabase-js
核心概念
变更追踪
所有数据变更统一记录到 RxDBChange 表(而非按实体分表),每条记录包含:
namespace/entity/entityId— 标识变更的实体type—INSERT/UPDATE/DELETEpatch/inversePatch— 正向和反向补丁(用于撤销)clientId— 发起变更的客户端标识(用于过滤自身变更)
同步游标
使用 RxDBChange 表的自增 id 作为同步游标(而非时间戳),避免同毫秒内多条变更导致的重复问题。
QueryCache 模式
fetchMetadata 只拉取 {id, updatedAt} 做新鲜度比较,脏数据再通过 findByIds 拉取完整内容,减少 90%+ 的数据传输量。
基础使用
方式一:传入 URL + Key
import { RxDB, SyncType } from '@aiao/rxdb';
import { RxDBAdapterSupabase } from '@aiao/rxdb-adapter-supabase';
const rxdb = new RxDB({
dbName: 'myapp',
entities: [Todo, User, Order],
sync: {
remote: { adapter: 'supabase' },
type: SyncType.Full
}
});
rxdb.adapter('supabase', db => {
return new RxDBAdapterSupabase(db, {
supabaseUrl: 'https://your-project.supabase.co',
supabaseKey: 'your-anon-key'
});
});
方式二:传入已有客户端
import { createClient } from '@supabase/supabase-js';
const supabase = createClient('https://your-project.supabase.co', 'your-anon-key');
rxdb.adapter('supabase', db => {
return new RxDBAdapterSupabase(db, { client: supabase });
});
客户端优先
传入 client 时,supabaseUrl 和 supabaseKey 会被忽略。推荐在已有 Supabase 客户端的项目中使用此方式,避免创建多个实例。
配置选项
interface SupabaseAdapterOptions extends IRxDBAdapterOptions {
/** Supabase 项目 URL */
supabaseUrl?: string;
/** Supabase API Key(通常使用 anon key) */
supabaseKey?: string;
/** 已有的 Supabase 客户端实例(优先级高于 URL + Key) */
client?: SupabaseClient;
}
实时同步
适配器通过 Supabase Realtime 订阅 RxDBChange 表的 INSERT 事件,实现跨客户端的实时同步:
客户端 A 修改数据
│
▼
写入实体表 + RxDBChange 表(通过 rxdb_mutations RPC)
│
▼
Supabase Realtime 广播 INSERT 事件
│
▼
客户端 B 收到变更
│
├── clientId === 自身?→ 忽略(避免回声)
│
└── clientId !== 自身?→ 派发 Remote 事件
├── INSERT → EntityRemoteCreatedEvent
├── UPDATE → EntityRemoteUpdatedEvent
└── DELETE → EntityRemoteRemovedEvent
连接与断开
// 启动实时订阅
await rxdb.connect('supabase');
// 断开订阅
await rxdb.disconnect('supabase');
RPC 函数
适配器依赖以下 PostgreSQL 函数(需在 Supabase 数据库中预先创建):
rxdb_mutations
事务性批量操作,同时写入实体表和变更记录表:
// 内部调用示意
const { data } = await client.rpc('rxdb_mutations', {
p_upserts: [{ table: 'todos', schema: 'public', data: [...] }],
p_deletes: [{ table: 'todos', schema: 'public', ids: [...] }],
p_changes: [...], // RxDBChange 记录
p_skip_sync: true // 跳过服务端同步触发器
});
| 参数 | 类型 | 说明 |
|---|---|---|
p_upserts | json[] | 按表分组的 upsert 数据 |
p_deletes | json[] | 按表分组的删除 ID |
p_changes | json[] | 变更记录(写入 RxDBChange 表) |
p_skip_sync | boolean | 是否跳过同步触发器(避免循环) |
树查询 RPC
| 函数 | 参数 | 用途 |
|---|---|---|
get_descendants | root_id, max_level | 递归 CTE 查询子孙节点 |
get_root_descendants | max_level | 查询所有根节点及其子孙 |
get_ancestors | node_id, max_level | 递归 CTE 查询祖先节点 |
Pull/Push 同步
拉取变更
// 基于游标拉取远程变更
const changes = await adapter.pullChanges(sinceId, limit);
- 使用
RxDBChange.id(自增)作为游标 - 支持
repositoryFilter按实体过滤 - 支持
filter(RuleGroup)行级过滤 - 返回
RemoteChange[],按id ASC排序
合并变更
// 事务性双写:实体表 + RxDBChange 表
const maxChangeId = await adapter.mergeChanges(actions);
解析 entityKey 格式 namespace:entity:entityId,通过 rxdb_mutations RPC 实现原子性操作。
关系查询
适配器支持通过 PostgREST 的嵌套查询语法 实现关系数据的自动 JOIN:
| 关系类型 | 支持 | 示例 |
|---|---|---|
| 多对一 (m:1) | ✅ | order.customer |
| 一对一 (1:1) | ✅ | user.profile |
| 一对多 (1:m) | ✅ | customer.orders |
| 多对多 (m:n) | ✅ | post.tags(通过中间表) |
嵌套 WHERE 查询
支持通过点号路径查询关联实体的属性:
// 查询有订单金额 > 100 的客户
const customers = await Customer.find({
where: {
'orders.amount': { $gt: 100 }
}
});
查询操作符映射
| RxDB 操作符 | PostgREST 映射 |
|---|---|
= / != | eq / neq |
< / <= / > / >= | lt / lte / gt / gte |
in / notIn | .in() / .not('in') |
contains | ilike.*value* |
startsWith | ilike.value* |
endsWith | ilike.*value |
between | .gte().lte() |
null / notNull | .is(null) / .not('is', null) |
exists / notExists | 关联表 !inner JOIN |
错误处理
import {
SupabaseSyncError,
SupabaseConfigError,
SupabaseNetworkError,
SupabaseDataError
} from '@aiao/rxdb-adapter-supabase';
try {
await rxdb.connect('supabase');
} catch (error) {
if (error instanceof SupabaseConfigError) {
// 配置错误:URL 或 Key 无效
} else if (error instanceof SupabaseNetworkError) {
// 网络错误:无法连接 Supabase
} else if (error instanceof SupabaseDataError) {
// 数据错误:类型转换失败等
}
}
| 错误类型 | code | 场景 |
|---|---|---|
SupabaseConfigError | CONFIG_ERROR | URL/Key 配置无效 |
SupabaseNetworkError | NETWORK_ERROR | 网络连接失败 |
SupabaseDataError | DATA_ERROR | 数据类型转换错误 |
数据类型转换
从 Supabase 返回的数据会自动转换:
| 属性类型 | 转换规则 |
|---|---|
date | string → Date |
boolean | Boolean(value) |
keyValue | 递归转换嵌套属性 |
json / stringArray / numberArray | 原样保留 |
性能优化
- 使用 QueryCache 模式:
fetchMetadata只传输{id, updatedAt},大幅减少网络流量 - 批量操作:使用
saveMany()/removeMany()代替逐条操作,减少 HTTP 请求数 - 事务性双写:
mutations()通过单次 RPC 调用完成所有操作,保证原子性 - 游标同步:基于自增 ID 而非时间戳,避免重复拉取
故障排查
RPC 函数不存在
ERROR: function rxdb_mutations does not exist
确保已在 Supabase 数据库中创建所需的 RPC 函数(rxdb_mutations、get_descendants 等)。
Realtime 未收到变更
- 确认 Supabase 项目已启用 Realtime
- 确认
RxDBChange表已添加到 Realtime Publication - 检查 RLS(Row Level Security)策略是否允许读取
clientId 冲突
如果多个标签页使用相同的 clientId,会导致变更被错误过滤。确保每个客户端实例具有唯一的 clientId。