受支持版本: 当前版本 (18) / 17 / 16 / 15 / 14
开发版本: devel

47.6. 逻辑解码输出插件 #

PostgreSQL 源码树中的 contrib/test_decoding 子目录里有一个输出插件示例。

47.6.1. 初始化函数 #

输出插件通过动态加载一个共享库来装入,库的基名就是输出插件的名称。普通 的库搜索路径用于定位该库。为了提供所需的输出插件回调,并表明该库实际上 是一个输出插件,它需要提供一个名为 _PG_output_plugin_init 的函数。该函数会接收一个 结构体,需要用各个动作对应的回调函数指针填充它。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

begin_cbchange_cbcommit_cb 回调是必需的,而 startup_cbtruncate_cbmessage_cbfilter_by_origin_cbshutdown_cb 则是可选的。如果没有设置 truncate_cb,但需要解码 TRUNCATE,则该动作会被忽略。

输出插件也可以定义函数,以支持流式传送大型进行中事务。 stream_start_cbstream_stop_cbstream_abort_cbstream_commit_cbstream_change_cb 是必需的,而 stream_message_cbstream_truncate_cb 是可选的。如果输出插件还支持两阶 段提交,则 stream_prepare_cb 也是必需的。

输出插件还可以定义函数以支持两阶段提交,这样就可以在 PREPARE TRANSACTION 时解码相应操作。 begin_prepare_cbprepare_cbcommit_prepared_cbrollback_prepared_cb 回调是必需的,而 filter_prepare_cb 是可选的。如果输出插件还支持流式传 送大型进行中事务,则 stream_prepare_cb 也是必需的。

47.6.2. 能力 #

为了解码、格式化并输出更改,输出插件可以使用后端的大部分常规基础设施,包 括调用输出函数。只要访问的关系仅限于由 initdbpg_catalog 模式中创建的关系,或者用如下方式标记为用 户提供的目录表的关系,就允许进行只读访问:

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

注意,在输出插件中访问用户目录表或普通系统目录表时,只能通过 systable_* 扫描 API 进行。通过 heap_* 扫描 API 访问会报错。此外,任何会导致分配事务 ID 的动作都被禁止。这包括写表、执行 DDL 更改,以及调用 pg_current_xact_id()

47.6.3. 输出模式 #

输出插件回调几乎可以用任意格式向消费者传递数据。对于某些用例,例如通过 SQL 查看更改,把数据返回为能够容纳任意数据的数据类型(例如 bytea)会很笨拙。如果输出插件只输出服务器编码中的文本数 据,它可以把 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而不是 OUTPUT_PLUGIN_BINARY_OUTPUT,并在 启动回调 中声明这一点。在这种情况下,所有数据都必须采用服务器编码,这样才能装入 text datum。断言开启的构建会检查这一点。

47.6.4. 输出插件回调 #

输出插件通过它所提供的各类回调获知正在发生的更改。

并发事务按提交顺序解码,并且只有属于某个特定事务的更改,才会在 begincommit 回调之间被解码。 显式或隐式回滚的事务永远不会被解码。成功的保存点会按照它们在该事务中执 行的顺序,被折叠进包含它们的事务中。如果提供了解码它们所需的输出插件回 调,那么使用 PREPARE TRANSACTION 为两阶段提交准备的 事务也会被解码。当前正在被解码的预备事务,也可能会被 ROLLBACK PREPARED 命令并发中止。在这种情况下,该事务 的逻辑解码也会中止。一旦检测到中止并调用 prepare_cb 回调,这类事务的所有更改都会被跳过。因此,即便发生并发中止,一旦后续解 码到 ROLLBACK PREPARED,仍会向输出插件提供足够的信 息,使其能够正确处理它。

Note

只有已经安全刷写到磁盘的事务才会被解码。这可能导致 COMMIT 在紧随其后的 pg_logical_slot_get_changes() 调用中不会立即被解 码,当 synchronous_commit 被设置为 off 时尤其如此。

47.6.4.1. 启动回调 #

只要创建复制槽,或者要求某个复制槽开始流式传送更改,就会调用可选的 startup_cb 回调,而不管当前是否已经有准备好输出 的更改。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

当复制槽正在创建时,is_init 参数为真,否则为假。 options 指向一个结构体,输出插件可以在其中设 置选项:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type 必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。另见 Section 47.6.3。如果 receive_rewrites 为真,则在某些 DDL 操作期间由堆重 写产生的更改也会传给输出插件。这对处理 DDL 复制的插件很有用,但需要特 殊处理。

启动回调应当验证 ctx->output_plugin_options 中的选项。如果输出插 件需要保存状态,可以使用 ctx->output_plugin_private 来存储。

47.6.4.2. 关闭回调 #

当一个此前处于活动状态的复制槽不再使用时,就会调用可选的 shutdown_cb 回调。它可用于释放输出插件私有的资 源。此时未必是在删除该槽,也可能只是停止流式传送。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

47.6.4.3. 事务开始回调 #

只要某个已提交事务的开始被解码,就会调用必需的 begin_cb 回调。已中止的事务及其内容永远不会被解 码。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn 参数包含该事务的元信息,例如它提交时的时间 戳以及它的 XID。

47.6.4.4. 事务结束回调 #

只要事务提交被解码,就会调用必需的 commit_cb 回 调。如果有被修改的行,那么在此之前,所有已修改行的 change_cb 回调都已经被调用过。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

47.6.4.5. 更改回调 #

对于事务中的每一个单独行修改,都会调用必需的 change_cb 回调;该修改可以是 INSERTUPDATEDELETE。即使原始命令一次修改了多行,该回调也会为每 一行分别调用一次。change_cb 回调可以访问系统目录 表或用户目录表,以辅助输出行修改的详细信息。如果在解码一个预备但尚未提 交的事务,或者解码一个未提交事务的过程中,该回调也可能因为同一个事务被 同时回滚而报错。在这种情况下,对该已中止事务的逻辑解码会被平稳地停止。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

ctxtxn 参数的内容与 begin_cbcommit_cb 回调中的 相同;另外还会传入关系描述符 relation,用于指向 该行所属的关系,以及描述行修改的结构体 change

Note

只有用户定义表中既不是不记录日志的(见 UNLOGGED),也不是临时的(见 TEMPORARY or TEMP)更改,才能通过逻辑解码提 取出来。

47.6.4.6. 截断回调 #

可选的 truncate_cb 回调会在解码 TRUNCATE 命令时调用。

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

这些参数与 change_cb 回调类似。不过,由于对通过外键 关联的表执行 TRUNCATE 时需要一起执行动作,所以该回调 接收的是关系数组,而不是单个关系。详见 TRUNCATE 语句的说明。

47.6.4.7. 源过滤回调 #

可选的 filter_by_origin_cb 回调用于判定,从 origin_id 重放而来的数据是否为输出插件所关心 的数据。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

ctx 参数的内容与其他回调相同。除了源本身之外,没有 其他信息可用。如果要表明来自传入节点的更改并不相关,则返回 true,这会 使这些更改被过滤掉;否则返回 false。对于被过滤掉的事务和更改,其他回调 都不会被调用。

在实现级联复制或多向复制方案时,这个回调很有用。按源过滤可以避免在这类 配置中同一更改被来回复制。虽然事务和更改本身也带有源信息,但通过这个 回调来过滤会明显更高效。

47.6.4.8. 通用消息回调 #

只要逻辑解码消息被解码,可选的 message_cb 回调就 会被调用。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txn 参数包含事务的元信息,例如事务提交时的时间戳和 XID。但要注意,当消息是非事务性的,并且记录该消息的事务尚未分配 XID 时,它可以为 NULL。message_lsn 给出该消息的 WAL 位置。transactional 表示该消息是否作为事务 性消息发送。与更改回调类似,在解码一个预备但尚未提交的事务,或者解码一 个未提交事务时,该消息回调也可能由于同一事务被同时回滚而报错。在这种情 况下,对该已中止事务的逻辑解码会被平稳地停止。 prefix 是任意的、以空字符结尾的前缀,可用于识别 当前插件感兴趣的消息。最后,message 参数保存了 实际消息,其大小为 message_size

应特别注意,确保输出插件视为有意义的消息前缀具有唯一性。使用扩展名或输 出插件自身的名称通常是一个不错的选择。

47.6.4.9. 预备过滤回调 #

可选的 filter_prepare_cb 回调用于判定,当前两阶段 提交事务中的数据,应当在本次 prepare 阶段解码,还是等到 COMMIT PREPARED 时作为常规单阶段事务再解码。若要 表示应跳过解码,则返回 true;否则返回 false。如果该回调未定义,则默认视为 false(也就是说,不做过滤,所有使用两阶段提交的事 务也都会分两阶段解码)。

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

ctx 参数的内容与其他回调相同。 xidgid 参数提供了标 识该事务的两种不同方式。后续的 COMMIT PREPAREDROLLBACK PREPARED 会同时携带这两个标识符,从而让输 出插件可以自行选择使用哪一个。

在解码过程中,每个事务都可能多次调用这个回调;而且对于给定的 xidgid 组合,它每次 都必须给出相同的静态答案。

47.6.4.10. 预备事务开始回调 #

只要预备事务的开始被解码,就会调用必需的 begin_prepare_cb 回调。 gid 字段是 txn 参数的一部 分,可以在此回调中使用,以检查插件是否已经收到过这个 PREPARE;在这种情况下, 它可以报错,或者跳过该事务余下的更改。

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

47.6.4.11. 事务预备回调 #

只要一个用于两阶段提交的事务被解码到预备点,就会调用必需的 prepare_cb 回调。如果存在任何被修改的行,那么在此 之前,所有这些行的 change_cb 回调都已经被调用过。 gid 字段是 txn 参数的一 部分,可以在这个回调中使用。

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

47.6.4.12. 预备事务提交回调 #

必需的 commit_prepared_cb 回调会在解码到事务的 COMMIT PREPARED 时调用。 gid 字段是 txn 参数的一 部分,可以在此回调中使用。

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

47.6.4.13. 预备事务回滚回调 #

必需的 rollback_prepared_cb 回调会在解码到事务的 ROLLBACK PREPARED 时调用。 gid 字段是 txn 参数的一 部分,可以在此回调中使用。prepare_end_lsnprepare_time 参数可用于检查插件是否已经收到过这 个 PREPARE TRANSACTION;如果已经收到过,它就可以执 行回滚,否则可以跳过该回滚操作。单独的 gid 并不足够,因为下游节点可能存在一个具有相同 标识符的预备事务。

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

47.6.4.14. 流开始回调 #

在打开一个来自进行中事务的流式更改块时,会调用必需的 stream_start_cb 回调。

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

47.6.4.15. 流停止回调 #

在关闭一个来自进行中事务的流式更改块时,会调用必需的 stream_stop_cb 回调。

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

47.6.4.16. 流中止回调 #

为了中止一个先前已经流式传送过的事务,会调用必需的 stream_abort_cb 回调。

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

47.6.4.17. 流预备回调 #

stream_prepare_cb 回调用于把一个先前已经流式传送的 事务作为两阶段提交的一部分进行预备。当输出插件同时支持流式传送大型进行 中事务和两阶段提交时,这个回调是必需的。

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

47.6.4.18. 流提交回调 #

为了提交一个先前已经流式传送过的事务,会调用必需的 stream_commit_cb 回调。

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

47.6.4.19. 流更改回调 #

在发送一个流式更改块中的更改时(由 stream_start_cbstream_stop_cb 调用界定),会调用必需的 stream_change_cb 回调。实际更改内容此时不会显示,因 为事务可能在稍后中止,而我们不会对已中止事务的更改进行解码。

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

47.6.4.20. 流消息回调 #

在发送一个流式更改块中的通用消息时(由 stream_start_cbstream_stop_cb 调用界定),会调用可选的 stream_message_cb 回调。事务性消息的内容此时不会显 示,因为事务可能在稍后中止,而我们不会对已中止事务的更改进行解码。

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

47.6.4.21. 流截断回调 #

可选的 stream_truncate_cb 回调会在一个流式更改块中 遇到 TRUNCATE 命令时调用(该流式更改块由 stream_start_cbstream_stop_cb 调用界定)。

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

这些参数与 stream_change_cb 回调类似。不过,由于对通 过外键关联的表执行 TRUNCATE 时需要一起执行动作,所以 该回调接收的是关系数组,而不是单个关系。详见 TRUNCATE 语句的说明。

47.6.5. 产生输出的函数 #

为了真正产生输出,输出插件可以在 StringInfo 输出缓冲区 ctx->out 中写入数据,此时位于 begin_cbcommit_cbchange_cb 回调内部。 写入输出缓冲区之前,必须先调 用 OutputPluginPrepareWrite(ctx, last_write);写完 缓冲区之后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写出。 last_write 指示某次写出是否为该回调的最后一次写 出。

下面的示例展示了如何把数据输出给输出插件的消费者:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);

提交更正

如果您发现文档中有不正确的内容、与您使用特定功能的经验不符或需要进一步说明,请使用此表单来报告文档问题。