Kafka: Hướng dẫn phỏng vấn dưới góc nhìn của một DEV chuẩn bị layoff.

Nội dung bài viết

Video học lập trình mỗi ngày

Nếu bạn vẫn còn dò đường, hãy đi chung con đường với tôi được không: Trên con đường kỹ sư phần mềm

Notes: Video về các thực hành MQ với Backend

Gần đây, hiện tại và sắp tới sau khi kỳ tết 2025 thì chuần bị là mùa cao điểm của việc nhảy việc, và nhiều anh em đã gửi các câu hỏi or đã hỏi tôi về các câu hỏi của các stask ví dụ như phỏng vấn của NVIDIA.

Tôi đã sắp xếp tất cả các câu hỏi và câu trả lời thực tế mà tôi đã đọc và trong các cuộc phỏng vấn của mình trước đó, và tôi cũng không ngoại lệ cho cuộc phỏng vấn sắp tới. Vì vậy những gì tôi ôn lại bạn cũng nên ôn lại.

Hãy đi từ đơn giản đến phức tạp và hãy hiểu công cụ đó là gì? Vì sao lại cần nó và sau đó mới tìm hiểu cách sử dụng nó. Hãy nhớ là vậy..

Tôi bổ sung thêm code cho anh em go và java dễ hình dung, tốt hơn hết xem hai video JAVA 12, và GO 27. Chiêm nghiệm thêm phần đồng bộ dữ liệu cao cấp sử dụng kafka

1. Message Queue là gì?

Trả lời chung chung:

Message Queue (phần mềm trung gian nhắn tin) là một hệ thống phần mềm hỗ trợ, dựa trên công nghệ hàng đợi (queue) và truyền tin nhắn (message passing), cung cấp khả năng truyền tin nhắn đồng bộ hoặc không đồng bộ, đáng tin cậy giữa các ứng dụng trong môi trường mạng.

Nó tận dụng cơ chế truyền tin nhắn hiệu quả và tin cậy để trao đổi dữ liệu độc lập với nền tảng (platform-independent) và tích hợp các hệ thống phân tán dựa trên giao tiếp dữ liệu. Bằng cách cung cấp mô hình truyền tin nhắn và xếp hàng tin nhắn, nó có thể mở rộng giao tiếp giữa các tiến trình trong môi trường phân tán.

Góc nhìn của Anh CAO CHÍ LUỸ (Trong phim 'Upstream'):

  • MQ giúp giải quyết các vấn đề về kết nối giữa các hệ thống, giảm sự phụ thuộc lẫn nhau (loose coupling), tăng khả năng mở rộng (scalability) và khả năng chịu lỗi (fault tolerance) của hệ thống.

  • Một số MQ phổ biến khác ngoài Kafka: RabbitMQ, ActiveMQ, Redis (Pub/Sub),... Mỗi loại có ưu nhược điểm riêng, phù hợp với các bài toán khác nhau.

  • Việc lựa chọn MQ nào phụ thuộc vào yêu cầu cụ thể của hệ thống: lưu lượng tin nhắn, độ trễ, độ tin cậy, thứ tự tin nhắn,.

2. Kafka là gì? Nó có tác dụng gì?

Kafka là một nền tảng xử lý luồng (streaming platform) phân tán, được sử dụng rộng rãi nhờ các đặc tính như:

Throughput cao (High-throughput): Xử lý lượng lớn dữ liệu một cách nhanh chóng.

Khả năng lưu trữ lâu dài (Persistence): Lưu trữ tin nhắn trên đĩa, đảm bảo dữ liệu không bị mất.

Khả năng mở rộng theo chiều ngang (Horizontal scalability): Dễ dàng mở rộng bằng cách thêm các máy chủ (broker) mới.

Hỗ trợ xử lý luồng dữ liệu (Stream data processing): Cung cấp các công cụ để xử lý dữ liệu theo thời gian thực.

Vai trò chính của Kafka thể hiện qua ba điểm:

Hệ thống nhắn tin (Messaging system): Giống như các Message Queue truyền thống, Kafka cung cấp các tính năng như: decoupling hệ thống, lưu trữ dự phòng (redundancy), giảm tải lưu lượng (traffic shaping), bộ đệm (buffering), giao tiếp không đồng bộ (asynchronous communication), khả năng mở rộng (scalability) và khả năng phục hồi (recoverability). Ngoài ra, Kafka còn đảm bảo thứ tự tin nhắn và khả năng đọc lại tin nhắn (replay), điều mà hầu hết các hệ thống nhắn tin khác khó thực hiện được.

Hệ thống lưu trữ (Storage system): Kafka lưu trữ tin nhắn trên đĩa, giúp giảm thiểu rủi ro mất dữ liệu so với các hệ thống dựa trên bộ nhớ. Nhờ vào cơ chế lưu trữ lâu dài (persistence) và sao chép dữ liệu (replication), Kafka có thể được sử dụng như một hệ thống lưu trữ dài hạn, bằng cách thiết lập chính sách lưu giữ dữ liệu là "vĩnh viễn" hoặc kích hoạt tính năng nén log (log compaction) cho chủ đề.

Nền tảng xử lý luồng (Stream processing platform): Kafka cung cấp nguồn dữ liệu tin cậy cho các framework xử lý luồng phổ biến. Nó cũng cung cấp một framework xử lý luồng hoàn chỉnh, bao gồm các hoạt động khác nhau như windowing, joining, transformation và aggregation.

Nếu thêm được nữa thì cố gắng thêm sau nữa anh em:

  • Kafka thường được sử dụng trong các hệ thống xử lý dữ liệu lớn (big data), phân tích thời gian thực (real-time analytics), thu thập log, giám sát hệ thống (monitoring),...

  • Kafka Streams là một thư viện mạnh mẽ, nhưng cũng có thể kết hợp Kafka với các framework xử lý luồng khác như Apache Spark, Apache Flink,...

Tôi bổ sung thêm code cho anh em go và java dễ hình dung, tốt hơn hết xem hai video JAVA 12, và GO 27. Chiêm nghiệm thêm phần đồng bộ dữ liệu cao cấp sử dụng kafka

Với java

// with Producer
Properties props = new Properties();
props.put("shopdev.server", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();

// with Consumer
Properties props = new Properties();
props.put("shopdev.server", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

Với Go:

// Producer
import "github.com/segmentio/kafka-go"

writer := &kafka.Writer{
    Addr:     kafka.TCP("localhost:9092"),
    Topic:    "my-topic",
    Balancer: &kafka.LeastBytes{},
}

err := writer.WriteMessages(context.Background(),
    kafka.Message{
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
)
if err != nil {
    log.Fatal("failed to write messages:", err)
}

if err := writer.Close(); err != nil {
    log.Fatal("failed to close writer:", err)
}

// Consumer
reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    GroupID:   "my-group",
    Topic:     "my-topic",
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := reader.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := reader.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

3. Kiến trúc của Kafka như thế nào?

Một kiến trúc Kafka điển hình bao gồm:

Producer: Gửi tin nhắn đến Broker.

Consumer: Nhận tin nhắn từ Broker.

Zookeeper cluster: Quản lý và điều phối các Broker (từ phiên bản 2.8.0, Zookeeper có thể được thay thế bằng KRaft để tự quản lý cluster).

Các khái niệm cơ bản trong Kafka:

Producer: Gửi tin nhắn đến Broker.

Consumer: Nhận tin nhắn từ Broker.

Consumer Group: Nhóm các Consumer. Mỗi Consumer trong nhóm chịu trách nhiệm tiêu thụ dữ liệu từ các partition khác nhau. Một partition chỉ có thể được tiêu thụ bởi một Consumer trong một nhóm. Các Consumer Group hoạt động độc lập với nhau. Tất cả các Consumer đều thuộc về một Consumer Group nhất định (nhóm Consumer là một subscriber logic).

Broker: Một nút (node) Kafka hoặc một instance Kafka. Nếu chỉ có một instance Kafka được triển khai trên một máy chủ, thì Broker cũng có thể được coi là một máy chủ Kafka.

Topic: Một khái niệm logic, chứa nhiều Partition. Các tin nhắn trong các Partition khác nhau của cùng một Topic là không giống nhau.

Partition: Để tăng khả năng mở rộng, một Topic lớn có thể được phân phối trên nhiều Broker. Một Topic có thể được chia thành nhiều Partition, mỗi Partition là một hàng đợi có thứ tự.

Replica: Bản sao. Các bản sao khác nhau của cùng một Partition lưu trữ cùng một tin nhắn. Để đảm bảo rằng dữ liệu Partition trên một node không bị mất khi node đó gặp sự cố và Kafka vẫn có thể tiếp tục hoạt động, Kafka cung cấp cơ chế sao chép. Mỗi Partition của một Topic có một số bản sao, một Leader và một số Follower.

Leader: "Bản sao chính" của một Partition. Producer và Consumer chỉ tương tác với Leader.

Follower: "Bản sao phụ" của một Partition. Đồng bộ dữ liệu từ Leader theo thời gian thực, duy trì sự đồng bộ dữ liệu với Leader. Khi Leader gặp sự cố, một Follower mới sẽ được bầu làm Leader để cung cấp dịch vụ.

Cố gắng thêm nếu có thể:

  • Hiểu rõ kiến trúc của Kafka giúp tối ưu hiệu suất và cấu hình hệ thống phù hợp với nhu cầu.

  • Việc lựa chọn số lượng partitions, replicas, cách thức phân phối dữ liệu,... ảnh hưởng rất lớn đến hiệu năng và độ tin cậy của hệ thống.

  • KRaft (Kafka Raft) là chế độ Metadata mới cho Kafka, loại bỏ sự phụ thuộc vào ZooKeeper, giúp đơn giản hóa việc triển khai và vận hành Kafka.

4. Replica của Kafka được quản lý như thế nào?

AR (Assigned Replicas): Tất cả các Replica của một Partition.

ISR (In-Sync Replicas): Tất cả các Replica đồng bộ với Leader (bao gồm cả Leader).

OSR (Out-of-Sync Replicas): Các Replica đồng bộ chậm hơn so với Leader.

Leader duy trì và theo dõi trạng thái độ trễ của tất cả các Follower trong ISR. Khi một Follower bị tụt lại quá xa, nó sẽ bị chuyển sang OSR. Khi một Follower bắt kịp tiến độ của Leader, nó sẽ được chuyển lại vào ISR.

Mặc định, chỉ các Replica trong ISR mới đủ điều kiện để trở thành Leader.

Cố gắng thêm nếu anh em có thể:

  • Chỉ các replicas trong ISR mới đủ điều kiện để được bầu chọn làm Leader mới nếu Leader hiện tại gặp sự cố.

  • Cấu hình replica.lag.time.max.ms và min.insync.replicas ảnh hưởng đến tính nhất quán và độ sẵn sàng của hệ thống.

  • Việc theo dõi và giám sát các metrics liên quan đến replication (như under-replicated partitions, ISR shrink/expansion rate,...) rất quan trọng để đảm bảo sức khỏe của hệ thống Kafka.

5. Làm thế nào để xác định tin nhắn nào có thể đọc được?

Partition tương đương với một tệp log. Trước tiên, hãy xem qua một số khái niệm:

Ví dụ (Hình 3):

Giả sử có 7 tin nhắn, với offset (độ lệch tin nhắn) từ 0 đến 6.

0 đại diện cho sự bắt đầu của tệp log.

HW (High Watermark) là 4. Offset từ 0 đến 3 đại diện cho phạm vi có thể tiêu thụ của tệp log. Consumer chỉ có thể tiêu thụ bốn tin nhắn này.

LEO (Log End Offset) đại diện cho offset của tin nhắn sắp được ghi vào.

Mỗi Replica trong ISR của Partition duy trì LEO của riêng mình. LEO nhỏ nhất trong ISR là HW của Partition.

Ví dụ (Hình 4):

Ba Replica của Partition đều nằm trong ISR. LEO nhỏ nhất là 3, vì vậy HW của Partition là 3. Do đó, Partition hiện tại chỉ có thể tiêu thụ ba tin nhắn từ 0 đến 2.

Cố gắng bổ sung thêm nha anh em:

  • Hiểu rõ cách thức hoạt động của HW và LEO giúp đảm bảo tính nhất quán dữ liệu và tránh việc đọc dữ liệu chưa được replicated.

  • Consumer offset được lưu trữ trong một topic đặc biệt là __consumer_offsets.

  • Các cơ chế commit offset (auto commit, manual commit) ảnh hưởng đến việc xử lý tin nhắn "at least once" hay "exactly once".

  • Việc giám sát consumer lag (khoảng cách giữa offset hiện tại của consumer và HW) rất quan trọng để đảm bảo consumer không bị tụt hậu quá xa.

  • Consumer group đóng vai trò quan trọng trong việc quản lý offset.

Ví dụ sử dụng ngôn ngữ Go và Java:

Go:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

const (
    topic         = "my-topic"
    brokerAddress = "localhost:9092"
)

func main() {
    // Tạo một context có thể bị hủy
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Thiết lập lắng nghe tín hiệu ngắt
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, os.Interrupt)

    // Tạo producer
    producer := newKafkaWriter(brokerAddress, topic)
    defer producer.Close()

    // Tạo consumer group
    groupID := "my-group"
    reader := newKafkaReader(brokerAddress, topic, groupID)
    defer reader.Close()

    // Bắt đầu goroutine cho producer
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        produceMessages(ctx, producer)
    }()

    // Bắt đầu goroutine cho consumer
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumeMessages(ctx, reader)
    }()

    // Chờ tín hiệu ngắt hoặc context bị hủy
    select {
    case <-sigchan:
        log.Println("Nhận được tín hiệu ngắt, đang dừng...")
        cancel()
    case <-ctx.Done():
        log.Println("Context bị hủy, đang dừng...")
    }

    // Chờ các goroutine hoàn thành
    wg.Wait()
    log.Println("Đã dừng.")
}

func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
    return &kafka.Writer{
        Addr:     kafka.TCP(kafkaURL),
        Topic:    topic,
        Balancer: &kafka.LeastBytes{},
    }
}

func newKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{kafkaURL},
        GroupID:  groupID,
        Topic:    topic,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })
}

func produceMessages(ctx context.Context, w *kafka.Writer) {
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            return
        default:
            msg := kafka.Message{
                Key:   []byte(fmt.Sprintf("Key-%d", i)),
                Value: []byte(fmt.Sprintf("Value-%d", i)),
            }
            err := w.WriteMessages(ctx, msg)
            if err != nil {
                log.Printf("Lỗi khi ghi tin nhắn: %v", err)
            } else {
                log.Printf("Đã gửi tin nhắn: %s", msg.Value)
            }
            time.Sleep(time.Second)
        }
    }
}

func consumeMessages(ctx context.Context, r *kafka.Reader) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            m, err := r.ReadMessage(ctx)
            if err != nil {
                log.Printf("Lỗi khi đọc tin nhắn: %v", err)
                continue
            }
            fmt.Printf("Đã nhận tin nhắn tại offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
            // Ở đây bạn có thể xử lý tin nhắn
        }
    }
}

Java:


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaExample {

    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws InterruptedException {
        // Tạo producer
        Producer<String, String> producer = createProducer();

        // Tạo consumer
        Consumer<String, String> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Luồng cho producer
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Key-" + i, "Value-" + i);
                try {
                    producer.send(record).get();
                    System.out.println("Đã gửi tin nhắn: " + record.value());
                    Thread.sleep(1000);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            producer.close();
        });

        // Luồng cho consumer
        Thread consumerThread = new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Đã nhận tin nhắn: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // Xử lý tin nhắn ở đây
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        // consumerThread.join(); // Consumer thread sẽ chạy vô hạn, hãy dừng nó bằng cách khác (ví dụ: Ctrl+C)
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    private static Consumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Đọc từ đầu topic nếu chưa có offset
        return new KafkaConsumer<>(props);
    }
}

Còn 20 câu nữa, hãy chờ tôi ở Member...

Có thể bạn đã bị missing