实时

Postgres 变更

使用 Supabase Realtime 监听 Postgres 变更。


让我们探索如何使用 Realtime 的 Postgres 变更功能来监听数据库事件。

快速上手#

在本示例中,我们将设置一个数据库表,使用行级别安全策略保护它,并使用 Supabase 客户端库订阅所有变更。

1

设置一个包含 'todos' 表的 Supabase 项目

创建新项目 在 Supabase 控制台中。

项目准备好后,在你的 Supabase 数据库中创建一个表。你可以使用表格界面或 SQL 编辑器 来完成。

1
-- Create a table called "todos"
2
-- with a column to store tasks.
3
create table todos (
4
id serial primary key,
5
task text
6
);
2

允许匿名访问

在本示例中,我们将为该表启用 行级别安全策略 并允许匿名访问。在生产环境中,请务必使用适当的权限来保护你的应用程序。

1
-- Turn on security
2
alter table "todos"
3
enable row level security;
4
5
-- Allow anonymous access
6
create policy "Allow anonymous access"
7
on todos
8
for select
9
to anon
10
using (true);
3

启用 Postgres 复制

转到你的项目的 发布设置,并在 supabase_realtime 下,切换你想要监听的表。

或者,通过运行给定的 SQL 将表添加到 supabase_realtime 发布中

1
alter publication supabase_realtime
2
add table your_table_name;
4

安装客户端

安装 Supabase JavaScript 客户端。

1
npm install @supabase/supabase-js
5

创建客户端

此客户端将用于监听 Postgres 变更。

1
import { } from '@supabase/supabase-js'
2
3
const = (
4
'https://<project>.supabase.co',
5
'<sb_publishable_... or anon key>'
6
)
6

按 schema 监听变更

通过将 schema 属性设置为 'public' 和事件名称设置为 *,监听 public schema 中所有表的变更。事件名称可以是以下之一

  • INSERT
  • UPDATE
  • DELETE
  • *

通道名称可以是任何字符串,但不能是 'realtime'。

1
const =
2
.('schema-db-changes')
3
.(
4
'postgres_changes',
5
{
6
: '*',
7
: 'public',
8
},
9
() => .()
10
)
11
.()
7

插入虚拟数据

现在我们可以向我们的表添加一些数据,这将触发 channelA 事件处理程序。

1
insert into todos (task)
2
values
3
('Change!');

用法#

你可以使用 Supabase 客户端库来订阅数据库变更。

监听特定 schema#

使用 schema 参数订阅特定 schema 事件

1
const changes = supabase
2
.channel('schema-db-changes')
3
.on(
4
'postgres_changes',
5
{
6
schema: 'public', // Subscribes to the "public" schema in Postgres
7
event: '*', // Listen to all changes
8
},
9
(payload) => console.log(payload)
10
)
11
.subscribe()

通道名称可以是任何字符串,但不能是 'realtime'。

监听 INSERT 事件#

使用 event 参数仅监听数据库 INSERT

1
const changes = supabase
2
.channel('schema-db-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT', // Listen only to INSERTs
7
schema: 'public',
8
},
9
(payload) => console.log(payload)
10
)
11
.subscribe()

通道名称可以是任何字符串,但不能是 'realtime'。

监听 UPDATE 事件#

使用 event 参数仅监听数据库 UPDATE

1
const changes = supabase
2
.channel('schema-db-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'UPDATE', // Listen only to UPDATEs
7
schema: 'public',
8
},
9
(payload) => console.log(payload)
10
)
11
.subscribe()

通道名称可以是任何字符串,但不能是 'realtime'。

监听 DELETE 事件#

使用 event 参数仅监听数据库 DELETE

1
const changes = supabase
2
.channel('schema-db-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'DELETE', // Listen only to DELETEs
7
schema: 'public',
8
},
9
(payload) => console.log(payload)
10
)
11
.subscribe()

通道名称可以是任何字符串,但不能是 'realtime'。

监听特定表#

使用 table 参数订阅特定表事件

1
const changes = supabase
2
.channel('table-db-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: '*',
7
schema: 'public',
8
table: 'todos',
9
},
10
(payload) => console.log(payload)
11
)
12
.subscribe()

通道名称可以是任何字符串,但不能是 'realtime'。

监听多个变更#

要使用相同的通道监听不同的事件和 schema/table/filter 组合

1
const channel = supabase
2
.channel('db-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: '*',
7
schema: 'public',
8
table: 'messages',
9
},
10
(payload) => console.log(payload)
11
)
12
.on(
13
'postgres_changes',
14
{
15
event: 'INSERT',
16
schema: 'public',
17
table: 'users',
18
},
19
(payload) => console.log(payload)
20
)
21
.subscribe()

过滤特定变更#

使用 filter 参数进行精细的变更

1
const changes = supabase
2
.channel('table-filter-changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'todos',
9
filter: 'id=eq.1',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

可用过滤器#

Realtime 提供过滤器,以便你可以指定客户端接收的数据的粒度级别。

等于 (eq)#

要监听表中的列值等于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'UPDATE',
7
schema: 'public',
8
table: 'messages',
9
filter: 'body=eq.hey',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 = 过滤器。

不等于 (neq)#

要监听表中的列值不等于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'messages',
9
filter: 'body=neq.bye',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 != 过滤器。

小于 (lt)#

要监听表中的列值小于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'profiles',
9
filter: 'age=lt.65',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 < 过滤器,因此适用于非数字类型。请确保检查比较数据的预期行为类型。

小于或等于 (lte)#

要监听表中的列值小于或等于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'UPDATE',
7
schema: 'public',
8
table: 'profiles',
9
filter: 'age=lte.65',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 <= 过滤器,因此适用于非数字类型。请确保检查比较数据的预期行为类型。

大于 (gt)#

要监听表中的列值大于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'products',
9
filter: 'quantity=gt.10',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 > 过滤器,因此适用于非数字类型。请确保检查比较数据的预期行为类型。

大于或等于 (gte)#

要监听表中的列值大于或等于客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'products',
9
filter: 'quantity=gte.10',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 >= 过滤器,因此适用于非数字类型。请确保检查比较数据的预期行为类型。

包含在列表中 (in)#

要监听表中的列值等于任何客户端指定值时的变更

1
const channel = supabase
2
.channel('changes')
3
.on(
4
'postgres_changes',
5
{
6
event: 'INSERT',
7
schema: 'public',
8
table: 'colors',
9
filter: 'name=in.(red, blue, yellow)',
10
},
11
(payload) => console.log(payload)
12
)
13
.subscribe()

此过滤器使用 Postgres 的 = ANY。Realtime 允许此过滤器的最大值为 100 个值。

接收 old 记录#

默认情况下,仅发送 new 记录变更,但如果你想在 UPDATEDELETE 记录时接收 old 记录(先前值),你可以将表的 replica identity 设置为 full

1
alter table
2
messages replica identity full;

私有 schema#

Postgres 变更可开箱即用地用于 public schema 中的表。你可以通过将表 SELECT 权限授予访问令牌中找到的数据库角色来监听你的私有 schema 中的表。你可以运行类似以下查询

1
grant select on "non_private_schema"."some_table" to authenticated;

自定义令牌#

你可能选择自行签名令牌以自定义可以在你的 RLS 策略中检查的声明。

你的项目 JWT 密钥可以在你的控制台的 项目 API 密钥 中找到。

要将你自己的 JWT 与 Realtime 一起使用,请确保在实例化 Supabase 客户端后并在连接到通道之前设置令牌。

1
const { createClient } = require('@supabase/supabase-js')
2
3
const supabase = createClient(process.env.SUPABASE_URL, process.env.SUPABASE_KEY, {})
4
5
// Set your custom JWT here
6
supabase.realtime.setAuth('your-custom-jwt')
7
8
const channel = supabase
9
.channel('db-changes')
10
.on(
11
'postgres_changes',
12
{
13
event: '*',
14
schema: 'public',
15
table: 'messages',
16
filter: 'body=eq.bye',
17
},
18
(payload) => console.log(payload)
19
)
20
.subscribe()

刷新令牌#

你需要自行刷新令牌,但一旦生成,就可以将它们传递给 Realtime。

例如,如果你使用的是 supabase-js v2 客户端,则可以这样传递你的令牌

1
// Client setup
2
3
supabase.realtime.setAuth('fresh-token')

限制#

删除事件不可过滤#

在跟踪 Postgres 变更时,你无法过滤删除事件。此限制是由于从 Postgres 拉取变更的方式造成的。

表名中的空格#

Realtime 当前不适用于表名包含空格的情况。

数据库实例和 Realtime 性能#

Realtime 系统通常需要预见性,因为它们的扩展动态性。对于 Postgres 变更 功能,每个变更事件都必须检查订阅用户是否具有访问权限。例如,如果你有 100 个用户订阅一个表,并且你进行一次插入,那么它将触发 100 次“读取”:每个用户一次。

可能存在数据库瓶颈,从而限制消息吞吐量。如果你的数据库无法快速授权变更,则变更将被延迟,直到你收到超时为止。

数据库变更是在单个线程上处理的,以保持变更顺序。这意味着计算升级对 Postgres 变更订阅的性能影响不大。你可以在下面估算数据库的预期最大吞吐量。

如果你正在大规模使用 Postgres 变更,你应该考虑使用没有 RLS 和过滤器的单独“公共”表。或者,你可以仅在服务器端使用 Realtime,然后使用 Realtime 广播将变更重新流式传输到你的客户端。

输入你的数据库设置以估算你的实例的最大吞吐量

不要忘记运行你自己的基准测试,以确保性能对你的用例是可以接受的。

我们正在对 Realtime 的 Postgres 变更进行许多改进。如果你不确定你的用例的性能,请使用 支持表单 联系我们,我们将很乐意帮助你。我们拥有一支可以就最适合你的用例的解决方案向你提供建议的工程师团队。