PostgreSQL 源码树中的 contrib/test_decoding 子目录里有一个输出插件示例。
输出插件通过动态加载一个共享库来装入,库的基名就是输出插件的名称。普通 的库搜索路径用于定位该库。为了提供所需的输出插件回调,并表明该库实际上 是一个输出插件,它需要提供一个名为 _PG_output_plugin_init 的函数。该函数会接收一个 结构体,需要用各个动作对应的回调函数指针填充它。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
begin_cb、change_cb 和 commit_cb 回调是必需的,而 startup_cb、truncate_cb、 message_cb、filter_by_origin_cb 和 shutdown_cb 则是可选的。如果没有设置 truncate_cb,但需要解码 TRUNCATE,则该动作会被忽略。
输出插件也可以定义函数,以支持流式传送大型进行中事务。 stream_start_cb、stream_stop_cb、 stream_abort_cb、stream_commit_cb 和 stream_change_cb 是必需的,而 stream_message_cb 和 stream_truncate_cb 是可选的。如果输出插件还支持两阶 段提交,则 stream_prepare_cb 也是必需的。
输出插件还可以定义函数以支持两阶段提交,这样就可以在 PREPARE TRANSACTION 时解码相应操作。 begin_prepare_cb、prepare_cb、 commit_prepared_cb 和 rollback_prepared_cb 回调是必需的,而 filter_prepare_cb 是可选的。如果输出插件还支持流式传 送大型进行中事务,则 stream_prepare_cb 也是必需的。
为了解码、格式化并输出更改,输出插件可以使用后端的大部分常规基础设施,包 括调用输出函数。只要访问的关系仅限于由 initdb 在 pg_catalog 模式中创建的关系,或者用如下方式标记为用 户提供的目录表的关系,就允许进行只读访问:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
注意,在输出插件中访问用户目录表或普通系统目录表时,只能通过 systable_* 扫描 API 进行。通过 heap_* 扫描 API 访问会报错。此外,任何会导致分配事务 ID 的动作都被禁止。这包括写表、执行 DDL 更改,以及调用 pg_current_xact_id()。
输出插件回调几乎可以用任意格式向消费者传递数据。对于某些用例,例如通过 SQL 查看更改,把数据返回为能够容纳任意数据的数据类型(例如 bytea)会很笨拙。如果输出插件只输出服务器编码中的文本数 据,它可以把 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而不是 OUTPUT_PLUGIN_BINARY_OUTPUT,并在 启动回调 中声明这一点。在这种情况下,所有数据都必须采用服务器编码,这样才能装入 text datum。断言开启的构建会检查这一点。
输出插件通过它所提供的各类回调获知正在发生的更改。
并发事务按提交顺序解码,并且只有属于某个特定事务的更改,才会在 begin 和 commit 回调之间被解码。 显式或隐式回滚的事务永远不会被解码。成功的保存点会按照它们在该事务中执 行的顺序,被折叠进包含它们的事务中。如果提供了解码它们所需的输出插件回 调,那么使用 PREPARE TRANSACTION 为两阶段提交准备的 事务也会被解码。当前正在被解码的预备事务,也可能会被 ROLLBACK PREPARED 命令并发中止。在这种情况下,该事务 的逻辑解码也会中止。一旦检测到中止并调用 prepare_cb 回调,这类事务的所有更改都会被跳过。因此,即便发生并发中止,一旦后续解 码到 ROLLBACK PREPARED,仍会向输出插件提供足够的信 息,使其能够正确处理它。
只有已经安全刷写到磁盘的事务才会被解码。这可能导致 COMMIT 在紧随其后的 pg_logical_slot_get_changes() 调用中不会立即被解 码,当 synchronous_commit 被设置为 off 时尤其如此。
只要创建复制槽,或者要求某个复制槽开始流式传送更改,就会调用可选的 startup_cb 回调,而不管当前是否已经有准备好输出 的更改。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
当复制槽正在创建时,is_init 参数为真,否则为假。 options 指向一个结构体,输出插件可以在其中设 置选项:
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_type 必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 或 OUTPUT_PLUGIN_BINARY_OUTPUT。另见 Section 47.6.3。如果 receive_rewrites 为真,则在某些 DDL 操作期间由堆重 写产生的更改也会传给输出插件。这对处理 DDL 复制的插件很有用,但需要特 殊处理。
启动回调应当验证 ctx->output_plugin_options 中的选项。如果输出插 件需要保存状态,可以使用 ctx->output_plugin_private 来存储。
当一个此前处于活动状态的复制槽不再使用时,就会调用可选的 shutdown_cb 回调。它可用于释放输出插件私有的资 源。此时未必是在删除该槽,也可能只是停止流式传送。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
只要某个已提交事务的开始被解码,就会调用必需的 begin_cb 回调。已中止的事务及其内容永远不会被解 码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
txn 参数包含该事务的元信息,例如它提交时的时间 戳以及它的 XID。
只要事务提交被解码,就会调用必需的 commit_cb 回 调。如果有被修改的行,那么在此之前,所有已修改行的 change_cb 回调都已经被调用过。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
对于事务中的每一个单独行修改,都会调用必需的 change_cb 回调;该修改可以是 INSERT、UPDATE 或 DELETE。即使原始命令一次修改了多行,该回调也会为每 一行分别调用一次。change_cb 回调可以访问系统目录 表或用户目录表,以辅助输出行修改的详细信息。如果在解码一个预备但尚未提 交的事务,或者解码一个未提交事务的过程中,该回调也可能因为同一个事务被 同时回滚而报错。在这种情况下,对该已中止事务的逻辑解码会被平稳地停止。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
ctx 和 txn 参数的内容与 begin_cb 和 commit_cb 回调中的 相同;另外还会传入关系描述符 relation,用于指向 该行所属的关系,以及描述行修改的结构体 change。
只有用户定义表中既不是不记录日志的(见 UNLOGGED),也不是临时的(见 TEMPORARY or TEMP)更改,才能通过逻辑解码提 取出来。
可选的 truncate_cb 回调会在解码 TRUNCATE 命令时调用。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
这些参数与 change_cb 回调类似。不过,由于对通过外键 关联的表执行 TRUNCATE 时需要一起执行动作,所以该回调 接收的是关系数组,而不是单个关系。详见 TRUNCATE 语句的说明。
可选的 filter_by_origin_cb 回调用于判定,从 origin_id 重放而来的数据是否为输出插件所关心 的数据。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
ctx 参数的内容与其他回调相同。除了源本身之外,没有 其他信息可用。如果要表明来自传入节点的更改并不相关,则返回 true,这会 使这些更改被过滤掉;否则返回 false。对于被过滤掉的事务和更改,其他回调 都不会被调用。
在实现级联复制或多向复制方案时,这个回调很有用。按源过滤可以避免在这类 配置中同一更改被来回复制。虽然事务和更改本身也带有源信息,但通过这个 回调来过滤会明显更高效。
只要逻辑解码消息被解码,可选的 message_cb 回调就 会被调用。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
txn 参数包含事务的元信息,例如事务提交时的时间戳和 XID。但要注意,当消息是非事务性的,并且记录该消息的事务尚未分配 XID 时,它可以为 NULL。message_lsn 给出该消息的 WAL 位置。transactional 表示该消息是否作为事务 性消息发送。与更改回调类似,在解码一个预备但尚未提交的事务,或者解码一 个未提交事务时,该消息回调也可能由于同一事务被同时回滚而报错。在这种情 况下,对该已中止事务的逻辑解码会被平稳地停止。 prefix 是任意的、以空字符结尾的前缀,可用于识别 当前插件感兴趣的消息。最后,message 参数保存了 实际消息,其大小为 message_size。
应特别注意,确保输出插件视为有意义的消息前缀具有唯一性。使用扩展名或输 出插件自身的名称通常是一个不错的选择。
可选的 filter_prepare_cb 回调用于判定,当前两阶段 提交事务中的数据,应当在本次 prepare 阶段解码,还是等到 COMMIT PREPARED 时作为常规单阶段事务再解码。若要 表示应跳过解码,则返回 true;否则返回 false。如果该回调未定义,则默认视为 false(也就是说,不做过滤,所有使用两阶段提交的事 务也都会分两阶段解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
ctx 参数的内容与其他回调相同。 xid 和 gid 参数提供了标 识该事务的两种不同方式。后续的 COMMIT PREPARED 或 ROLLBACK PREPARED 会同时携带这两个标识符,从而让输 出插件可以自行选择使用哪一个。
在解码过程中,每个事务都可能多次调用这个回调;而且对于给定的 xid 和 gid 组合,它每次 都必须给出相同的静态答案。
只要预备事务的开始被解码,就会调用必需的 begin_prepare_cb 回调。 gid 字段是 txn 参数的一部 分,可以在此回调中使用,以检查插件是否已经收到过这个 PREPARE;在这种情况下, 它可以报错,或者跳过该事务余下的更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
只要一个用于两阶段提交的事务被解码到预备点,就会调用必需的 prepare_cb 回调。如果存在任何被修改的行,那么在此 之前,所有这些行的 change_cb 回调都已经被调用过。 gid 字段是 txn 参数的一 部分,可以在这个回调中使用。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
必需的 commit_prepared_cb 回调会在解码到事务的 COMMIT PREPARED 时调用。 gid 字段是 txn 参数的一 部分,可以在此回调中使用。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
必需的 rollback_prepared_cb 回调会在解码到事务的 ROLLBACK PREPARED 时调用。 gid 字段是 txn 参数的一 部分,可以在此回调中使用。prepare_end_lsn 和 prepare_time 参数可用于检查插件是否已经收到过这 个 PREPARE TRANSACTION;如果已经收到过,它就可以执 行回滚,否则可以跳过该回滚操作。单独的 gid 并不足够,因为下游节点可能存在一个具有相同 标识符的预备事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
在打开一个来自进行中事务的流式更改块时,会调用必需的 stream_start_cb 回调。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
在关闭一个来自进行中事务的流式更改块时,会调用必需的 stream_stop_cb 回调。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
为了中止一个先前已经流式传送过的事务,会调用必需的 stream_abort_cb 回调。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
stream_prepare_cb 回调用于把一个先前已经流式传送的 事务作为两阶段提交的一部分进行预备。当输出插件同时支持流式传送大型进行 中事务和两阶段提交时,这个回调是必需的。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
为了提交一个先前已经流式传送过的事务,会调用必需的 stream_commit_cb 回调。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
在发送一个流式更改块中的更改时(由 stream_start_cb 和 stream_stop_cb 调用界定),会调用必需的 stream_change_cb 回调。实际更改内容此时不会显示,因 为事务可能在稍后中止,而我们不会对已中止事务的更改进行解码。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
在发送一个流式更改块中的通用消息时(由 stream_start_cb 和 stream_stop_cb 调用界定),会调用可选的 stream_message_cb 回调。事务性消息的内容此时不会显 示,因为事务可能在稍后中止,而我们不会对已中止事务的更改进行解码。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
可选的 stream_truncate_cb 回调会在一个流式更改块中 遇到 TRUNCATE 命令时调用(该流式更改块由 stream_start_cb 和 stream_stop_cb 调用界定)。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
这些参数与 stream_change_cb 回调类似。不过,由于对通 过外键关联的表执行 TRUNCATE 时需要一起执行动作,所以 该回调接收的是关系数组,而不是单个关系。详见 TRUNCATE 语句的说明。
为了真正产生输出,输出插件可以在 StringInfo 输出缓冲区 ctx->out 中写入数据,此时位于 begin_cb、 commit_cb 或 change_cb 回调内部。 写入输出缓冲区之前,必须先调 用 OutputPluginPrepareWrite(ctx, last_write);写完 缓冲区之后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写出。 last_write 指示某次写出是否为该回调的最后一次写 出。
下面的示例展示了如何把数据输出给输出插件的消费者:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);
如果您发现文档中有不正确的内容、与您使用特定功能的经验不符或需要进一步说明,请使用此表单来报告文档问题。