Hướng dẫn đồng bộ dữ liệu Mysql to Kafka sử dụng Debezium với N Tables tốc độ REALTIME

Nội dung bài viết

Video học lập trình mỗi ngày

Trong hệ thống hiện đại, việc đồng bộ dữ liệu giữa các thành phần khác nhau là vô cùng quan trọng để đảm bảo tính nhất quán và hiệu suất của hệ thống. Một trong những trường hợp phổ biến là việc đồng bộ dữ liệu từ cơ sở dữ liệu MySQL sang Kafka - một hệ thống xử lý dòng dữ liệu phân tán mạnh mẽ. Kafka không chỉ giúp lưu trữ và xử lý dữ liệu theo thời gian thực mà còn hỗ trợ khả năng mở rộng và quản lý dữ liệu dễ dàng.

Debezium là gì?

Debezium là một công cụ mạnh mẽ cho phép chúng ta theo dõi các thay đổi trong cơ sở dữ liệu MySQL và phát những thay đổi đó dưới dạng các sự kiện vào Kafka. Điều này cho phép dữ liệu từ nhiều bảng trong MySQL có thể được đồng bộ một cách liên tục và thời gian thực với Kafka, từ đó các dịch vụ khác có thể tiêu thụ và xử lý dữ liệu này mà không cần phải truy vấn trực tiếp từ MySQL.

Trong bài viết này sẽ cung cấp các câu lệnh docker và hướng dẫn chi tiết cách thiết lập và sử dụng Debezium để đồng bộ dữ liệu từ MySQL sang Kafka với nhiều bảng (N Tables) và đạt được tốc độ xử lý thời gian thực (REALTIME). Bạn sẽ tìm hiểu cách cài đặt, cấu hình Debezium và cách xử lý dữ liệu thay đổi trong Kafka một cách hiệu quả.

Triển khai Debezium với MySQL và Kafka

Chuẩn bị file Docker-compose;

version: '3'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    container_name: demo_zookeeper
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    container_name: demo_kafka
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: demo_kafka-ui
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: demo_kafka:9092
    ports:
      - "9089:8080"
    links:
      - kafka
  mysql:
    image: mysql:latest
    container_name: demo_mysql_container
    environment:
      MYSQL_ROOT_PASSWORD: root1234
      MYSQL_DATABASE: shopdevgo
      MYSQL_USER: crm_user
      MYSQL_PASSWORD: root1234
    ports:
      - "30306:3306"
  redis:
    image: redis:7.0
    container_name: demo_redis_container
    ports:
      - "6379:6379"   
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    container_name: demo_connect
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

Tiếp theo chuản bị file register-mysql.json:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "topic.prefix": "dbserver1",
        "database.include.list": "inventory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

Lưu ý: Hãy tạo connect Mysql cho đúng authentication. Nếu chưa quen quy trình vui lòng xem lại Hướng dẫn toàn diện về đồng bộ dữ liệu Mysql to Kafka sử dụng Debezium với N Tables ở đó mọi câu lệnh sẽ giúp bạn trở nên mạnh mẽ..

Có thể bạn đã bị missing