運用 Kafka 實現 CDC 資料同步 – 以 MongoDB 為例

內容目錄
Toggle前言
關於 CDC 技術的簡介及 Kafka 如何用在 CDC,可以參考文章「淺談 Change Data Capture (CDC) 及重要性,它如何實現資料同步?」的介紹。
本文以透過 MongoDB Kafka Connector 實現 MongoDB 集群間的資料同步為例,開源的 Kafka Connect 外掛程式有 Debezium MongoDB Connector 和 MongoDB Kafka Connector 等,下表作簡單比較:
工具 | Debezium MongoDB Connector | MongoDB Kafka Connector |
---|---|---|
支援性 | 僅提供來源端連接器驅動程式 | 提供來源端及目的端連接器驅動程式 |
同步機制 | Oplog Mode (~v1.9) Change Streams Mode (v1.8+) | Change Streams Mode |
接收端 CDC 資料處理 | 需搭配 MongoDB Kafka Connector 提供的 Debezium CDC Handler | MongoDB CDC Handler |
作用 MongoDB 節點 | 僅支援單一節點 Replica Set/Sharded Cluster | 支援多節點 Replica Set/Sharded Cluster |
本文旨在提供讀者自行架設簡易測試環境的步驟作為參考,並用以驗證資料庫間的資料串流功能。以下內容將著重在說明如何建置 Kafka 服務及設定資料同步的工作。
你需要準備
-
- 一台已安裝 Linux 穩定發行版本的 VM 或實體機器,本文使用 CentOS 7。 建議配置:
-
- vCPUs: 4 以上
-
- RAM: 8 GB 以上
-
- Disk: 500 GB 以上
-
- 一台已安裝 Linux 穩定發行版本的 VM 或實體機器,本文使用 CentOS 7。 建議配置:
-
- Linux 基本操作知識
-
- 來源端及目的端 MongoDB 集群(含連線資訊、具
ChangeStreams
讀取及相關表格讀寫等權限可供 Connector 工作使用的連線帳密)-
- 需為 Replica Set 或是 Sharded Cluster
-
- 來源端及目的端 MongoDB 集群(含連線資訊、具
-
- 來源端 MongoDB 表格(含資料),本文假設為
mydb
database 下的mycoll
表格
- 來源端 MongoDB 表格(含資料),本文假設為
-
- MongoDB 集群的連線工具,如 MongoDB Compass 或 terminal
環境架構

Kafka 架構示意圖
本文在同一台機器上安裝單節點 Kafka 及 Kafka Connect,並在 Kafka Connect 中加入 MongoDB Kafka Connector 外掛程式。
MongoDB 的 CDC 機制 (Change Streams)
MongoDB 的 Change Streams 功能就像資料庫的實時脈搏,它可以捕捉資料庫中發生的任何變化,並將這些變化傳遞給其他系統。
Change Streams 如何運作?
Change Streams 會監控 MongoDB 的操作日誌 (oplog),並將資料的異動 (例如新增、修改、刪除) 轉換成異動事件 (Change Event)。
當 MongoDB 資料庫發生變化時,Change Streams 會捕捉這些變化,並將其傳遞給 Kafka。
更具體地說,整個流程如下:
- 資料庫異動: 當來源端的 MongoDB 資料表格發生異動時,例如新增、修改或刪除文件,MongoDB 會產生新的異動事件,並記錄在 oplog 中。
- 捕捉異動事件: Source Connector 會持續監控 oplog,捕捉這些異動事件,並將其集結成批次。
- 發送至 Kafka: Source Connector 會將批次的異動事件發送到 Kafka Broker,並寫入指定的 Kafka Topic。
- 同步至目標系統: Sink Connector 會從 Kafka Broker 讀取指定 Topic 中的資料,並根據異動事件的內容,將新的文件寫入目的端 MongoDB 的指定表格,或執行其他同步操作。
Kafka 服務相關元件和名詞介紹
在 Kafka 的訊息旅程中,許多幕後推手扮演著重要的角色,確保訊息能順利地從生產者傳遞到消費者。以下介紹 Kafka 服務中幾個關鍵的元件和名詞:
協調者:
- ZooKeeper: 如同一位經驗豐富的指揮家,ZooKeeper 負責協調 Kafka 各服務節點的狀態管理和工作分配,確保整個系統的穩定運行。在 Kafka 2.8.0 以後的版本,Kafka 引入了 KRaft 模式,可以脫離 ZooKeeper 運作,更加輕量化和高效。
訊息的歸宿:
- Broker: Kafka 的節點,如同一個個郵局,負責處理對 Topic 寫入或讀取資料的請求。
- Topic: 存放資料的容器,如同郵局中的郵箱,用於存放不同類型的訊息。
- Partition: Topic 的分割,如同將郵箱分成多個隔層,讓 Consumer 可以同時拉取不同 Partition 中的資料,提升處理效率。
訊息的生產者和消費者:
- Producer: 將資料送往 Kafka Broker 寫入的資料製造者,如同寄件人,將訊息投遞到郵箱。
- Consumer / Consumer Groups: Topic 訊息的消費者,如同收件人,從郵箱中取出訊息。同一個 Topic 可能有很多個消費者,因此以群組形式存在,確保每個訊息都能被消費。
訊息旅程的可靠保障:
- Leader / Followers: Kafka 通常以多節點集群的形式運作,集群中會有一個主要節點作為主節點 (Leader),其他節點則為副節點 (Followers),從主節點同步狀態及資料,確保訊息的可靠性和容錯能力。
資料同步的橋樑:
- Kafka Connect: 負責 Connector 的管理,如同一位聯絡員,負責協調資料的同步工作。
- plugin: 供 Kafka Connect 使用的外掛驅動程式,如同不同的轉接頭,讓 Kafka Connect 可以連接到不同的資料庫或系統。
- Connector: 與外部資料環境及 Kafka 對接,用來進行指定讀/寫資料工作的單位,如同一位搬運工,負責將資料從來源系統搬運到 Kafka,或從 Kafka 搬運到目標系統。
- Worker / Worker Task: 一個 Sink Connector 下依據 Topic partition 數目可以有一或多個 process 平行執行 Connector 設定中定義的工作,如同多位搬運工同時工作,提升資料同步的效率。
操作步驟
下載 Apache Kafka
curl <https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgztar> -zvf kafka_2.13-3.3.1.tgzcd kafka_2.13-3.3.1
啟動 Kafka 服務
./bin/kafka-server-start.sh
Kafka 預設使用 port 9092。
啟動 Kafka Connect 服務
Kafka Connect 分有 standalone (預設使用 ./config/connect-standalone.properties
中的設定) 及 distributed (預設使用 ./config/connect-distributed.properties
中的設定)。差異在於 distributed 版本支援分散儲存 topic 中的資料,一般建議使用 distributed 版本
在啟動 Kafka Connect 之前,我們需要先安裝 MongoDB Kafka Connector 外掛,在啟動 Kafka Connect 服務時才會讀取到該外掛程式的存在。
開啟 connect-distributed.properties
,往下滾動至最底部,找到 plugin.path
一行,修改為您要放置外掛檔案的路徑位置。
vi ./config/connect-distributed.properties
下載外掛程式並移至 .properties
中指定 plugin.path
即可,這邊假設為 kafka_2.13-3.3.1/plugins
。
curl <https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.8.0/mongo-kafka-connect-1.8.0-all.jarmkdir> -p pluginscp mongo-kafka-connect-1.8.0-all.jar plugins
再來啟動 Kafka Connect,預設使用 port 8083
./bin/connect-distributed.sh
可以透過以下指令查看已安裝的 Kafka connector plugins,此時應該要看得到 MongoSourceConnector
及 MongoSinkConnector
。
curl <http://localhost:8083/connector-plugins>
新增 Source Connector
Kafka Connect 提供 REST API 管理執行中的 connector 工作,包含新增 connector、查看 connector 設定內容、變更 connector 設定內容、刪除 connector 等。
在新增 connector 時透過 API 傳入的設定為 json 格式,為方便可以將設定存為檔案再由指令呼叫設定檔內容。
以下提供 source connector 範例及常用設定表列說明。
mongo-src-connector.json
{
"name": "mongo-src-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:password@mymongosrc1:27017,mymongosrc2:27017,mymongosrc3:27017",
"database": "mydb",
"collection": "mycoll",
"topic.prefix": "src",
"copy.existing": true,
"key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
"tasks.max": "1" }
}
設定參數 | 說明 |
---|---|
name | 用以辨別此 Connector 工作的名稱 |
connector.class | Kafka connector plugin 的 class 名稱 |
connection.uri | MongoDB 集群連線字串密碼若包含特殊符號如@,需要轉換為 URL Encoding |
database | 指定監聽異動的 database 名稱未指定 database 及 collection 時監聽範圍為整個集群 |
collection | 指定監聽異動的 collection 名稱,此參數只支援單一 collection未指定 collection 時監聽範圍為整個指定 database |
topic.prefix | 指定目標 kafka topic 的前綴字串 |
topic.suffix | 指定目標 kafka topic 的後綴字串 |
copy.existing | 設定是否複製 collection 內既有資料,預設為 false |
task.max | 此 Connector 可以使用的 Worker Task 上限,預設為 1 |
我們需要先建立 Kafka Topic,才能讓 Connector 透過 Broker 寫入
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic src.mydb.mycoll --replication-factor 3 --partition 1
|replication-factor: 一共會有幾份相同的 topic,建議 production 設定為 3
確認剛才建立的 topic 資訊
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic src.mydb.mycoll
之後便可透過以下指令啟動 Source Connector
curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-src-connector.json
觀察 Kafka Topic 內容
我們先來確認 Source Connector 有正常啟動(無錯誤訊息且狀態為 RUNNING),以及表格資料是否如預期寫入 Kafka topic。
檢查 Connector 狀態
curl <http://localhost:8083/connectors/mongo-src-connector/status>
列出所有 Kafka topics
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
查看 Kafka topic 內資料
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic src.mydb.mycoll
新增 Sink Connector
mongo-sink-connector.json
{
"name": "mongo-sink-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017",
"topics": "src.mydb.mycoll",
"database": "sinkdb",
"collection": "sinkcoll",
"key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"tasks.max": "1" }
}
接著便可透過以下指令啟動 Sink Connector
curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-sink-connector.json
觀察結果
在確認 Sink Connector 正常啟動後,同步工作至此已經架設完成。您可以參考以下指令,在先前建立的 connector 工作持續運作的狀態下,連線到來源端 MongoDB 進行操作。透過另一個連線,在目的端 MongoDB 表格結果是否與來源端一致。
-
- 連線至 MongoDB
mongosh mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017
-
- 自來源端表格新增一筆資料
use mydb
db.mycoll.insertOne({ name: "TestField" });
Output:
{ acknowledged: true,
insertedId: ObjectId("63686963a37df5cdbf7fec9d") }
在目標資料庫的表格
sinkdb.sinkcoll
會看到新增了一筆連ObjectId
都與來源端一模一樣的資料
-
- 自來源端表格更新一筆資料
db.mycoll.updateOne({name: "TestField"}, {$set:{newField: true}});
Output:
{ acknowledged: true,
insertedId: null,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0 }
在目標資料庫的表格會看到這筆資料多了一個欄位
newField
-
- 自來源端表格刪除一筆資料
db.mycoll.deleteOne({name: "TestField", newField: true})
Output:
{ acknowledged: true, deletedCount: 1 }
在目標資料庫的表格會看到這筆資料也跟著被刪除
收尾
如果要結束測試,一樣可以透過 curl 呼叫 API 來停止 connector 工作。
curl -X DELETE <http://localhost:8083/connectors/mongo-src-connector>
curl -X DELETE <http://localhost:8083/connectors/mongo-sink-connector>
結語
本文中我們示範了如何自行架設 Kafka 環境,以及簡單驗證資料庫間利用 CDC 機制同步資料的功能。透過開源的 CDC 同步方案,我們可以更有效地進行巨量資料庫間資料的複製與轉移,並確保資料的一致性。