fbpx

運用 Kafka 實現 CDC 資料同步 – 以 MongoDB 為例

Tom 資料工程師 歐立威科技

前言

關於 CDC 技術的簡介及 Kafka 如何用在 CDC,可以參考文章「淺談 Change Data Capture (CDC) 及重要性,它如何實現資料同步?」的介紹。

本文以透過 MongoDB Kafka Connector 實現 MongoDB 集群間的資料同步為例,開源的 Kafka Connect 外掛程式有 Debezium MongoDB Connector 和 MongoDB Kafka Connector 等,下表作簡單比較:

工具 Debezium MongoDB Connector MongoDB Kafka Connector
支援性 僅提供來源端連接器驅動程式 提供來源端及目的端連接器驅動程式
同步機制 Oplog Mode (~v1.9) Change Streams Mode (v1.8+) Change Streams Mode
接收端 CDC 資料處理 需搭配 MongoDB Kafka Connector 提供的 Debezium CDC Handler MongoDB CDC Handler
作用 MongoDB 節點 僅支援單一節點 Replica Set/Sharded Cluster 支援多節點 Replica Set/Sharded Cluster

本文旨在提供讀者自行架設簡易測試環境的步驟作為參考,並用以驗證資料庫間的資料串流功能。以下內容將著重在說明如何建置 Kafka 服務及設定資料同步的工作。 

你需要準備

      • 一台已安裝 Linux 穩定發行版本的 VM 或實體機器,本文使用 CentOS 7。 建議配置:
          • vCPUs: 4 以上

          • RAM: 8 GB 以上

          • Disk: 500 GB 以上

      • Linux 基本操作知識

      • 來源端及目的端 MongoDB 集群(含連線資訊、具 ChangeStreams 讀取及相關表格讀寫等權限可供 Connector 工作使用的連線帳密)
          • 需為 Replica Set 或是 Sharded Cluster

      • 來源端 MongoDB 表格(含資料),本文假設為 mydb database 下的 mycoll 表格

      • MongoDB 集群的連線工具,如 MongoDB Compass 或 terminal


    環境架構

    Kafka 架構示意圖

    Kafka 架構示意圖

    本文在同一台機器上安裝單節點 Kafka 及 Kafka Connect,並在 Kafka Connect 中加入 MongoDB Kafka Connector 外掛程式。

    MongoDB 的 CDC 機制 (Change Streams)

    MongoDB 的 Change Streams 功能就像資料庫的實時脈搏,它可以捕捉資料庫中發生的任何變化,並將這些變化傳遞給其他系統。

    Change Streams 如何運作?

    Change Streams 會監控 MongoDB 的操作日誌 (oplog),並將資料的異動 (例如新增、修改、刪除) 轉換成異動事件 (Change Event)。

    當 MongoDB 資料庫發生變化時,Change Streams 會捕捉這些變化,並將其傳遞給 Kafka。

    更具體地說,整個流程如下:

    1. 資料庫異動: 當來源端的 MongoDB 資料表格發生異動時,例如新增、修改或刪除文件,MongoDB 會產生新的異動事件,並記錄在 oplog 中。
    2. 捕捉異動事件: Source Connector 會持續監控 oplog,捕捉這些異動事件,並將其集結成批次。
    3. 發送至 Kafka: Source Connector 會將批次的異動事件發送到 Kafka Broker,並寫入指定的 Kafka Topic。
    4. 同步至目標系統: Sink Connector 會從 Kafka Broker 讀取指定 Topic 中的資料,並根據異動事件的內容,將新的文件寫入目的端 MongoDB 的指定表格,或執行其他同步操作。

    Kafka 服務相關元件和名詞介紹

    在 Kafka 的訊息旅程中,許多幕後推手扮演著重要的角色,確保訊息能順利地從生產者傳遞到消費者。以下介紹 Kafka 服務中幾個關鍵的元件和名詞:

    協調者:

    • ZooKeeper: 如同一位經驗豐富的指揮家,ZooKeeper 負責協調 Kafka 各服務節點的狀態管理和工作分配,確保整個系統的穩定運行。在 Kafka 2.8.0 以後的版本,Kafka 引入了 KRaft 模式,可以脫離 ZooKeeper 運作,更加輕量化和高效。

    訊息的歸宿:

    • Broker: Kafka 的節點,如同一個個郵局,負責處理對 Topic 寫入或讀取資料的請求。
    • Topic: 存放資料的容器,如同郵局中的郵箱,用於存放不同類型的訊息。
    • Partition: Topic 的分割,如同將郵箱分成多個隔層,讓 Consumer 可以同時拉取不同 Partition 中的資料,提升處理效率。

    訊息的生產者和消費者:

    • Producer: 將資料送往 Kafka Broker 寫入的資料製造者,如同寄件人,將訊息投遞到郵箱。
    • Consumer / Consumer Groups: Topic 訊息的消費者,如同收件人,從郵箱中取出訊息。同一個 Topic 可能有很多個消費者,因此以群組形式存在,確保每個訊息都能被消費。

    訊息旅程的可靠保障:

    • Leader / Followers: Kafka 通常以多節點集群的形式運作,集群中會有一個主要節點作為主節點 (Leader),其他節點則為副節點 (Followers),從主節點同步狀態及資料,確保訊息的可靠性和容錯能力。

    資料同步的橋樑:

    • Kafka Connect: 負責 Connector 的管理,如同一位聯絡員,負責協調資料的同步工作。
    • plugin: 供 Kafka Connect 使用的外掛驅動程式,如同不同的轉接頭,讓 Kafka Connect 可以連接到不同的資料庫或系統。
    • Connector: 與外部資料環境及 Kafka 對接,用來進行指定讀/寫資料工作的單位,如同一位搬運工,負責將資料從來源系統搬運到 Kafka,或從 Kafka 搬運到目標系統。
    • Worker / Worker Task: 一個 Sink Connector 下依據 Topic partition 數目可以有一或多個 process 平行執行 Connector 設定中定義的工作,如同多位搬運工同時工作,提升資料同步的效率。


    操作步驟

    下載 Apache Kafka

    curl <https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgztar> -zvf kafka_2.13-3.3.1.tgzcd kafka_2.13-3.3.1

    啟動 Kafka 服務

    ./bin/kafka-server-start.sh

    Kafka 預設使用 port 9092。

    啟動 Kafka Connect 服務

    Kafka Connect 分有 standalone (預設使用 ./config/connect-standalone.properties 中的設定) 及 distributed (預設使用 ./config/connect-distributed.properties 中的設定)。差異在於 distributed 版本支援分散儲存 topic 中的資料,一般建議使用 distributed 版本

    在啟動 Kafka Connect 之前,我們需要先安裝 MongoDB Kafka Connector 外掛,在啟動 Kafka Connect 服務時才會讀取到該外掛程式的存在。

    開啟 connect-distributed.properties,往下滾動至最底部,找到 plugin.path 一行,修改為您要放置外掛檔案的路徑位置。

    vi ./config/connect-distributed.properties

    下載外掛程式並移至 .properties 中指定 plugin.path 即可,這邊假設為 kafka_2.13-3.3.1/plugins

    curl <https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.8.0/mongo-kafka-connect-1.8.0-all.jarmkdir> -p pluginscp mongo-kafka-connect-1.8.0-all.jar plugins

    再來啟動 Kafka Connect,預設使用 port 8083

    ./bin/connect-distributed.sh

    可以透過以下指令查看已安裝的 Kafka connector plugins,此時應該要看得到 MongoSourceConnectorMongoSinkConnector

    curl <http://localhost:8083/connector-plugins>

    新增 Source Connector

    Kafka Connect 提供 REST API 管理執行中的 connector 工作,包含新增 connector、查看 connector 設定內容、變更 connector 設定內容、刪除 connector 等。

    在新增 connector 時透過 API 傳入的設定為 json 格式,為方便可以將設定存為檔案再由指令呼叫設定檔內容。

    以下提供 source connector 範例及常用設定表列說明。

    mongo-src-connector.json
    {   
    "name": "mongo-src-connector",    
    "config": {        
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://admin:password@mymongosrc1:27017,mymongosrc2:27017,mymongosrc3:27017", 
    "database": "mydb", 
    "collection": "mycoll",        
    "topic.prefix": "src",        
    "copy.existing": true,       
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable": "false",       
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",        "value.converter.schemas.enable": "false",        
    "tasks.max": "1"    }
    }

    設定參數 說明
    name 用以辨別此 Connector 工作的名稱
    connector.class Kafka connector plugin 的 class 名稱
    connection.uri MongoDB 集群連線字串密碼若包含特殊符號如@,需要轉換為 URL Encoding
    database 指定監聽異動的 database 名稱未指定 database 及 collection 時監聽範圍為整個集群
    collection 指定監聽異動的 collection 名稱,此參數只支援單一 collection未指定 collection 時監聽範圍為整個指定 database
    topic.prefix 指定目標 kafka topic 的前綴字串
    topic.suffix 指定目標 kafka topic 的後綴字串
    copy.existing 設定是否複製 collection 內既有資料,預設為 false
    task.max 此 Connector 可以使用的 Worker Task 上限,預設為 1

    我們需要先建立 Kafka Topic,才能讓 Connector 透過 Broker 寫入

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic src.mydb.mycoll --replication-factor 3 --partition 1

    replication-factor: 一共會有幾份相同的 topic,建議 production 設定為 3

    確認剛才建立的 topic 資訊

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic src.mydb.mycoll

    之後便可透過以下指令啟動 Source Connector

    curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-src-connector.json

    觀察 Kafka Topic 內容

    我們先來確認 Source Connector 有正常啟動(無錯誤訊息且狀態為 RUNNING),以及表格資料是否如預期寫入 Kafka topic。

    檢查 Connector 狀態

    curl <http://localhost:8083/connectors/mongo-src-connector/status>

    列出所有 Kafka topics

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

    查看 Kafka topic 內資料

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic src.mydb.mycoll

    新增 Sink Connector

    mongo-sink-connector.json
    {   
    "name": "mongo-sink-connector",   
    "config": {        
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",        
    "connection.uri": "mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017",
    "topics": "src.mydb.mycoll",        
    "database": "sinkdb",        
    "collection": "sinkcoll",        
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",        "key.converter.schemas.enable": "false",        
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",        "value.converter.schemas.enable": "false",        
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",        
    "tasks.max": "1"    }
    }

    接著便可透過以下指令啟動 Sink Connector

    curl -X POST <http://localhost:8083/connectors> -H "Accept:application/json" -H "Content-Type:application/json" -d @mongo-sink-connector.json

    觀察結果

    在確認 Sink Connector 正常啟動後,同步工作至此已經架設完成。您可以參考以下指令,在先前建立的 connector 工作持續運作的狀態下,連線到來源端 MongoDB 進行操作。透過另一個連線,在目的端 MongoDB 表格結果是否與來源端一致。

      • 連線至 MongoDB

    mongosh mongodb://admin:password@mymongosink1:27017,mymongosink2:27017,mymongosink3:27017

      • 自來源端表格新增一筆資料

    use mydb
    db.mycoll.insertOne({ name: "TestField" });

    Output:

    { acknowledged: true,
      insertedId: ObjectId("63686963a37df5cdbf7fec9d") }

    在目標資料庫的表格 sinkdb.sinkcoll 會看到新增了一筆連 ObjectId 都與來源端一模一樣的資料

      • 自來源端表格更新一筆資料

    db.mycoll.updateOne({name: "TestField"}, {$set:{newField: true}});

    Output:

    { acknowledged: true,
      insertedId: null,
      matchedCount: 1,
      modifiedCount: 1,
      upsertedCount: 0 }

    在目標資料庫的表格會看到這筆資料多了一個欄位 newField

      • 自來源端表格刪除一筆資料

    db.mycoll.deleteOne({name: "TestField", newField: true})

    Output:

    { acknowledged: true, deletedCount: 1 }

    在目標資料庫的表格會看到這筆資料也跟著被刪除

    收尾

    如果要結束測試,一樣可以透過 curl 呼叫 API 來停止 connector 工作。

    curl -X DELETE <http://localhost:8083/connectors/mongo-src-connector>
    curl -X DELETE <http://localhost:8083/connectors/mongo-sink-connector>

    結語

    本文中我們示範了如何自行架設 Kafka 環境,以及簡單驗證資料庫間利用 CDC 機制同步資料的功能。透過開源的 CDC 同步方案,我們可以更有效地進行巨量資料庫間資料的複製與轉移,並確保資料的一致性。

    參考資料

      1. MongoDB Kafka Connector 官方文件



    延伸閱讀 :

    Related Posts