fbpx

運用 Kafka 實現 CDC 資料同步 – 以 MongoDB 為例

Tom 資料工程師 歐立威科技

前言

關於 CDC 技術的簡介及 Kafka 如何用在 CDC,可以參考文章「淺談 Change Data Capture (CDC) 及重要性,它如何實現資料同步?」的介紹。

本文以透過 MongoDB Kafka Connector 實現 MongoDB 集群間的資料同步為例,開源的 Kafka Connect 外掛程式有 Debezium MongoDB Connector 和 MongoDB Kafka Connector 等,下表作簡單比較:

工具Debezium MongoDB ConnectorMongoDB Kafka Connector
支援性僅提供來源端連接器驅動程式提供來源端及目的端連接器驅動程式
同步機制Oplog Mode (~v1.9) Change Streams Mode (v1.8+)Change Streams Mode
接收端 CDC 資料處理需搭配 MongoDB Kafka Connector 提供的 Debezium CDC HandlerMongoDB CDC Handler
作用 MongoDB 節點僅支援單一節點 Replica Set/Sharded Cluster支援多節點 Replica Set/Sharded Cluster

本文旨在提供讀者自行架設簡易測試環境的步驟作為參考,並用以驗證資料庫間的資料串流功能。以下內容將著重在說明如何建置 Kafka 服務及設定資料同步的工作。

你需要準備

  • 一台已安裝 Linux 穩定發行版本的 VM 或實體機器,本文使用 CentOS 7。 建議配置:
    • vCPUs: 4 以上
    • RAM: 8 GB 以上
    • Disk: 500 GB 以上
  • Linux 基本操作知識
  • 來源端及目的端 MongoDB 集群(含連線資訊、具 ChangeStreams 讀取及相關表格讀寫等權限可供 Connector 工作使用的連線帳密)
    • 需為 Replica Set 或是 Sharded Cluster
  • 來源端 MongoDB 表格(含資料),本文假設為 mydb database 下的 mycoll 表格
  • MongoDB 集群的連線工具,如 MongoDB Compass 或 terminal

環境架構

Kafka 架構示意圖

Kafka 架構示意圖

本文在同一台機器上安裝單節點 Kafka 及 Kafka Connect,並在 Kafka Connect 中加入 MongoDB Kafka Connector 外掛程式。

MongoDB 的 CDC 機制 (Change Streams)

Change Streams 為監聽 oplog 中的表格資料異動並產生異動事件 (Change Event) 資料。 當一個正在透過 Kafka 進行資料同步的來源端 MongoDB 表格發生異動,會發生以下事情:

  1. 來源端 MongoDB 資料表格的異動,產生新的異動事件。
  2. Source Connector 捕捉異動事件,並集結成批送往 Kafka,對 Kafka Broker 送寫入請求。
  3. Kafka Broker 將事件資料寫入指定 Topic。
  4. Sink Connector 透過 Kafka Broker 讀取指定 Topic 中的資料,並根據內容將新的文件寫入目的端 MongoDB 的指定表格。

Kafka 服務相關元件和名詞介紹

  • ZooKeeper: Apache 的另一個專案,這裡可協調各 Kafka 服務節點間的狀態管理及工作分配。
    • 在 Kafka 2.8.0 以後版本改為使用 KRaft,可脫離 ZooKeeper 運作。
  • Broker: 相當於一個 Kafka 節點,負責處理對 Topic 寫入或讀取資料的請求。
  • Topic: Kafka 中用來存放資料的容器。
  • Partition: 指 Topic 分割。一個 Topic 可以分割成許多個 Partition,如此 Consumer 可以同時拉取不同 partition 中的資料,提升速率。
  • Producer: 指將資料送往 Kafka Broker 寫入的資料製造者。
  • Consumer / Consumer Groups: Topic 訊息的消費者。同一個 Topic 可能有很多個消費者,因此以群組形式存在。
  • Leader / Followers: Kafka 通常以多節點集群的形式運作,集群中會有一個主要節點作為主節點,其他節點則為副節點,從主節點同步狀態及資料。
  • Kafka Connect: 從 Kafka 分出來的服務,負責 Connector 的管理。
  • plugin: 即供 Kafka Connect 使用的外掛驅動程式,需安裝該種類連接器的驅動才能夠建立使用該種連接器的 Connector。
  • Connector: 與外部資料環境及 Kafka 對接,用來進行指定讀/寫資料工作的單位。
  • Worker / Worker Task: 一個 Sink Connector 下依據 Topic partition 數目可以有一或多個 process 平行執行 Connector 設定中定義的工作。

操作步驟

下載 Apache Kafka

curl <https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgztar> -zvf kafka_2.13-3.3.1.tgzcd kafka_2.13-3.3.1

啟動 Kafka 服務

./bin/kafka-server-start.sh

Kafka 預設使用 port 9092。

啟動 Kafka Connect 服務

Kafka Connect 分有 standalone (預設使用 ./config/connect-standalone.properties 中的設定) 及 distributed (預設使用 ./config/connect-distributed.properties 中的設定)。差異在於 distributed 版本支援分散儲存 topic 中的資料,一般建議使用 distributed 版本

在啟動 Kafka Connect 之前,我們需要先安裝 MongoDB Kafka Connector 外掛,在啟動 Kafka Connect 服務時才會讀取到該外掛程式的存在。

開啟 connect-distributed.properties,往下滾動至最底部,找到 plugin.path 一行,修改為您要放置外掛檔案的路徑位置。

vi ./config/connect-distributed.properties

下載外掛程式並移至 .properties 中指定 plugin.path 即可,這邊假設為 kafka_2.13-3.3.1/plugins

curl <https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.8.0/mongo-kafka-connect-1.8.0-all.jarmkdir> -p pluginscp mongo-kafka-connect-1.8.0-all.jar plugins

再來啟動 Kafka Connect,預設使用 port 8083

./bin/connect-distributed.sh

可以透過以下指令查看已安裝的 Kafka connector plugins,此時應該要看得到 MongoSourceConnectorMongoSinkConnector

curl <http://localhost:8083/connector-plugins>

新增 Source Connector

Kafka Connect 提供 REST API 管理執行中的 connector 工作,包含新增 connector、查看 connector 設定內容、變更 connector 設定內容、刪除 connector 等。

在新增 connector 時透過 API 傳入的設定為 json 格式,為方便可以將設定存為檔案再由指令呼叫設定檔內容。

以下提供 source connector 範例及常用設定表列說明。

mongo-src-connector.json
{   
"name": "mongo-src-connector",    
"config": {        
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:password@mymongosrc1:27017,mymongosrc2:27017,mymongosrc3:27017", 
"database": "mydb", 
"collection": "mycoll",        
"topic.prefix": "src",        
"copy.existing": true,       
"key.converter": "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable": "false",       
"value.converter": "org.apache.kafka.connect.json.JsonConverter",        "value.converter.schemas.enable": "false",        
"tasks.max": "1"    }
}
設定參數說明
name用以辨別此 Connector 工作的名稱
connector.classKafka connector plugin 的 class 名稱
connection.uriMongoDB 集群連線字串密碼若包含特殊符號如@,需要轉換為 URL Encoding
database指定監聽異動的 database 名稱未指定 database 及 collection 時監聽範圍為整個集群
collection指定監聽異動的 collection 名稱,此參數只支援單一 collection未指定 collection 時監聽範圍為整個指定 database
topic.prefix指定目標 kafka topic 的前綴字串
topic.suffix指定目標 kafka topic 的後綴字串
copy.existing設定是否複製 collection 內既有資料,預設為 false
task.max此 Connector 可以使用的 Worker Task 上限,預設為 1

我們需要先建立 Kafka Topic,才能讓 Connector 透過 Broker 寫入

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic src.mydb.mycoll --replication-factor 3 --partition 1

replication-factor: 一共會有幾份相同的 topic,建議 production 設定為 3

確認剛才建立的 topic 資訊

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic src.mydb.mycoll

之後便可透過以下指令啟動 Source Connector

curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-src-connector.json

觀察 Kafka Topic 內容

我們先來確認 Source Connector 有正常啟動(無錯誤訊息且狀態為 RUNNING),以及表格資料是否如預期寫入 Kafka topic。

檢查 Connector 狀態

curl <http://localhost:8083/connectors/mongo-src-connector/status>

列出所有 Kafka topics

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查看 Kafka topic 內資料

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic src.mydb.mycoll

新增 Sink Connector

mongo-sink-connector.json
{   
"name": "mongo-sink-connector",   
"config": {        
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",        
"connection.uri": "mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017",
"topics": "src.mydb.mycoll",        
"database": "sinkdb",        
"collection": "sinkcoll",        
"key.converter": "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable": "false",        
"value.converter": "org.apache.kafka.connect.json.JsonConverter",        "value.converter.schemas.enable": "false",        
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",        
"tasks.max": "1"    }
}

接著便可透過以下指令啟動 Sink Connector

curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-sink-connector.json

觀察結果

在確認 Sink Connector 正常啟動後,同步工作至此已經架設完成。您可以參考以下指令,在先前建立的 connector 工作持續運作的狀態下,連線到來源端 MongoDB 進行操作。透過另一個連線,在目的端 MongoDB 表格結果是否與來源端一致。

  • 連線至 MongoDB
mongosh mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017
  • 自來源端表格新增一筆資料
use mydb
db.mycoll.insertOne({ name: "TestField" });

Output:

{ acknowledged: true,
  insertedId: ObjectId("63686963a37df5cdbf7fec9d") }

在目標資料庫的表格 sinkdb.sinkcoll 會看到新增了一筆連 ObjectId 都與來源端一模一樣的資料

  • 自來源端表格更新一筆資料
db.mycoll.updateOne({name: "TestField"}, {$set:{newField: true}});

Output:

{ acknowledged: true,
  insertedId: null,
  matchedCount: 1,
  modifiedCount: 1,
  upsertedCount: 0 }

在目標資料庫的表格會看到這筆資料多了一個欄位 newField

  • 自來源端表格刪除一筆資料
db.mycoll.deleteOne({name: "TestField", newField: true})

Output:

{ acknowledged: true, deletedCount: 1 }

在目標資料庫的表格會看到這筆資料也跟著被刪除

收尾

如果要結束測試,一樣可以透過 curl 呼叫 API 來停止 connector 工作。

curl -X DELETE <http://localhost:8083/connectors/mongo-src-connector>
curl -X DELETE <http://localhost:8083/connectors/mongo-sink-connector>

結語

本文中我們示範了如何自行架設 Kafka 環境,以及簡單驗證資料庫間利用 CDC 機制同步資料的功能。透過開源的 CDC 同步方案,我們可以更有效地進行巨量資料庫間資料的複製與轉移,並確保資料的一致性。

參考資料

  1. MongoDB Kafka Connector 官方文件

本文章由歐立威技術顧問撰寫而成,轉載請註明出處,內容若有侵權請來信告知


延伸閱讀 :

相關文章