rxdb-adapter-supabase
RxDB Supabase 适配器 - 基于 PostgreSQL 的远程同步实现。
特性
- ✅ 完整的 CRUD 操作 - 通过 Repository 模式封装 Supabase API
- ✅ 批量操作 -
saveMany/removeMany/mutations高性能批处理 - ✅ 事务支持 - 基于 PostgreSQL RPC 的原子性操作
- ✅ 实时订阅 - Supabase Realtime 自动推送远程变更
- ✅ 变更追踪 - 自动记录所有数据变更到
RxDBChange表 - ✅ 双向同步 - Push/Pull 机制确保本地与远程数据一致
- ✅ 树形结构 -
SupabaseTreeRepository支持树形数据操作
安装
pnpm add @aiao/rxdb-adapter-supabase @supabase/supabase-js
快速开始
1. 配置 RxDB
import { RxDB } from '@aiao/rxdb';
import { RxDBAdapterSupabase } from '@aiao/rxdb-adapter-supabase';
import { RxDBAdapterSqlite } from '@aiao/rxdb-adapter-sqlite';
const rxdb = new RxDB({
dbName: 'my-app',
context: { userId: 'user-123' },
entities: [Todo, User],
sync: {
local: { adapter: 'sqlite' },
remote: { adapter: 'supabase' },
type: SyncType.PullPush
}
});
// 注册适配器
rxdb.adapter('sqlite', db => new RxDBAdapterSqlite(db, { vfs: 'MemoryAsyncVFS' }));
rxdb.adapter(
'supabase',
db =>
new RxDBAdapterSupabase(db, {
supabaseUrl: 'https://your-project.supabase.co',
supabaseKey: 'your-anon-key'
})
);
// 连接
await rxdb.connect('sqlite');
await rxdb.connect('supabase');
2. 数据操作
// 创建
const todo = rxdb.repository(Todo).create();
todo.title = 'Learn RxDB';
await todo.save();
// 查询
const todos = await rxdb.repository(Todo).find({
where: {
combinator: 'and',
rules: [{ field: 'completed', operator: '=', value: false }]
}
});
// 同步
await rxdb.versionManager.push(); // 推送本地变更到远程
await rxdb.versionManager.pull(); // 拉取远程变更到本地
await rxdb.versionManager.sync(); // 双向同步 (pull + push)
测试
前置条件
测试依赖本地 Supabase 环境,确保:
-
Docker 已安装并运行
-
启动 Supabase 测试容器:
cd docker
bash start.sh这会启动并初始化:
- PostgreSQL (端口 5432)
- Kong API Gateway (端口 8000)
- 创建必要的表和函数
-
环境变量配置: 测试自动使用以下配置:
VITE_SUPABASE_URL=http://localhost:8000VITE_SUPABASE_KEY=<anon-key>(见docker/.env)
运行测试
# 确保 Supabase 已启动
cd docker && bash start.sh
# 运行测试
nx test rxdb-adapter-supabase
# 带覆盖率
nx test rxdb-adapter-supabase --coverage
# 停止 Supabase
cd docker && bash stop.sh
常见问题:
-
❌
Could not find the table 'public.RxDBChange' in the schema cache- 原因:Supabase 容器未启动或数据库未初始化
- 解决:
cd docker && bash start.sh
-
❌
Connection refused at localhost:8000- 原因:Kong API Gateway 未就绪
- 解决:等待
start.sh完成健康检查
数据库 Schema
Supabase 适配器需要以下表结构:
RxDBChange (变更追踪表)
CREATE TABLE "RxDBChange" (
"id" BIGSERIAL PRIMARY KEY,
"branchId" TEXT NOT NULL,
"entityType" TEXT NOT NULL,
"entityId" TEXT NOT NULL,
"type" TEXT NOT NULL, -- INSERT/UPDATE/DELETE
"patch" JSONB,
"remoteId" BIGINT,
"revertChangeId" BIGINT,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);
RxDBBranch (分支元数据)
CREATE TABLE "RxDBBranch" (
"id" TEXT PRIMARY KEY,
"dbName" TEXT NOT NULL,
"context" JSONB,
"lastPushedChangeId" BIGINT,
"lastPullRemoteChangeId" BIGINT,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);
业务表示例
CREATE TABLE "Todo" (
"id" TEXT PRIMARY KEY,
"title" TEXT NOT NULL,
"completed" BOOLEAN DEFAULT FALSE,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);
-- 自动创建变更记录的触发器
CREATE TRIGGER rxdb_sync_trigger
AFTER INSERT OR UPDATE OR DELETE ON "Todo"
FOR EACH ROW EXECUTE FUNCTION rxdb_sync_change();
完整 SQL 脚本见 docker/sql/ 目录。
API
RxDBAdapterSupabase
class RxDBAdapterSupabase {
constructor(rxdb: RxDB, options: SupabaseAdapterOptions);
// 连接管理
connect(): Promise<IRxDBAdapter>;
disconnect(): Promise<void>;
// 同步操作
pullChanges(options: PullOptions): Promise<RemoteChange[]>;
pushChanges(changes: LocalChange[]): Promise<PushResult>;
// 批量操作
saveMany<T>(entities: T[]): Promise<T[]>;
removeMany<T>(entities: T[]): Promise<void>;
mutations(map: RxDBMutationsMap): Promise<void>;
}
SupabaseRepository
class SupabaseRepository<T> {
// CRUD
create(): T;
find(options?: FindOptions): Promise<T[]>;
findOne(options?: FindOptions): Promise<T | null>;
count(options?: FindOptions): Promise<number>;
// 批量
saveMany(entities: T[]): Promise<T[]>;
removeMany(entities: T[]): Promise<void>;
}
SupabaseTreeRepository
class SupabaseTreeRepository<T> extends SupabaseRepository<T> {
// 树形操作
findDescendants(entity: T, depth?: number): Promise<T[]>;
findAncestors(entity: T): Promise<T[]>;
findRoots(): Promise<T[]>;
}
架构
┌─────────────────────────────────────────────────────────────┐
│ RxDB Core │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────────┐ │
│ │ Repository │ │VersionMgr │ │ SchemaManager │ │
│ └─────┬──────┘ └─────┬──────┘ └──────────────────────┘ │
└────────┼───────────────┼─────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ RxDBAdapterSupabase (Remote) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SupabaseRepository │ │
│ │ - CRUD 操作 │ │
│ │ - 查询构建 (apply_rule_group) │ │
│ │ - 批量操作 │ │
│ └──────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Sync Engine │ │
│ │ - pullChanges: 拉取远程变更 │ │
│ │ - pushChanges: 推送本地变更 (带压缩) │ │
│ │ - mergeChanges: 通过 RPC 原子性写入 │ │
│ └──────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Realtime Subscription │ │
│ │ - 监听 RxDBChange 表 INSERT 事件 │ │
│ │ - 自动触发 pull 同步 │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────┐
│ Supabase (PostgreSQL) │
│ - RxDBChange │
│ - RxDBBranch │
│ - Business Tables │
│ - Triggers/RPCs │
└─────────────────────┘
同步机制
Push 流程
- 收集本地变更:从
RxDBChange表查询remoteId = null的记录 - 变更压缩:
INSERT→UPDATE*→ 单个INSERTINSERT→DELETE→ 丢弃UPDATE*→DELETE→ 单个DELETE
- 调用 RPC:通过
rxdb_mutations()原子性写入 - 更新元数据:记录
lastPushedChangeId
Pull 流程
- 查询远程变更:
id > lastPullRemoteChangeId且branchId != current - 应用到本地实体表:
INSERT→ 插入UPDATE→ 更新DELETE→ 删除
- 写入本地 RxDBChange:标记
remoteId避免重复 push - 更新元数据:记录
lastPullRemoteChangeId
开发
# 构建
nx build rxdb-adapter-supabase
# 格式化
nx format:write --projects=rxdb-adapter-supabase
# Lint
nx lint rxdb-adapter-supabase
License
MIT
Classes
| Class | Description |
|---|---|
| RxDBAdapterSupabase | Supabase 适配器 |
| SupabaseConfigError | 配置错误 |
| SupabaseDataError | 数据错误 |
| SupabaseNetworkError | 网络错误 |
| SupabaseRepository | Supabase Repository 提供对 Supabase 表的 CRUD 操作 |
| SupabaseSyncError | Supabase 同步错误基类 |
| SupabaseTreeRepository | Supabase Tree Repository 提供树形结构的查询操作 |
Interfaces
| Interface | Description |
|---|---|
| SupabaseAdapterOptions | Supabase Adapter 配置选项 |
Variables
| Variable | Description |
|---|---|
| ADAPTER_NAME | - |