The Debezium connector works with the Kafka Connect framework to capture changes in a database and produce change events. The Kafka Connect worker then applies any transformations configured for the connector to each message the connector produces, serializes each message key and value into binary form using the worker's converters, and finally writes each message to the appropriate Kafka topic.

Debezium's PostgreSQL connector consists of two parts that work together to read and process server changes. One of the following logical decoding output plug-ins must be installed and configured on the PostgreSQL server:

  • decoderbufs (maintained by the Debezium community, based on ProtoBuf)
  • wal2json (maintained by the wal2json community, based on JSON)
  • pgoutput, the standard logical decoding plug-in in PostgreSQL 10+ (maintained by the Postgres community and used by Postgres itself for logical replication); this plug-in is always present, which means no additional library has to be installed, and the Debezium connector interprets the raw replication event stream directly into change events.

Since my virtual machine runs PostgreSQL 9.6.8, the pgoutput plug-in is not available, so an output plug-in has to be installed separately. The Debezium site provides installation instructions for wal2json, so for convenience I install the wal2json plug-in here.

Official documentation:

Debezium connector for PostgreSQL

Kafka Connect

wal2json Installation

  sudo yum install wal2json12
  cd /opt/module
  git clone https://github.com/eulerto/wal2json -b master --single-branch
  cd wal2json
  make
  make install

Output of a successful build:

  Cloning into 'wal2json'...
  remote: Counting objects: 445, done.
  remote: Total 445 (delta 0), reused 0 (delta 0), pack-reused 445
  Receiving objects: 100% (445/445), 180.70 KiB | 0 bytes/s, done.
  Resolving deltas: 100% (317/317), done.
  Note: checking out 'd2b7fef021c46e0d429f2c1768de361069e58696'.
  You are in 'detached HEAD' state. You can look around, make experimental
  changes and commit them, and you can discard any commits you make in this
  state without impacting any branches by performing another checkout.
  If you want to create a new branch to retain commits you create, you may
  do so (now or later) by using -b with the checkout command again. Example:
  git checkout -b new_branch_name
  HEAD is now at d2b7fef... Improve style
  gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -I. -I./ -I/usr/pgsql-9.6/include/server -I/usr/pgsql-9.6/include/internal -D_GNU_SOURCE -I/usr/include/libxml2 -I/usr/include -c -o wal2json.o wal2json.c
  gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -L/usr/pgsql-9.6/lib -Wl,--as-needed -L/usr/lib64 -Wl,--as-needed -Wl,-rpath,'/usr/pgsql-9.6/lib',--enable-new-dtags -shared -o wal2json.so wal2json.o
  /usr/bin/mkdir -p '/usr/pgsql-9.6/lib'
  /usr/bin/install -c -m 755 wal2json.so '/usr/pgsql-9.6/lib/'

If make fails with a permission error, switch to the root account and run chmod 777 -R /dic

PostgreSQL Server Configuration

Setting up libraries, WAL and replication parameters

Edit postgresql.conf in the $PGDATA directory:

  vi $PGDATA/postgresql.conf
  # Settings to edit:
  listen_addresses = '*'
  port = 5432
  wal_level = logical
  max_wal_senders = 8
  wal_keep_segments = 4
  max_replication_slots = 4
  shared_preload_libraries = 'wal2json'
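The settings above can be sanity-checked programmatically before restarting the server. Below is a minimal Python sketch (the helper is my own illustration, not part of PostgreSQL or Debezium) that parses postgresql.conf text and reports required logical-decoding settings that are missing or set to a different value:

```python
import re

# The two settings logical decoding with wal2json cannot work without
# (the other values above are capacity tuning and can vary).
REQUIRED = {
    "wal_level": "logical",
    "shared_preload_libraries": "'wal2json'",
}

def check_conf(conf_text):
    """Return the required settings whose value is missing or differs."""
    found = {}
    for line in conf_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop trailing comments
        m = re.match(r"(\w+)\s*=\s*(.+)", line)
        if m:
            found[m.group(1)] = m.group(2).strip()
    return {k: found.get(k) for k, v in REQUIRED.items() if found.get(k) != v}

conf = """
listen_addresses = '*'
wal_level = logical
shared_preload_libraries = 'wal2json'
"""
print(check_conf(conf))  # {} -> the required settings are present
```

An empty result means the file at least declares the values this setup depends on; it does not prove the running server has reloaded them.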

Setting up replication permissions

Edit pg_hba.conf in the $PGDATA directory:

  vi $PGDATA/pg_hba.conf
  # Settings to edit:
  # IPv4 local connections:
  host all all 0.0.0.0/0 md5
  # replication privilege.
  local replication postgres trust
  host replication postgres 127.0.0.1/32 trust
  host replication postgres ::1/128 trust
  host replication postgres 192.168.142.102/32 trust

After editing both files, restart the database service:

  pg_ctl restart

Database Test Environment Set-up

  -- Switch to the postgres user and enter the psql interactive shell
  [postgres@hadoop102 monkey]$ psql
  -- Create a test database and a test table
  postgres=# CREATE DATABASE test;
  postgres=# \c test;
  test=# CREATE TABLE test_table (
      id char(10) NOT NULL,
      code char(10),
      PRIMARY KEY (id)
  );

Decoding Output Plug-in Test

  • Using wal2json, create a slot named test_slot for database test:

    pg_recvlogical -d test --slot test_slot --create-slot -P wal2json

  • Start streaming change data from database test through test_slot:

    pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -

  • Run INSERT/UPDATE/DELETE operations against test_table:

    test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
    INSERT 0 1
    test=# update test_table set code='code2' where id='id1';
    UPDATE 1
    test=# delete from test_table where id='id1';
    DELETE 1

The monitoring window receives output like the following:

Output for _INSERT_ event

  {
    "change": [
      {
        "kind": "insert",
        "schema": "public",
        "table": "test_table",
        "columnnames": ["id", "code"],
        "columntypes": ["character(10)", "character(10)"],
        "columnvalues": ["id1       ", "code1     "]
      }
    ]
  }
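Note the trailing spaces in columnvalues: the columns were declared as char(10), and PostgreSQL blank-pads character(n) values to the declared length, so downstream consumers usually strip the padding. A small Python illustration (the padding behavior is standard SQL; the helper name is my own):

```python
def unpad(value):
    """Strip the blank padding that character(n) columns carry in wal2json output."""
    return value.rstrip(" ") if isinstance(value, str) else value

# Values as emitted for char(10) columns: blank-padded to 10 characters.
row = {"id": "id1       ", "code": "code1     "}
clean = {k: unpad(v) for k, v in row.items()}
print(clean)  # {'id': 'id1', 'code': 'code1'}
```

Using varchar or text columns instead avoids the padding entirely.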

Output for _UPDATE_ event

  {
    "change": [
      {
        "kind": "update",
        "schema": "public",
        "table": "test_table",
        "columnnames": ["id", "code"],
        "columntypes": ["character(10)", "character(10)"],
        "columnvalues": ["id1       ", "code2     "],
        "oldkeys": {
          "keynames": ["id"],
          "keytypes": ["character(10)"],
          "keyvalues": ["id1       "]
        }
      }
    ]
  }

Output for _DELETE_ event

  {
    "change": [
      {
        "kind": "delete",
        "schema": "public",
        "table": "test_table",
        "oldkeys": {
          "keynames": ["id"],
          "keytypes": ["character(10)"],
          "keyvalues": ["id1       "]
        }
      }
    ]
  }

When testing is complete, the test_slot used to monitor database test can be removed:

  pg_recvlogical -d test --slot test_slot --drop-slot

At this point the wal2json plug-in is installed and has passed testing.

Debezium PostgreSQL Connector Configuration

Debezium PostgreSQL Connector Installation

connector’s plug-in archive

Note on versions: the current stable release is 1.2.0; I downloaded 1.0.3.

Extract the downloaded debezium-connector-postgres-1.0.3.Final-plugin.tar.gz into the connect directory under the Kafka installation:

  [monkey@hadoop102 kafka]$ cd /opt/module/kafka
  [monkey@hadoop102 kafka]$ mkdir connect
  [monkey@hadoop102 kafka]$ cd /opt/software
  [monkey@hadoop102 software]$ tar -zxvf debezium-connector-postgres-1.0.3.Final-plugin.tar.gz -C /opt/module/kafka/connect

Kafka Connect currently supports two modes: standalone (single process) and distributed.

Since I am running on a single machine, I test in standalone mode.

Edit the worker configuration file:

  [monkey@hadoop102 kafka]$ cd /opt/module/kafka/config
  [monkey@hadoop102 kafka]$ vi connect-standalone.properties
  # Edit the last line, plugin.path: point it at the directory containing the Debezium
  # connector jar files (it does not have to be the deepest directory). Multiple paths
  # may be configured, comma-separated; in my setup even a single path ends with a comma.
  plugin.path=/opt/module/kafka/connect,


Connector Configuration Examples

The configuration can be supplied in two forms: a locally edited properties file, or a JSON document submitted via POST to the Kafka Connect service.

1. Properties file format

  name=student-connector
  connector.class=io.debezium.connector.postgresql.PostgresConnector
  tasks.max=1
  database.hostname=192.168.142.102
  database.port=5432
  database.user=postgres
  database.password=postgres
  database.dbname=test
  database.server.name=info
  table.whitelist=public.student
  plugin.name=wal2json
  slot.name=my_slot

2. Submitting a JSON document via POST

  {
    "name": "student-connector",
    "config": {
      "name": "student-connector",
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "192.168.142.102",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "test",
      "database.server.name": "info",
      "table.whitelist": "public.student",
      "plugin.name": "wal2json",
      "slot.name": "my_slot"
    }
  }
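The JSON form is submitted to the Kafka Connect REST API with an HTTP POST to the /connectors endpoint. As a sketch, here is how that request can be built with only the Python standard library; the helper name and the localhost:8083 URL (Kafka Connect's default REST port) are my assumptions — adjust for your worker:

```python
import json
import urllib.request

def build_register_request(config, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST that registers a connector with Kafka Connect."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        connect_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

config = {
    "name": "student-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.142.102",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "test",
        "database.server.name": "info",
        "table.whitelist": "public.student",
        "plugin.name": "wal2json",
        "slot.name": "my_slot",
    },
}
req = build_register_request(config)
# urllib.request.urlopen(req)  # uncomment to submit against a running worker
print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors
```

The equivalent curl invocation would POST the same JSON with Content-Type: application/json to the same URL.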

The slot.name above is the name of the Postgres logical decoding slot created for streaming changes from the plug-in and database instance. The value must conform to the Postgres replication slot naming rules, which state that "each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." If unspecified, the name defaults to debezium; when adding multiple connectors without specifying distinct slot names, the second one fails with an error such as:

  Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID 52197
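The naming rule quoted above can be expressed as a simple validity check; the regular expression and helper below are my own illustration, not Debezium code:

```python
import re

# Postgres replication slot names may contain only lower-case letters,
# digits, and underscores (the rule quoted above).
SLOT_NAME_RE = re.compile(r"^[a-z0-9_]+$")

def is_valid_slot_name(name):
    """Check a candidate slot.name against the Postgres naming rule."""
    return bool(SLOT_NAME_RE.match(name))

print(is_valid_slot_name("my_slot"))  # True
print(is_valid_slot_name("My-Slot"))  # False: upper case and '-' are not allowed
```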

Standalone mode test

1. Start the connector:

  [monkey@hadoop102 kafka]$ bin/connect-standalone.sh config/connect-standalone.properties postgres-student.properties

2. Operate on the student table in PostgreSQL database test:

  test=# INSERT INTO student (id, name) VALUES('5', 'sam');
  INSERT 0 1
  test=# update student set name = 'ethan' where id = '5';
  UPDATE 1
  test=# delete from student where id='5';
  DELETE 1

3. Start a Kafka console consumer to watch messages arriving on the corresponding topic:

  [monkey@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic info.public.student

Output for _INSERT_ event

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "before"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "after"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "version" },
            { "type": "string", "optional": false, "field": "connector" },
            { "type": "string", "optional": false, "field": "name" },
            { "type": "int64", "optional": false, "field": "ts_ms" },
            {
              "type": "string",
              "optional": true,
              "name": "io.debezium.data.Enum",
              "version": 1,
              "parameters": { "allowed": "true,last,false" },
              "default": "false",
              "field": "snapshot"
            },
            { "type": "string", "optional": false, "field": "db" },
            { "type": "string", "optional": false, "field": "schema" },
            { "type": "string", "optional": false, "field": "table" },
            { "type": "int64", "optional": true, "field": "txId" },
            { "type": "int64", "optional": true, "field": "lsn" },
            { "type": "int64", "optional": true, "field": "xmin" }
          ],
          "optional": false,
          "name": "io.debezium.connector.postgresql.Source",
          "field": "source"
        },
        { "type": "string", "optional": false, "field": "op" },
        { "type": "int64", "optional": true, "field": "ts_ms" }
      ],
      "optional": false,
      "name": "info.public.student.Envelope"
    },
    "payload": {
      "before": null,
      "after": { "id": "5         ", "name": "sam       " },
      "source": {
        "version": "1.0.3.Final",
        "connector": "postgresql",
        "name": "info",
        "ts_ms": 1583920562395,
        "snapshot": "false",
        "db": "test",
        "schema": "public",
        "table": "student",
        "txId": 1760,
        "lsn": 23397480,
        "xmin": null
      },
      "op": "c",
      "ts_ms": 1583920562442
    }
  }

Output for _UPDATE_ event

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "before"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "after"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "version" },
            { "type": "string", "optional": false, "field": "connector" },
            { "type": "string", "optional": false, "field": "name" },
            { "type": "int64", "optional": false, "field": "ts_ms" },
            {
              "type": "string",
              "optional": true,
              "name": "io.debezium.data.Enum",
              "version": 1,
              "parameters": { "allowed": "true,last,false" },
              "default": "false",
              "field": "snapshot"
            },
            { "type": "string", "optional": false, "field": "db" },
            { "type": "string", "optional": false, "field": "schema" },
            { "type": "string", "optional": false, "field": "table" },
            { "type": "int64", "optional": true, "field": "txId" },
            { "type": "int64", "optional": true, "field": "lsn" },
            { "type": "int64", "optional": true, "field": "xmin" }
          ],
          "optional": false,
          "name": "io.debezium.connector.postgresql.Source",
          "field": "source"
        },
        { "type": "string", "optional": false, "field": "op" },
        { "type": "int64", "optional": true, "field": "ts_ms" }
      ],
      "optional": false,
      "name": "info.public.student.Envelope"
    },
    "payload": {
      "before": { "id": "5         ", "name": null },
      "after": { "id": "5         ", "name": "ethan     " },
      "source": {
        "version": "1.0.3.Final",
        "connector": "postgresql",
        "name": "info",
        "ts_ms": 1583920898322,
        "snapshot": "false",
        "db": "test",
        "schema": "public",
        "table": "student",
        "txId": 1761,
        "lsn": 23398864,
        "xmin": null
      },
      "op": "u",
      "ts_ms": 1583920898326
    }
  }

Output for _DELETE_ event

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "before"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "id" },
            { "type": "string", "optional": true, "field": "name" }
          ],
          "optional": true,
          "name": "info.public.student.Value",
          "field": "after"
        },
        {
          "type": "struct",
          "fields": [
            { "type": "string", "optional": false, "field": "version" },
            { "type": "string", "optional": false, "field": "connector" },
            { "type": "string", "optional": false, "field": "name" },
            { "type": "int64", "optional": false, "field": "ts_ms" },
            {
              "type": "string",
              "optional": true,
              "name": "io.debezium.data.Enum",
              "version": 1,
              "parameters": { "allowed": "true,last,false" },
              "default": "false",
              "field": "snapshot"
            },
            { "type": "string", "optional": false, "field": "db" },
            { "type": "string", "optional": false, "field": "schema" },
            { "type": "string", "optional": false, "field": "table" },
            { "type": "int64", "optional": true, "field": "txId" },
            { "type": "int64", "optional": true, "field": "lsn" },
            { "type": "int64", "optional": true, "field": "xmin" }
          ],
          "optional": false,
          "name": "io.debezium.connector.postgresql.Source",
          "field": "source"
        },
        { "type": "string", "optional": false, "field": "op" },
        { "type": "int64", "optional": true, "field": "ts_ms" }
      ],
      "optional": false,
      "name": "info.public.student.Envelope"
    },
    "payload": {
      "before": { "id": "5         ", "name": null },
      "after": null,
      "source": {
        "version": "1.0.3.Final",
        "connector": "postgresql",
        "name": "info",
        "ts_ms": 1583921079909,
        "snapshot": "false",
        "db": "test",
        "schema": "public",
        "table": "student",
        "txId": 1762,
        "lsn": 23399936,
        "xmin": null
      },
      "op": "d",
      "ts_ms": 1583921079912
    }
  }

The most useful information is in the payload:

  {
    "payload": {
      "before": {
        "id": "5         ",
        "name": null
      },
      "after": {
        "id": "5         ",
        "name": "ethan     "
      },
      "source": {
        "version": "1.0.3.Final",
        "connector": "postgresql",
        "name": "info",
        "ts_ms": 1583920898322,
        "snapshot": "false",
        "db": "test",
        "schema": "public",
        "table": "student",
        "txId": 1761,
        "lsn": 23398864,
        "xmin": null
      },
      "op": "u",
      "ts_ms": 1583920898326
    }
  }
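When consuming these events downstream, one typically keeps only op, before, and after from the payload. A minimal Python sketch of that reduction (the helper and the readable-name mapping are mine; the single-letter op codes c/u/d are Debezium's):

```python
import json

# A trimmed Debezium change event as received from the Kafka topic (payload only).
event_json = """
{
  "payload": {
    "before": {"id": "5         ", "name": null},
    "after":  {"id": "5         ", "name": "ethan     "},
    "op": "u",
    "ts_ms": 1583920898326
  }
}
"""

# Debezium op codes mapped to readable names.
OPS = {"c": "insert", "u": "update", "d": "delete"}

def summarize(event):
    """Reduce a Debezium envelope to the fields most consumers need."""
    p = event["payload"]
    return {"op": OPS.get(p["op"], p["op"]), "before": p["before"], "after": p["after"]}

print(summarize(json.loads(event_json))["op"])  # update
```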

Distributed mode test

  [monkey@hadoop102 kafka]$ bin/connect-distributed.sh config/connect-distributed.properties

The result is the same as in standalone mode.

『2021.09.15』Syncing PostgreSQL Data to Kafka with Debezium - Figure 2