跳到主要内容

rxdb-adapter-supabase

RxDB Supabase 适配器 - 基于 PostgreSQL 的远程同步实现。

特性

  • 完整的 CRUD 操作 - 通过 Repository 模式封装 Supabase API
  • 批量操作 - saveMany / removeMany / mutations 高性能批处理
  • 事务支持 - 基于 PostgreSQL RPC 的原子性操作
  • 实时订阅 - Supabase Realtime 自动推送远程变更
  • 变更追踪 - 自动记录所有数据变更到 RxDBChange
  • 双向同步 - Push/Pull 机制确保本地与远程数据一致
  • 树形结构 - SupabaseTreeRepository 支持树形数据操作

安装

pnpm add @aiao/rxdb-adapter-supabase @supabase/supabase-js

快速开始

1. 配置 RxDB

import { RxDB } from '@aiao/rxdb';
import { RxDBAdapterSupabase } from '@aiao/rxdb-adapter-supabase';
import { RxDBAdapterSqlite } from '@aiao/rxdb-adapter-sqlite';

const rxdb = new RxDB({
dbName: 'my-app',
context: { userId: 'user-123' },
entities: [Todo, User],
sync: {
local: { adapter: 'sqlite' },
remote: { adapter: 'supabase' },
type: SyncType.PullPush
}
});

// 注册适配器
rxdb.adapter('sqlite', db => new RxDBAdapterSqlite(db, { vfs: 'MemoryAsyncVFS' }));

rxdb.adapter(
'supabase',
db =>
new RxDBAdapterSupabase(db, {
supabaseUrl: 'https://your-project.supabase.co',
supabaseKey: 'your-anon-key'
})
);

// 连接
await rxdb.connect('sqlite');
await rxdb.connect('supabase');

2. 数据操作

// 创建
const todo = rxdb.repository(Todo).create();
todo.title = 'Learn RxDB';
await todo.save();

// 查询
const todos = await rxdb.repository(Todo).find({
where: {
combinator: 'and',
rules: [{ field: 'completed', operator: '=', value: false }]
}
});

// 同步
await rxdb.versionManager.push(); // 推送本地变更到远程
await rxdb.versionManager.pull(); // 拉取远程变更到本地
await rxdb.versionManager.sync(); // 双向同步 (pull + push)

测试

前置条件

测试依赖本地 Supabase 环境,确保:

  1. Docker 已安装并运行

  2. 启动 Supabase 测试容器

    cd docker
    bash start.sh

    这会启动并初始化:

    • PostgreSQL (端口 5432)
    • Kong API Gateway (端口 8000)
    • 创建必要的表和函数
  3. 环境变量配置: 测试自动使用以下配置:

    • VITE_SUPABASE_URL=http://localhost:8000
    • VITE_SUPABASE_KEY=<anon-key> (见 docker/.env)

运行测试

# 确保 Supabase 已启动
cd docker && bash start.sh

# 运行测试
nx test rxdb-adapter-supabase

# 带覆盖率
nx test rxdb-adapter-supabase --coverage

# 停止 Supabase
cd docker && bash stop.sh

常见问题

  • Could not find the table 'public.RxDBChange' in the schema cache

    • 原因:Supabase 容器未启动或数据库未初始化
    • 解决cd docker && bash start.sh
  • Connection refused at localhost:8000

    • 原因:Kong API Gateway 未就绪
    • 解决:等待 start.sh 完成健康检查

数据库 Schema

Supabase 适配器需要以下表结构:

RxDBChange (变更追踪表)

CREATE TABLE "RxDBChange" (
"id" BIGSERIAL PRIMARY KEY,
"branchId" TEXT NOT NULL,
"entityType" TEXT NOT NULL,
"entityId" TEXT NOT NULL,
"type" TEXT NOT NULL, -- INSERT/UPDATE/DELETE
"patch" JSONB,
"remoteId" BIGINT,
"revertChangeId" BIGINT,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);

RxDBBranch (分支元数据)

CREATE TABLE "RxDBBranch" (
"id" TEXT PRIMARY KEY,
"dbName" TEXT NOT NULL,
"context" JSONB,
"lastPushedChangeId" BIGINT,
"lastPullRemoteChangeId" BIGINT,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);

业务表示例

CREATE TABLE "Todo" (
"id" TEXT PRIMARY KEY,
"title" TEXT NOT NULL,
"completed" BOOLEAN DEFAULT FALSE,
"createdAt" TIMESTAMPTZ DEFAULT NOW(),
"updatedAt" TIMESTAMPTZ DEFAULT NOW()
);

-- 自动创建变更记录的触发器
CREATE TRIGGER rxdb_sync_trigger
AFTER INSERT OR UPDATE OR DELETE ON "Todo"
FOR EACH ROW EXECUTE FUNCTION rxdb_sync_change();

完整 SQL 脚本见 docker/sql/ 目录。

API

RxDBAdapterSupabase

class RxDBAdapterSupabase {
constructor(rxdb: RxDB, options: SupabaseAdapterOptions);

// 连接管理
connect(): Promise<IRxDBAdapter>;
disconnect(): Promise<void>;

// 同步操作
pullChanges(options: PullOptions): Promise<RemoteChange[]>;
pushChanges(changes: LocalChange[]): Promise<PushResult>;

// 批量操作
saveMany<T>(entities: T[]): Promise<T[]>;
removeMany<T>(entities: T[]): Promise<void>;
mutations(map: RxDBMutationsMap): Promise<void>;
}

SupabaseRepository

class SupabaseRepository<T> {
// CRUD
create(): T;
find(options?: FindOptions): Promise<T[]>;
findOne(options?: FindOptions): Promise<T | null>;
count(options?: FindOptions): Promise<number>;

// 批量
saveMany(entities: T[]): Promise<T[]>;
removeMany(entities: T[]): Promise<void>;
}

SupabaseTreeRepository

class SupabaseTreeRepository<T> extends SupabaseRepository<T> {
// 树形操作
findDescendants(entity: T, depth?: number): Promise<T[]>;
findAncestors(entity: T): Promise<T[]>;
findRoots(): Promise<T[]>;
}

架构

┌─────────────────────────────────────────────────────────────┐
│ RxDB Core │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────────┐ │
│ │ Repository │ │VersionMgr │ │ SchemaManager │ │
│ └─────┬──────┘ └─────┬──────┘ └──────────────────────┘ │
└────────┼───────────────┼─────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ RxDBAdapterSupabase (Remote) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SupabaseRepository │ │
│ │ - CRUD 操作 │ │
│ │ - 查询构建 (apply_rule_group) │ │
│ │ - 批量操作 │ │
│ └──────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Sync Engine │ │
│ │ - pullChanges: 拉取远程变更 │ │
│ │ - pushChanges: 推送本地变更 (带压缩) │ │
│ │ - mergeChanges: 通过 RPC 原子性写入 │ │
│ └──────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Realtime Subscription │ │
│ │ - 监听 RxDBChange 表 INSERT 事件 │ │
│ │ - 自动触发 pull 同步 │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────────────────────┘


┌─────────────────────┐
│ Supabase (PostgreSQL) │
│ - RxDBChange │
│ - RxDBBranch │
│ - Business Tables │
│ - Triggers/RPCs │
└─────────────────────┘

同步机制

Push 流程

  1. 收集本地变更:从 RxDBChange 表查询 remoteId = null 的记录
  2. 变更压缩
    • INSERT→UPDATE* → 单个 INSERT
    • INSERT→DELETE → 丢弃
    • UPDATE*→DELETE → 单个 DELETE
  3. 调用 RPC:通过 rxdb_mutations() 原子性写入
  4. 更新元数据:记录 lastPushedChangeId

Pull 流程

  1. 查询远程变更id > lastPullRemoteChangeIdbranchId != current
  2. 应用到本地实体表
    • INSERT → 插入
    • UPDATE → 更新
    • DELETE → 删除
  3. 写入本地 RxDBChange:标记 remoteId 避免重复 push
  4. 更新元数据:记录 lastPullRemoteChangeId

开发

# 构建
nx build rxdb-adapter-supabase

# 格式化
nx format:write --projects=rxdb-adapter-supabase

# Lint
nx lint rxdb-adapter-supabase

License

MIT

Classes

ClassDescription
RxDBAdapterSupabaseSupabase 适配器
SupabaseConfigError配置错误
SupabaseDataError数据错误
SupabaseNetworkError网络错误
SupabaseRepositorySupabase Repository 提供对 Supabase 表的 CRUD 操作
SupabaseSyncErrorSupabase 同步错误基类
SupabaseTreeRepositorySupabase Tree Repository 提供树形结构的查询操作

Interfaces

InterfaceDescription
SupabaseAdapterOptionsSupabase Adapter 配置选项

Variables

VariableDescription
ADAPTER_NAME-