運用 Kafka 實現 CDC 資料同步 – 以 MongoDB 為例
內容目錄
前言
關於 CDC 技術的簡介及 Kafka 如何用在 CDC,可以參考文章「淺談 Change Data Capture (CDC) 及重要性,它如何實現資料同步?」的介紹。
本文以透過 MongoDB Kafka Connector 實現 MongoDB 集群間的資料同步為例,開源的 Kafka Connect 外掛程式有 Debezium MongoDB Connector 和 MongoDB Kafka Connector 等,下表作簡單比較:
工具 | Debezium MongoDB Connector | MongoDB Kafka Connector |
---|---|---|
支援性 | 僅提供來源端連接器驅動程式 | 提供來源端及目的端連接器驅動程式 |
同步機制 | Oplog Mode (~v1.9) Change Streams Mode (v1.8+) | Change Streams Mode |
接收端 CDC 資料處理 | 需搭配 MongoDB Kafka Connector 提供的 Debezium CDC Handler | MongoDB CDC Handler |
作用 MongoDB 節點 | 僅支援單一節點 Replica Set/Sharded Cluster | 支援多節點 Replica Set/Sharded Cluster |
本文旨在提供讀者自行架設簡易測試環境的步驟作為參考,並用以驗證資料庫間的資料串流功能。以下內容將著重在說明如何建置 Kafka 服務及設定資料同步的工作。
你需要準備
- 一台已安裝 Linux 穩定發行版本的 VM 或實體機器,本文使用 CentOS 7。 建議配置:
- vCPUs: 4 以上
- RAM: 8 GB 以上
- Disk: 500 GB 以上
- Linux 基本操作知識
- 來源端及目的端 MongoDB 集群(含連線資訊、具
ChangeStreams
讀取及相關表格讀寫等權限可供 Connector 工作使用的連線帳密)- 需為 Replica Set 或是 Sharded Cluster
- 來源端 MongoDB 表格(含資料),本文假設為
mydb
database 下的mycoll
表格 - MongoDB 集群的連線工具,如 MongoDB Compass 或 terminal
環境架構
Kafka 架構示意圖
本文在同一台機器上安裝單節點 Kafka 及 Kafka Connect,並在 Kafka Connect 中加入 MongoDB Kafka Connector 外掛程式。
MongoDB 的 CDC 機制 (Change Streams)
Change Streams 為監聽 oplog 中的表格資料異動並產生異動事件 (Change Event) 資料。 當一個正在透過 Kafka 進行資料同步的來源端 MongoDB 表格發生異動,會發生以下事情:
- 來源端 MongoDB 資料表格的異動,產生新的異動事件。
- Source Connector 捕捉異動事件,並集結成批送往 Kafka,對 Kafka Broker 送寫入請求。
- Kafka Broker 將事件資料寫入指定 Topic。
- Sink Connector 透過 Kafka Broker 讀取指定 Topic 中的資料,並根據內容將新的文件寫入目的端 MongoDB 的指定表格。
Kafka 服務相關元件和名詞介紹
- ZooKeeper: Apache 的另一個專案,這裡可協調各 Kafka 服務節點間的狀態管理及工作分配。
- 在 Kafka 2.8.0 以後版本改為使用 KRaft,可脫離 ZooKeeper 運作。
- Broker: 相當於一個 Kafka 節點,負責處理對 Topic 寫入或讀取資料的請求。
- Topic: Kafka 中用來存放資料的容器。
- Partition: 指 Topic 分割。一個 Topic 可以分割成許多個 Partition,如此 Consumer 可以同時拉取不同 partition 中的資料,提升速率。
- Producer: 指將資料送往 Kafka Broker 寫入的資料製造者。
- Consumer / Consumer Groups: Topic 訊息的消費者。同一個 Topic 可能有很多個消費者,因此以群組形式存在。
- Leader / Followers: Kafka 通常以多節點集群的形式運作,集群中會有一個主要節點作為主節點,其他節點則為副節點,從主節點同步狀態及資料。
- Kafka Connect: 從 Kafka 分出來的服務,負責 Connector 的管理。
- plugin: 即供 Kafka Connect 使用的外掛驅動程式,需安裝該種類連接器的驅動才能夠建立使用該種連接器的 Connector。
- Connector: 與外部資料環境及 Kafka 對接,用來進行指定讀/寫資料工作的單位。
- Worker / Worker Task: 一個 Sink Connector 下依據 Topic partition 數目可以有一或多個 process 平行執行 Connector 設定中定義的工作。
操作步驟
下載 Apache Kafka
curl <https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgztar> -zvf kafka_2.13-3.3.1.tgzcd kafka_2.13-3.3.1
啟動 Kafka 服務
./bin/kafka-server-start.sh
Kafka 預設使用 port 9092。
啟動 Kafka Connect 服務
Kafka Connect 分有 standalone (預設使用 ./config/connect-standalone.properties
中的設定) 及 distributed (預設使用 ./config/connect-distributed.properties
中的設定)。差異在於 distributed 版本支援分散儲存 topic 中的資料,一般建議使用 distributed 版本
在啟動 Kafka Connect 之前,我們需要先安裝 MongoDB Kafka Connector 外掛,在啟動 Kafka Connect 服務時才會讀取到該外掛程式的存在。
開啟 connect-distributed.properties
,往下滾動至最底部,找到 plugin.path
一行,修改為您要放置外掛檔案的路徑位置。
vi ./config/connect-distributed.properties
下載外掛程式並移至 .properties
中指定 plugin.path
即可,這邊假設為 kafka_2.13-3.3.1/plugins
。
curl <https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.8.0/mongo-kafka-connect-1.8.0-all.jarmkdir> -p pluginscp mongo-kafka-connect-1.8.0-all.jar plugins
再來啟動 Kafka Connect,預設使用 port 8083
./bin/connect-distributed.sh
可以透過以下指令查看已安裝的 Kafka connector plugins,此時應該要看得到 MongoSourceConnector
及 MongoSinkConnector
。
curl <http://localhost:8083/connector-plugins>
新增 Source Connector
Kafka Connect 提供 REST API 管理執行中的 connector 工作,包含新增 connector、查看 connector 設定內容、變更 connector 設定內容、刪除 connector 等。
在新增 connector 時透過 API 傳入的設定為 json 格式,為方便可以將設定存為檔案再由指令呼叫設定檔內容。
以下提供 source connector 範例及常用設定表列說明。
mongo-src-connector.json
{
"name": "mongo-src-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:password@mymongosrc1:27017,mymongosrc2:27017,mymongosrc3:27017",
"database": "mydb",
"collection": "mycoll",
"topic.prefix": "src",
"copy.existing": true,
"key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
"tasks.max": "1" }
}
設定參數 | 說明 |
---|---|
name | 用以辨別此 Connector 工作的名稱 |
connector.class | Kafka connector plugin 的 class 名稱 |
connection.uri | MongoDB 集群連線字串密碼若包含特殊符號如@,需要轉換為 URL Encoding |
database | 指定監聽異動的 database 名稱未指定 database 及 collection 時監聽範圍為整個集群 |
collection | 指定監聽異動的 collection 名稱,此參數只支援單一 collection未指定 collection 時監聽範圍為整個指定 database |
topic.prefix | 指定目標 kafka topic 的前綴字串 |
topic.suffix | 指定目標 kafka topic 的後綴字串 |
copy.existing | 設定是否複製 collection 內既有資料,預設為 false |
task.max | 此 Connector 可以使用的 Worker Task 上限,預設為 1 |
我們需要先建立 Kafka Topic,才能讓 Connector 透過 Broker 寫入
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic src.mydb.mycoll --replication-factor 3 --partition 1
|replication-factor: 一共會有幾份相同的 topic,建議 production 設定為 3
確認剛才建立的 topic 資訊
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic src.mydb.mycoll
之後便可透過以下指令啟動 Source Connector
curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-src-connector.json
觀察 Kafka Topic 內容
我們先來確認 Source Connector 有正常啟動(無錯誤訊息且狀態為 RUNNING),以及表格資料是否如預期寫入 Kafka topic。
檢查 Connector 狀態
curl <http://localhost:8083/connectors/mongo-src-connector/status>
列出所有 Kafka topics
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
查看 Kafka topic 內資料
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic src.mydb.mycoll
新增 Sink Connector
mongo-sink-connector.json
{
"name": "mongo-sink-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017",
"topics": "src.mydb.mycoll",
"database": "sinkdb",
"collection": "sinkcoll",
"key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"tasks.max": "1" }
}
接著便可透過以下指令啟動 Sink Connector
curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-sink-connector.json
觀察結果
在確認 Sink Connector 正常啟動後,同步工作至此已經架設完成。您可以參考以下指令,在先前建立的 connector 工作持續運作的狀態下,連線到來源端 MongoDB 進行操作。透過另一個連線,在目的端 MongoDB 表格結果是否與來源端一致。
- 連線至 MongoDB
mongosh mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017
- 自來源端表格新增一筆資料
use mydb
db.mycoll.insertOne({ name: "TestField" });
Output:
{ acknowledged: true,
insertedId: ObjectId("63686963a37df5cdbf7fec9d") }
在目標資料庫的表格
sinkdb.sinkcoll
會看到新增了一筆連ObjectId
都與來源端一模一樣的資料
- 自來源端表格更新一筆資料
db.mycoll.updateOne({name: "TestField"}, {$set:{newField: true}});
Output:
{ acknowledged: true,
insertedId: null,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0 }
在目標資料庫的表格會看到這筆資料多了一個欄位
newField
- 自來源端表格刪除一筆資料
db.mycoll.deleteOne({name: "TestField", newField: true})
Output:
{ acknowledged: true, deletedCount: 1 }
在目標資料庫的表格會看到這筆資料也跟著被刪除
收尾
如果要結束測試,一樣可以透過 curl 呼叫 API 來停止 connector 工作。
curl -X DELETE <http://localhost:8083/connectors/mongo-src-connector>
curl -X DELETE <http://localhost:8083/connectors/mongo-sink-connector>
結語
本文中我們示範了如何自行架設 Kafka 環境,以及簡單驗證資料庫間利用 CDC 機制同步資料的功能。透過開源的 CDC 同步方案,我們可以更有效地進行巨量資料庫間資料的複製與轉移,並確保資料的一致性。
參考資料
本文章由歐立威技術顧問撰寫而成,轉載請註明出處,內容若有侵權請來信告知