Kafka detailed tutorial

Recommended [Kafka Tutorial] https://bigbird.blog.csdn.net/article/details/108770504
Recommended [RabbitMQ Tutorial] https://bigbird.blog.csdn.net/article/details/81436980
Recommended [Flink Tutorial] http://www.ifindbug.com/doc/id-56856/name-flink-tutorial-1-cluster-installation-and-deployment.html
Recommended [SpringBoot Tutorial] https://blog.csdn.net/hellozpc/article/details/107095951
Recommended [SpringCloud Tutorial] https://blog.csdn.net/hellozpc/article/details/83692496
Recommended [Mybatis Tutorial] https://blog.csdn.net/hellozpc/article/details/80878563
Recommended [SnowFlake Tutorial] https://blog.csdn.net/hellozpc/article/details/108248227
Recommended [Concurrency Rate Limiting Tutorial] http://www.ifindbug.com/doc/id-56860/name-guava-ratelimiter-current-limiter-2.html
Recommended [JVM Interview and Tuning Tutorial] https://bigbird.blog.csdn.net/article/details/113888604

For the complete tutorial, please subscribe to the column "rabbitmq/kafka combat tutorial": https://blog.csdn.net/zpcandzhj/category_10152842.html

kafka overview

Kafka was originally developed at LinkedIn as a distributed publish-subscribe messaging system written in Scala and Java, and was later donated to the Apache Software Foundation. Kafka is best known as a message queue (MQ), but it has in fact grown into a popular distributed stream processing platform. It offers high throughput and low latency, and big data processing systems such as Storm, Spark, and Flink integrate well with it. According to Wikipedia, Kafka's core is essentially a "massively scalable publish/subscribe message queue designed as a distributed transaction log". In general, Kafka plays three roles:

  • message system

    Kafka is similar to traditional message queues such as RabbitMQ, RocketMQ, and ActiveMQ, and supports the core messaging functions of peak shaving (smoothing traffic spikes), service decoupling, and asynchronous communication.

  • Stream Processing Platform

    Kafka not only integrates well with most stream computing frameworks, but also provides a complete stream processing library of its own, Kafka Streams. Kafka Streams offers features similar to those in Flink, such as windowing, aggregation, transformation, and joins (a minimal Kafka Streams sketch follows at the end of this overview).

  • Storage System

    Message queues usually persist messages to disk to prevent message loss and guarantee reliability. Kafka's message persistence and multi-replica mechanisms allow it to also be used as a general-purpose data storage system.

In one sentence: Kafka is a distributed message queue (Message Queue) based on the publish/subscribe model, which is mainly used in the field of real-time big data processing in the industry.
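
To make the stream-processing role concrete, here is the minimal, hedged Kafka Streams sketch referred to above: it reads from a hypothetical topic input-topic, upper-cases each value, and writes the result to output-topic. The topic names and the broker address are placeholders, and the kafka-streams dependency is required in addition to kafka-clients.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application.id also serves as the consumer group id of the Streams application
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.129:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // read from "input-topic", transform each value, write to "output-topic"
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(value -> value.toUpperCase()).to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}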

Kafka Architecture

As shown in the figure, the architecture of Kafka usually includes multiple Producers (producers), multiple Consumers (consumers), multiple Brokers (Kafka servers), and a zookeeper cluster.
[Figure: Kafka architecture]

Several roles in the architecture

  • Producer

    The message sender, the producer, is responsible for producing messages and sending them to the kafka server (broker).

  • Consumer

    The message receiver. A consumer client actively pulls messages from the Kafka server (broker), and the application then performs its business processing on them.

  • Broker

    A Kafka service instance, i.e. a Kafka server. Brokers accept connections from producer and consumer clients and can be regarded as the relay station for messages. Multiple brokers form a Kafka cluster.

  • zookeeper

    ZooKeeper is a distributed coordinator responsible for managing cluster metadata, controller election and other operations in a Kafka cluster.

Partitions and Topics

Topic and Partition are two core concepts in Kafka. In Kafka, messages are categorized by topic. A producer must send each message to a specified topic, i.e. every message sent to the Kafka cluster must name a topic; likewise, a consumer must specify which topic it consumes, i.e. the consumer subscribes to a topic and consumes messages from it.

Imagine that a topic corresponded to only one storage file in Kafka: in a massive-data scenario, the I/O of the machine holding that file would become the performance bottleneck of the topic. Partitioning solves this problem. In Kafka, a topic can be divided into multiple partitions, and the partitions are usually distributed across different machines. A particular partition belongs to only one topic. Kafka is often used to process data at very large scale, so multiple partitions can be specified when creating a topic to improve processing performance; the number of partitions can also be modified after creation. Different partitions of the same topic contain different messages. At the storage layer, each partition corresponds to an append-only log file. When a message is appended to the partition's log file, it is assigned a specific offset, which is the unique identifier of the message within the partition. Kafka uses the offset to guarantee the ordering of messages within a partition. Offsets do not span partitions, that is, Kafka guarantees ordering per partition rather than globally.
The following figure shows the append writing of messages: [Figure: appending messages to a partition log]

Partitions in Kafka can be distributed on different servers (brokers), so topics can span multiple brokers by partitioning. Compared with a single broker and a single partition, the parallelism is increased, and the performance is improved a lot.

On top of partitions, Kafka introduces the concept of the replica (Replica). If increasing the number of partitions provides horizontal scaling, then increasing the number of replicas improves fault tolerance. Different replicas of the same partition hold the same messages (although, because of synchronization lag, they are not necessarily identical at any given moment). Replicas have a leader-follower relationship: the leader replica handles read and write requests, while follower replicas only synchronize messages from the leader. The replicas of a partition live on different brokers. When the leader replica fails, a new leader is elected from the followers to continue serving reads and writes. Through this multi-replica mechanism Kafka achieves automatic failover: when a broker in the cluster goes down, the replication mechanism ensures that the partition data on that node is not lost and the Kafka service remains available.

The figure below shows a multi-replica architecture. In this example the Kafka cluster has 4 brokers, the topic has 3 partitions, and the replication factor is also 3. Producer and consumer clients interact only with the leader replica; follower replicas only synchronize messages from the leader. The replicas of each partition reside on different brokers, so if each broker runs on a separate machine, the different partitions and their replicas are physically isolated from each other.
[Figure: multi-replica architecture with 4 brokers, 3 partitions, replication factor 3]

Topic can therefore be considered a logical concept and partition a physical one, because each partition corresponds to .log files stored in Kafka's log directory, and these log files hold the data produced by producers. Data produced by a producer is continuously appended to the end of the log file, and each record has its own offset. Each consumer in a consumer group commits the offset it has consumed up to back to the Kafka server, so that after a failure it can resume consumption from the last committed position.

Consumer Group is a concept specific to Kafka's consumption model: every consumer belongs to a consumer group. After a producer publishes a message to a topic, the message is delivered to exactly one consumer in each consumer group that subscribes to that topic. The consumers in a group consume data from different partitions, and a partition can be consumed by only one consumer within the group; every consumer has a corresponding consumer group, i.e. the consumer group is the logical subscriber. Consumer groups do not affect each other, and multiple different consumer groups can subscribe to the same topic at the same time, in which case a message is consumed by one consumer in each group.

Understanding the above concepts helps when planning the number of topic partitions, consumers, and producers in real applications. In production, the number of partitions and the number of consumers in a group are generally kept equal. If a topic has more consumers in a group than partitions, the extra consumers cannot consume any data and only waste system resources.
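
As a hedged illustration of the consumer-group concept described above, the sketch below shows a minimal Java consumer: every consumer started with the same group.id shares the partitions of the subscribed topic, and each partition is read by only one member of the group. The topic name, group id, and broker address are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyGroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.129:9092");
        // consumers sharing this group.id split the topic's partitions among themselves
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_test"));
            while (true) {
                // poll pulls records from the partitions assigned to this group member
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}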

kafka file storage mechanism

insert image description here

As mentioned above, the messages produced by producers are continuously appended to the end of the log file. To prevent the log file from growing so large that locating data becomes inefficient, Kafka uses a sharding and indexing mechanism that divides each partition into multiple segments. Each segment has a corresponding .index file, .log file, and .timeindex file. These files live in a folder under the directory specified by log.dirs in the Kafka configuration file server.properties; the folder is named topic name + partition number. For example, if the topic test has three partitions, the folders test-0, test-1, and test-2 are created. The index and log files in each folder look like the listing below; they are named after the offset of the first message in the segment.

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000000130610.index
00000000000000130610.log
00000000000000130610.timeindex

Each log segment has two corresponding index files, which speed up message lookup. The offset index maps a message offset to a physical position in the log file; the timestamp index makes it possible to find an offset for a given timestamp. Every time a certain amount of message data is written (controlled by the broker parameter log.index.interval.bytes, default 4096 bytes, i.e. 4 KB), one entry is appended to the offset index file and one to the timestamp index file. The density of index entries can therefore be tuned by changing log.index.interval.bytes.
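
To illustrate the idea of the sparse offset index, here is a simplified sketch (not Kafka's actual implementation): to locate a message, find the largest indexed offset that is less than or equal to the target offset, seek to its recorded file position, and scan forward from there.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SparseOffsetIndexSketch {
    // sparse mapping: message offset -> byte position of that message in the .log file
    private final NavigableMap<Long, Long> offsetToPosition = new TreeMap<>();

    public void addIndexEntry(long offset, long filePosition) {
        offsetToPosition.put(offset, filePosition);
    }

    // returns the byte position to start scanning from for the target offset
    public long lookup(long targetOffset) {
        Map.Entry<Long, Long> entry = offsetToPosition.floorEntry(targetOffset);
        return entry == null ? 0L : entry.getValue();
    }

    public static void main(String[] args) {
        SparseOffsetIndexSketch index = new SparseOffsetIndexSketch();
        index.addIndexEntry(0L, 0L);      // hypothetical index entries
        index.addIndexEntry(35L, 4096L);
        index.addIndexEntry(70L, 8192L);
        // to read offset 50, seek to byte 4096 in the .log file and scan forward
        System.out.println(index.lookup(50L)); // prints 4096
    }
}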

One of the reasons for Kafka's efficient read and write is the use of sequential write disk technology and zero-copy technology.

1) Sequential write disk

Traditional messaging middleware such as RabbitMQ uses memory as the default storage medium and disk as a fallback in order to achieve high throughput and low latency. Kafka instead relies on the disk. Tests have shown that on the same disk, sequential writes can reach roughly 600 MB/s while random writes may only reach about 100 KB/s; this is due to the mechanical characteristics of disks, since sequential writing saves a large amount of head-seek time. Sequential disk writes can even be faster than random writes to memory. Kafka therefore appends messages to files, writing to disk sequentially. In addition, Kafka makes heavy use of the operating system page cache to reduce disk I/O.

2) Zero copy

Zero copy means copying data directly from disk to the NIC device without passing through the application. It greatly improves performance and reduces context switches between kernel mode and user mode. Zero-copy is also used in frameworks such as Netty to improve I/O performance. On Linux, zero copy relies on the operating system's sendfile() system call; in the JDK, FileChannel.transferTo() is implemented on top of sendfile().

For example, if the server wants to pass local files to the client, the two different technical processes are as follows:

Traditional non-zero copy technology
[Figure: traditional (non-zero-copy) data transfer]

First, the read() system call copies the file from disk into the kernel-space Read Buffer, and then the data is copied from kernel space to user space under CPU control. Next, the write() system call copies the user-space data into the kernel-space Socket Buffer. Finally, the data in the Socket Buffer is copied to the network card for transmission. In this process the data makes a pointless round trip between kernel space and user space: two extra copy operations and four context switches between kernel mode and user mode. Now let's look at how zero copy handles the same task.

zero-copy technology
[Figure: zero-copy data transfer with DMA]

Zero copy uses DMA (Direct Memory Access), supported by the operating system, to copy the file content into the kernel-space Read Buffer. The data itself is not copied into the Socket Buffer; only a file descriptor with the position and length of the data is passed to the Socket Buffer. The data is then transferred directly from kernel space to the network card, so only two context switches between kernel mode and user mode are needed and the number of data copies is also reduced.
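
As noted above, the JDK exposes this zero-copy path through FileChannel.transferTo(). The hedged sketch below streams a local file to a connected socket; the file path and the target address are placeholders.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        Path file = Paths.get("/tmp/demo.log"); // placeholder file
        try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("192.168.174.129", 9000))) {
            long position = 0;
            long size = fileChannel.size();
            // transferTo delegates to sendfile() on Linux where possible,
            // so the file bytes never enter user space
            while (position < size) {
                position += fileChannel.transferTo(position, size - position, socket);
            }
        }
    }
}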

kafka installation and configuration

stand-alone

Convention: All the software in this article is installed under /usr/local/myapp/, and it is recommended to use the root user for all operations on linux to avoid unnecessary troubles!

jdk installation

Download the linux system jdk compressed package from the official website . This article uses jdk-8u261-linux-x64.tar.gz, download it locally and upload it to the linux server.

  • View and uninstall the system's own jdk

    [root@vm1 ~]# java -version
    openjdk version "1.8.0_242"
    OpenJDK Runtime Environment (build 1.8.0_242-b08)
    OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

    It is found that it is the built-in openjdk, query and uninstall

    [root@vm1 ~]# rpm -qa | grep jdk
    java-1.8.0-openjdk-headless-1.8.0.242.b08-1.el7.x86_64
    java-1.7.0-openjdk-1.7.0.251-2.6.21.1.el7.x86_64
    java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64
    java-1.7.0-openjdk-headless-1.7.0.251-2.6.21.1.el7.x86_64
    copy-jdk-configs-3.3-10.el7_5.noarch

    Use the rpm -e --nodeps command to delete sequentially

    rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.242.b08-1.el7.x86_64
    rpm -e --nodeps java-1.7.0-openjdk-1.7.0.251-2.6.21.1.el7.x86_64
    rpm -e --nodeps java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64
    rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.251-2.6.21.1.el7.x86_64

  • Install the downloaded jdk

    Unzip the downloaded jdk archive to the local created directory

    [root@vm1 ~]# tar -zxvf jdk-8u261-linux-x64.tar.gz -C /usr/local/myapp/jdk/

  • Configure environment variables

    Add the following configuration to the /etc/profile file

    [root@vm1 ~]# vim /etc/profile

    export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/
    export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib/
    export PATH=$PATH:$JAVA_HOME/bin

    (~ indicates the home directory; you can also write the full path directly, for example: /home/zpc/soft/jdk/jdk1.8.0_261/)

    Use the source command to make the configuration take effect immediately

    [root@vm1 ~]# source /etc/profile

  • Check the jdk version again

    [root@vm1 ~]# java -version
    java version "1.8.0_261"
    Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
    Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)

    Successful installation!

zooKeeper installation

  • Download the zk installation package from the official website

    This article uses apache-zookeeper-3.6.1-bin.tar.gz

  • Unzip to local specified directory

    [root@vm1 ~]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local/myapp/zookeeper

    Rename:

    [root@vm1 ~]# mv /usr/local/myapp/zookeeper/apache-zookeeper-3.6.1-bin/ /usr/local/myapp/zookeeper/zookeeper-3.6.1

  • Environment variable configuration

    [root@vm1 ~]# vim /etc/profile

    export ZOOKEEPER_HOME=/usr/local/myapp/zookeeper/zookeeper-3.6.1
    export PATH=$PATH:$ZOOKEEPER_HOME/bin

    Execute source /etc/profile to make the configuration take effect

  • Modify the zk configuration file

    Enter the zk configuration directory zookeeper-3.6.1/conf to copy the configuration file template and modify it

    [root@vm1 conf]# cp zoo_sample.cfg zoo.cfg

    Modify zoo.cfg as follows:

    #data directory

    dataDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir

    #Specify the transaction log directory separately

    dataLogDir=/usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir

    #zk External service port

    clientPort=2181

    #Basic time unit, and time-related configurations are multiples of this value; that is, the interval time between a single heartbeat of the zk server, in milliseconds

    tickTime=2000

    #The time limit for the zk server to connect to the leader when voting for the leader, initLimit*tickTime is the total timeout time

    initLimit=10

    #When zk is working normally, the maximum number of heartbeats between the leader and the follower is limited. syncLimit*tickTime is the total timeout tolerance time. After this time, the follower will be removed from the zk cluster

    syncLimit=5

    Note: The data and log file paths in the above configuration must first be created in the system

    [root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir
    [root@vm1 conf]# mkdir -p /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataLogDir

    For the meaning of other configuration file parameters, please refer to the official website description

  • start zookeeper

    Since environment variables are configured, zkServer.sh start can be executed in any directory

    [root@vm1 ~]# zkServer.sh start

    ZooKeeper JMX enabled by default
    Using config: /home/zpc/soft/apache-zookeeper-3.6.1/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED

    The above output indicates that the startup was successful. If the initial installation fails, for example with the error: Starting zookeeper ... FAILED TO START

    It is very likely that the wrong installation package was downloaded. Some people download apache-zookeeper-3.6.1.tar.gz, when it should be apache-zookeeper-3.6.1-bin.tar.gz. Standalone mode requires the ZooKeeper jars: zookeeper-3.6.1.jar must be present in the lib/ directory after the archive is extracted.

  • View zk process and zk status

    [root@vm1 ~]# jps
    6754 QuorumPeerMain
    7013 Jps

    [root@vm1 ~]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: standalone

    close zookeeper

    [root@vm1 ~]# zkServer.sh stop

    So far, zookeeper standalone mode has been successfully installed!

kafka installation

  • Download the installation package from the official website

    This article uses kafka_2.12-2.6.0.tgz

  • Unzip to local specified directory

    [root@vm1 ~]# tar -zxvf kafka_2.12-2.6.0.tgz -C /usr/local/myapp/kafka

  • Environment variable configuration

    One of the purposes of configuring environment variables is to execute commands in the kafka bin directory in any directory.

    export KAFKA_HOME=/usr/local/myapp/kafka/kafka_2.12-2.6.0
    export PATH=$PATH:$KAFKA_HOME/bin

  • Modify the kafka configuration file

    The configuration file is in config/server.properties, mainly modify the following parameters

    #broker's globally unique id, generally numbered from 0, cannot be repeated

    broker.id=0

    #kafka provides a service listening address to the outside world, set the IP address of the machine running kafka, and the client uses this address to connect to kafka

    listeners=PLAINTEXT://192.168.174.129:9092

    #Log directory, multiple directories can be separated by commas (create a directory in the system first)

    log.dirs=/usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka

    #Default number of partitions configuration, generally in the cluster configuration to set up multiple partitions to improve performance

    num.partitions=1

    #The default number of replicas when creating a topic. Generally, multiple replicas should be set in the cluster configuration to improve availability.

    default.replication.factor=1

    #zk server address configuration, generally stand-alone mode zk and kafka run on the same machine, just configure the ip of the machine where kafka is located

    zookeeper.connect=localhost:2181

    #Enable the function of deleting topic, the default is true

    delete.topic.enable=true

    #Number of threads used to handle disk I/O, default is 8

    num.io.threads=8

    #Number of threads used to process network requests, the default is 3

    num.network.threads=3

    #kafka log log retention time, default 7 days (168h)

    log.retention.hours=168

  • start kafka

    Zookeeper must be run before starting kafka, because kafka will connect to zookeeper

    [root@vm1 ~]# zkServer.sh start

    [root@vm1 ~]# jps
    52358 Jps
    52135 QuorumPeerMain

    Seeing the QuorumPeerMain process means that zk is successfully started

    Next, start Kafka, specifying the configuration file; the trailing & runs it in the background. Clear the Kafka log directory before the first run to prevent dirty data.

    [root@vm1 ~]# rm -rf /usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka/*

    [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

  • Check whether the kafka process starts successfully

    [root@vm1 ~]# jps
    52135 QuorumPeerMain
    54827 Jps
    54399 Kafka

    Note: stop Kafka with the proper command rather than force-killing it; stop Kafka first, then stop ZooKeeper.

    [root@vm1 ~]# kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties

    [root@vm1 ~]# zkServer.sh stop

    Running jps again shows that the zk and kafka processes have stopped.

  • kafka start and stop scripts

    Encapsulate kafka and zookeeper commands into shell scripts

    [root@vm1 ~]# vim kafka_start.sh

    #!/bin/bash
    #Start zookeeper
    zkServer.sh start &
    sleep 10   #wait 10 seconds before continuing
    #Start kafka
    kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    [root@vm1 ~]# vim kafka_stop.sh

    #!/bin/bash
    #Stop kafka
    kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &
    sleep 10   #wait 10 seconds before continuing
    #Stop zookeeper
    zkServer.sh stop

    Because the environment variables are configured, the commands in the scripts above do not need absolute (full) paths.

    Add execute permission to the script:

    chmod +x kafka_start.sh

    chmod +x kafka_stop.sh

    In this way, you can start and stop kafka with one click:

    [root@vm1 ~]# ./kafka_start.sh

    [root@vm1 ~]# ./kafka_stop.sh

Cluster environment

Machine planning: 3 servers, 1 leader, 2 followers, each machine is configured with domain name ip mapping in /etc/hosts

Hostname    Machine IP         Role
vm1         192.168.174.129    leader (master)
vm2         192.168.174.131    follower (slave)
vm3         192.168.174.130    follower (slave)

All of the following installations can be performed on the first machine, and then copied to other machines in the cluster for modification.

jdk installation

After installing the jdk on vm1, copy it to vm2 and vm3. For the installation on vm1, refer to the single-machine jdk installation .

[root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@192.168.174.131:/usr/local/myapp/jdk/jdk1.8.0_261/

[root@vm1 ~]# scp -r /usr/local/myapp/jdk/jdk1.8.0_261/ root@192.168.174.130:/usr/local/myapp/jdk/jdk1.8.0_261/

[root@vm1 ~]# scp /etc/profile root@192.168.174.131:/etc/profile

[root@vm1 ~]# scp /etc/profile root@192.168.174.130:/etc/profile

[root@vm2 ~]# source /etc/profile

[root@vm3 ~]# source /etc/profile

Execute java -version on each machine to check the jdk version and make sure the jdk is installed correctly

zookeeper cluster installation

In principle, a ZooKeeper cluster requires 2n+1 instances to maintain a quorum, so the cluster size is at least 3. For the installation on vm1, refer to the single-machine zookeeper installation.

Add the following configurations to the stand-alone installation:

  • Add the node information below to conf/zoo.cfg on each machine

    #server.A=B:C:D, where A is the server number, B is the server IP, C is the port used for communication with the leader, and D is the port used for leader election when the leader goes down
    server.1=192.168.174.129:2888:3888
    server.2=192.168.174.131:2888:3888
    server.3=192.168.174.130:2888:3888

  • Create a new myid file in the dataDir directory of each ZK node

    The myid file contains the number of the current server, i.e. the A in server.A above. Run the corresponding echo command on each machine:

    echo 1 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid   #Execute on the first machine (vm1)

    echo 2 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid   #Execute on the second machine (vm2)

    echo 3 > /usr/local/myapp/zookeeper/zookeeper-3.6.1/dataDir/myid   #Execute on the third machine (vm3)

  • Start the zk service on each server separately, pay attention to turn off the firewall of each machine first

    systemctl stop firewalld

    zkServer.sh start

  • View the status of each node separately

    [root@vm1 ~]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: leader

    [root@vm2 ~]# zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/myapp/zookeeper/zookeeper-3.6.1/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Mode: follower

So far, the zookeeper cluster mode has been successfully built. Execute the zkServer.sh stop command to stop the leader and find that the leader will be re-elected.

kafka installation

According to the steps of single-machine kafka installation and configuration , first decompress and configure kafka on the first machine vm1.

  • configure config/server.properties

    #The id of each broker in the cluster is unique, generally starting from 0

    broker.id=0

    #kafka provides a service listening address to the outside world, set the IP address of the machine running kafka, and the client uses this address to connect to kafka

    listeners=PLAINTEXT://192.168.174.129:9092

    #Log directory, multiple directories can be separated by commas, create a directory first

    log.dirs=/usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka

    #Configure the number of partitions, generally set up multiple partitions in cluster mode to improve performance

    num.partitions=3

    #The default number of replicas when creating a topic. In cluster mode, multiple replicas are generally configured to improve availability

    default.replication.factor=3

    #zk server address configuration, a list of addresses to configure zk in cluster mode, separated by commas

    zookeeper.connect=192.168.174.129:2181,192.168.174.130:2181,192.168.174.131:2181

  • Copy to other machine

    Clear the log.dirs of the first kafka before copying to prevent dirty data from affecting other machines

    [root@vm1 ~]# rm -rf /usr/local/myapp/kafka/kafka_2.12-2.6.0/log/kafka/*

    [root@vm1 ~]# scp -r /usr/local/myapp/kafka/ root@192.168.174.131:/usr/local/myapp/

    [root@vm1 ~]# scp -r /usr/local/myapp/kafka/ root@192.168.174.130:/usr/local/myapp/

    Modify the config/server.properties configuration of other machines respectively

    #The id of each broker in the cluster is unique, generally starting from 0

    broker.id=1

    #kafka The listening address for external services is set to the local ip

    listeners=PLAINTEXT://192.168.174.131:9092

    #The id of each broker in the cluster is unique, generally starting from 0

    broker.id=2

    #kafka The listening address for external services is set to the local ip

    listeners=PLAINTEXT://192.168.174.130:9092

  • Start the kafka cluster

    Start zk first, then start kafka

    Check whether zk is started: zkServer.sh status

    [root@vm1 ~]# zkServer.sh start

    [root@vm2 ~]# zkServer.sh start

    [root@vm3 ~]# zkServer.sh start

    [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    [root@vm2 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    [root@vm3 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    Check whether the zk and Kafka processes start normally:

    [root@vm1 ~]# jps
    5524 QuorumPeerMain
    6100 Kafka
    6741 Jps

  • Verify the kafka cluster

    Pick any Kafka machine and create a topic, then check the topic on the other Kafka servers; if it is visible everywhere, the cluster is configured correctly.

    [root@vm1 ~]# kafka-topics.sh --create --zookeeper 192.168.174.129:2181 --replication-factor 2 --partitions 2 --topic kafkatest

    [root@vm1 ~]# kafka-topics.sh --describe --zookeeper 192.168.174.129:2181
    Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs:
    Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

    [root@vm2 ~]# kafka-topics.sh --describe --zookeeper 192.168.174.131:2181
    Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs:
    Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

    [root@vm3 ~]# kafka-topics.sh --describe --zookeeper 192.168.174.130:2181
    Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs:
    Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

    You can also use the new version of the command:

    [root@vm2 ~]# kafka-topics.sh --create --topic kafkatest2 --bootstrap-server 192.168.174.131:9092

    When creating a topic, if the number of partitions and replicas is not explicitly specified, the default values are used, i.e. the settings in the configuration file server.properties.

    [root@vm2 ~]# kafka-topics.sh --describe --bootstrap-server 192.168.174.131:9092
    Topic: kafkatest2 PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
    Topic: kafkatest2 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
    Topic: kafkatest2 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: kafkatest2 Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
    Topic: kafkatest PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: kafkatest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: kafkatest Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

    You can also use the list command on each machine to view the kafka cluster topic

    [root@vm1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.174.129:9092
    kafkatest
    kafkatest2
    kafkatest3

    [root@vm3 ~]# kafka-topics.sh --list --bootstrap-server 192.168.174.130:9092
    kafkatest
    kafkatest2
    kafkatest3

    [root@vm2 ~]# kafka-topics.sh --list --bootstrap-server 192.168.174.131:9092
    kafkatest
    kafkatest2
    kafkatest3

    So far, the kafka cluster has been successfully installed. (A programmatic cross-check of the cluster using the Java AdminClient is sketched after this checklist.)

    Note: When closing, first close kafka and then close zookeeper!

    kafka-server-stop.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    zkServer.sh stop
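
Besides the shell scripts used above, topics can also be managed programmatically. The following hedged sketch uses the Java AdminClient from kafka-clients to create a topic and list the topics of this cluster; the topic name kafkatest4 is made up for illustration and the broker address is the one used in this article.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.129:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, replication factor 2, like the kafkatest topic above
            NewTopic topic = new NewTopic("kafkatest4", 2, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // list all topic names known to the cluster
            System.out.println(admin.listTopics().names().get());
        }
    }
}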

kafka monitoring management interface

Using the web management page or dashboard to manage kafka is more convenient for daily maintenance.

  • Download Kafka Eagle

    Downloads can be slow, be patient.

  • Unzip to local specified directory

    [root@vm1 ~]# tar -zxvf kafka-eagle-bin-2.0.1.tar.gz

    [root@vm1 ~]# cd kafka-eagle-bin-2.0.1/
    [root@vm1 kafka-eagle-bin-2.0.1]# tar -zxvf kafka-eagle-web-2.0.1-bin.tar.gz -C /usr/local/myapp/

    [root@vm1 ~]# mv /usr/local/myapp/kafka-eagle-web-2.0.1/ /usr/local/myapp/kafka-eagle

  • Environment variable configuration

    export KE_HOME=/usr/local/myapp/kafka-eagle
    export PATH=$PATH:$KE_HOME/bin

    source /etc/profile

  • Modify the Kafka-Eagle configuration file system-config.properties

    # zookeeper and kafka cluster configuration
    ######################################
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=192.168.174.129:2181,192.168.174.130:2181,192.168.174.131:2181
    ######################################
    # kafka eagle webui port
    # web page access port number
    ######################################
    kafka.eagle.webui.port=8048
    ######################################
    # kafka eagle jdbc driver settings
    # by default kafka eagle uses an embedded sqlite database; just configure the database file path
    ######################################
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/usr/local/myapp/kafka-eagle/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org
    
  • start kafka-eagle

    [root@vm1 ~]# cd /usr/local/myapp/kafka-eagle/bin

    [root@vm1 bin]# ./ke.sh start

    [root@vm1 bin]# ./ke.sh status

    If there is an error, check the logs for the cause:

    /usr/local/myapp/kafka-eagle/logs

    If there is an error caused by insufficient memory, you can reduce the JVM memory usage set in ke.sh and start it again

    vim kafka-eagle/bin/ke.sh

    export KE_JAVA_OPTS="-server -Xmx256M -Xms256M -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

    If there is no problem, then log in directly

    http://192.168.174.129:8048

    Default username: admin

    Default password: 123456

[Figure: Kafka Eagle web console]

  • Open monitoring trend graph

    The monitoring trend charts in Kafka Eagle need to connect to Kafka's JMX port, and Kafka does not enable JMX by default. To view the trend charts you must open the Kafka JMX port and also enable trend monitoring in Kafka Eagle, which is configured in Kafka Eagle's conf/system-config.properties file:

    kafka.eagle.metrics.charts=true

    There are two ways to open the kafka jmx port:

    1. Execute the export JMX_PORT=xxxx command to set temporary environment variables before starting Kafka

    [root@vm1 ~]# export JMX_PORT=9966

    [root@vm1 ~]# kafka-server-start.sh /usr/local/myapp/kafka/kafka_2.12-2.6.0/config/server.properties &

    Note: remove the temporary environment variable after Kafka has started, because if a Kafka producer script is later run on the same machine, it would try to open JMX on the same port again and fail with a port-in-use error.

    [root@vm1 ~]# echo $JMX_PORT

    [root@vm1 ~]# unset JMX_PORT

    [root@vm1 ~]# echo $JMX_PORT

    2. Configure in kafka-run-class.sh

The second way is to add JMX_PORT=9966 to Kafka's bin/kafka-run-class.sh script; add the setting on each node in turn and then start Kafka.

Kafka production and consumption script demonstration

  • production and consumption

    Send messages to broker0 on the first server and consume kafka messages on another server

    Option description:
    --topic defines the topic name
    --replication-factor defines the number of replicas
    --partitions defines the number of partitions

    --from-beginning: read all the historical data in the topic

    create topic

    [root@vm1 ~]# kafka-topics.sh --create --topic testtopic01 --bootstrap-server 192.168.174.129:9092

    View topic

    [root@vm1 ~]# kafka-topics.sh --describe --topic testtopic01 --bootstrap-server 192.168.174.129:9092
    Topic: testtopic01 PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
    Topic: testtopic01 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
    Topic: testtopic01 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    Topic: testtopic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

    Run the producer, and enter the message

    [root@vm1 ~]# kafka-console-producer.sh --topic testtopic01 --bootstrap-server 192.168.174.129:9092

    This is my first message

    Run the consumer on 2 other machines and see the messages received

    [root@vm2 ~]# kafka-console-consumer.sh --topic testtopic01 --from-beginning --bootstrap-server 192.168.174.131:9092

    [root@vm3 ~]# kafka-console-consumer.sh --topic testtopic01 --from-beginning --bootstrap-server 192.168.174.130:9092

    Press Ctrl+C to exit the consumer and producer consoles.

    delete topic

    [root@vm1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.174.129:9092

    [root@vm1 ~]# kafka-topics.sh --delete --topic testtopic01 --bootstrap-server 192.168.174.129:9092

    [root@vm1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.174.129:9092

    Modify the number of partitions for a topic

    [root@vm1 ~]# kafka-topics.sh --bootstrap-server 192.168.174.129:9092 --alter --topic testtopic01 --partitions 6

  • Verify cluster fault tolerance

    Create a topic with 1 partition and 2 replicas. Kill the Kafka process on the server hosting replica 1 and check whether the Kafka producer and consumer still work normally; then kill the Kafka service hosting replica 2 and try again.

    [root@vm3 ~]# kafka-topics.sh --create --topic testtopic02 --bootstrap-server 192.168.174.130:9092 --replication-factor 2 --partitions 1

    [root@vm1 ~]# kafka-topics.sh --describe --topic testtopic02 --bootstrap-server 192.168.174.129:9092
    Topic: testtopic02 PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: testtopic02 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0

    Replicas: 1,0 means the replicas are on brokers 0 and 1, i.e. vm1 and vm2 in this article, so we open two windows on vm3 to run the producer and the consumer respectively.

    Producer sends:

    [root@vm3 ~]# kafka-console-producer.sh --topic testtopic02 --bootstrap-server 192.168.174.130:9092

    hello msg1
    hello msg2

    Consumers accept:

    [root@vm3 ~]# kafka-console-consumer.sh --topic testtopic02 --from-beginning --bootstrap-server 192.168.174.130:9092

    hello msg1
    hello msg2

    Kill broker0 and perform production and consumption demonstration again:

    [root@vm3 ~]# kafka-console-producer.sh --topic testtopic02 --bootstrap-server 192.168.174.130:9092

    hello after kill

    [root@vm3 ~]# kafka-console-consumer.sh --topic testtopic02 --from-beginning --bootstrap-server 192.168.174.130:9092

    hello after kill

    It is found that production and consumption are normal. At this point, kill broker1, perform production and consumption demonstration again, and find an error:

    WARN [Producer clientId=console-producer] 1 partitions have leader brokers without a matching listener, including [testtopic02-0] (org.apache.kafka.clients.NetworkClient)

WARN [Consumer clientId=consumer-console-consumer-32492-1, groupId=console-consumer-32492] 1 partitions have leader brokers without a matching listener, including [testtopic02-0] (org.apache.kafka.clients.NetworkClient)

Restarting the killed Kafka service restores normal operation.

Kafka server configuration summary

1.brokerId

The unique identifier of a broker in the Kafka cluster, generally numbered incrementally starting from 0. When a broker starts, an ephemeral node named after its brokerId is created under the /brokers/ids path in ZooKeeper; the broker's health check depends on this node, and when the broker goes offline the node is removed automatically. Other brokers and clients determine whether a broker is alive by checking whether its node exists under /brokers/ids. After the broker starts, a meta.properties file is generated in the log directory (log.dirs). If the broker.id configured in server.properties is inconsistent with the one recorded in meta.properties, startup fails with an InconsistentBrokerIdException.

2.log.dirs

Set the root directory where Kafka log files are stored. Multiple directories can be configured in a comma-separated manner. If not configured, the default value is: /tmp/kafka-logs

3.listeners

The address on which the broker provides services, i.e. the address clients use to connect to the broker. For example, with listeners=PLAINTEXT://192.168.174.129:9092 a client can connect to Kafka at 192.168.174.129:9092 to send or receive messages. There is also an advertised.listeners setting, intended for IaaS environments: cloud servers usually have several network cards (private and public). In that case advertised.listeners can be bound to the public IP for use by external clients, while listeners is bound to the private IP for communication between the brokers.

4. zookeeper.connect

Set the service address of the ZooKeeper cluster to which the kafka broker is connected. When there are multiple nodes in the ZooKeeper cluster, commas can be used to separate them.

5.message.max.bytes

Specify the maximum size of the message body that the broker can receive (the maximum size after compression if compression is enabled), in bytes.

6.log.retention.hours

The log file storage time, the default is 7 days.

For more parameter meanings, see the official documentation : http://kafka.apache.org/documentation/

kafka producer client API

Kafka Producer can send messages synchronously or asynchronously.

Asynchronous sending

  • Introduce Java client dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    
  • core class

    KafkaProducer: Producer object, used to send data

    ProducerConfig: Set a series of configuration parameters for the producer

    ProducerRecord: Each piece of data must be encapsulated into a ProducerRecord object

  • Producer client code

    Without callback function: KafkaProducer#send(ProducerRecord<K,V>)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MyProducer1 {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24 * 1024 * 1024);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 20; i++) {
                producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i));
            }
            producer.close();
        }
    }

    With callback function: KafkaProducer#send(ProducerRecord<K,V>, Callback)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class MyProducer2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 32);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (null == exception) {
                            System.out.println("send success->" + metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
            producer.close();
        }
    }
    

    The callback function is invoked asynchronously when the producer receives the ack. It has two parameters, RecordMetadata and Exception, which are mutually exclusive: if Exception is null, the message was sent successfully and RecordMetadata is not null; if the send failed, Exception is not null and RecordMetadata is null. When a message fails to send it is retried automatically, so there is no need to retry manually in the callback; the number of retries is set by the retries parameter.

    If an exception is thrown while sending a message to the broker and it is a retriable exception, KafkaProducer retries the send up to the number of times specified by the retries parameter.

Synchronous sending

Synchronization means that after a message is sent, the current thread will be blocked until the ack message is returned.

As can be seen from the above asynchronous example, the object returned by the producer's send method is of type Future, so synchronous waiting can be triggered by calling the get() method of the Future object.

So there is only one difference from asynchronous code:

producer.send(new ProducerRecord<String, String>("topic_test", Integer.toString(i), "value:" + i)).get();

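A slightly fuller, hedged sketch of synchronous sending, building on MyProducer1 above: calling get() blocks the current thread until the broker acknowledges the record (or an exception is thrown), so send failures can be handled inline.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class MySyncProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                try {
                    // get() blocks the current thread until the ack arrives
                    RecordMetadata metadata = producer
                            .send(new ProducerRecord<>("topic_test", Integer.toString(i), "value:" + i))
                            .get();
                    System.out.println("written to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                } catch (ExecutionException e) {
                    // the send failed after any retries were exhausted
                    e.printStackTrace();
                }
            }
        }
    }
}
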
Producer Partitioning Strategy

Using partitions improves a topic's processing performance and concurrency. The data sent by a Kafka producer is encapsulated in a ProducerRecord object, which has a partition attribute; each ProducerRecord is sent to a specific partition.
[Figure: ProducerRecord and partition selection]

The partition principle when the producer sends a message is as follows:

(1) If a partition is specified in the ProducerRecord constructor, that value is used directly as the partition;
(2) If no partition is specified but a key is present, the partition is obtained by taking the hash of the key modulo the number of partitions of the topic;
(3) If neither a partition nor a key is provided, a random integer is generated on the first call (and incremented on each subsequent call), and the partition is obtained by taking it modulo the number of partitions of the topic, i.e. round-robin.
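
When none of the built-in rules fit, the partitioning strategy can also be customized by implementing the org.apache.kafka.clients.producer.Partitioner interface and registering it through the partitioner.class producer property. Below is a hedged sketch; the class name VipFirstPartitioner and its routing rule (keys starting with "vip" go to partition 0) are invented purely for illustration.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class VipFirstPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // route "vip..." keys to partition 0
        if (key != null && key.toString().startsWith("vip")) {
            return 0;
        }
        // spread everything else over the remaining partitions
        int hash = (key == null) ? 0 : key.hashCode();
        int bucket = (hash & Integer.MAX_VALUE) % Math.max(numPartitions - 1, 1);
        return numPartitions > 1 ? bucket + 1 : 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

It would then be registered on the producer with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipFirstPartitioner.class.getName()).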

data reliability

Kafka uses an ack mechanism to ensure data reliability. After the broker hosting a partition of the topic receives the data sent by the producer, it sends an ack back to the producer; if the producer does not receive the ack, its retry mechanism is triggered. So when does Kafka return the ack? Among a partition's replicas there is a leader and there are followers. If all followers had to finish synchronizing before the ack could be sent, then a follower that could not catch up with the leader for a long time because of some failure would delay the ack indefinitely, which would inevitably hurt Kafka's performance. How does Kafka deal with this?

In Kafka, the set of all replicas of a partition is called the AR (Assigned Replicas). The leader maintains a dynamic set, the in-sync replica set (ISR), which contains the replicas (including the leader itself) that are sufficiently caught up with the leader. Replicas that lag too far behind the leader form the OSR (Out-of-Sync Replicas) set, so AR = ISR + OSR. Under normal circumstances all followers keep up with the leader, i.e. AR = ISR.

Kafka's approach is to let the user configure an acks level that determines when the ack is returned to the producer.

The acks parameter specifies how many replicas of the partition must have received the message before the producer can consider the write successful. There are three levels:

acks=1, the default configuration. After the producer sends the message, as long as the leader copy of the partition successfully writes the message, it will receive a successful ack response from the server. If the message writing to the leader fails, for example, the leader has hung up and is being re-elected, the producer client will receive an error response and can re-send. If the write to the leader is successful and the leader hangs before the follower synchronization is complete, message loss will occur. So acks=1 is a compromise between Kafka message reliability and throughput, and is generally the default configuration.

acks=0, the producer does not need to wait for any response from the server after sending the message. If some exception occurs in the process of message sending to writing to Kafka, so that Kafka does not receive the message, then the producer has no way of knowing, and the message is lost. Under the same other configuration environment, acks=0 can make Kafka achieve the maximum throughput.

acks=-1, the same effect as acks=all. After the message is sent, the producer needs to wait for all replicas in the ISR to successfully write the message before receiving the ack response from the server. Under the same other configuration environment, setting acks to -1 can achieve the highest data reliability. However, if the leader fails after the follower synchronization is completed and before the broker sends the ack to the producer client, it will cause data duplication.

Exactly Once Semantics

Setting the acks level to -1 ensures that no data is lost between the producer and the kafka server, that is, at least once semantics.

Correspondingly, setting the acks level to 0 ensures that each message will only be sent once and will not be repeated, that is, At most once semantics.

In some scenarios the downstream consumer requires that data is neither lost nor duplicated, i.e. Exactly Once semantics. Kafka 0.11 introduced a major feature for this: idempotence. No matter how many times the producer sends the same data to the server, the server persists only one copy. Set the enable.idempotence parameter of the producer to true to enable it. A producer with idempotence enabled is assigned a PID on initialization, and every message sent to a partition carries a sequence number. The broker caches <PID, Partition, SeqNumber>; when a message with the same triple is submitted again, only one copy is persisted. However, the PID changes when the producer restarts, and sequence numbers are tracked per partition, so Kafka's idempotence cannot guarantee Exactly Once across partitions or across producer sessions.
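A minimal sketch of enabling idempotence on the producer (broker addresses follow the examples in this tutorial; when idempotence is enabled the client requires acks=all and a non-zero retries setting):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// enable idempotence; acks must be (or default to) "all" and retries must not be 0
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);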

Note that Kafka's idempotence is a mechanism of Kafka itself; it does not provide idempotence at the business layer. We usually still need to implement idempotent handling on the business side ourselves.

Producer Interceptor

A producer interceptor is created on the producer client by implementing the interface org.apache.kafka.clients.producer.ProducerInterceptor.

The Kafka producer calls the interceptor's onSend() method before message serialization and partition calculation; custom pre-send logic can be placed in this method. Generally the topic, key and partition of the ProducerRecord should not be modified. The producer calls the interceptor's onAcknowledgement() method just before the message is acknowledged (ack) or when sending fails; this method is executed before the asynchronous Callback set by the user. The business logic in onAcknowledgement() should be as simple as possible, because it runs in the producer's I/O thread and therefore affects sending performance.

Interceptor example: define two interceptors and use them together.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Interceptor example - the first interceptor prepends a timestamp to the message body
 */
public class ProducerInterceptor1 implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // prepend the current timestamp to the message value
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "_" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Interceptor example - the second interceptor counts successfully and unsuccessfully sent messages
 */
public class ProducerInterceptor2 implements ProducerInterceptor<String, String> {

    private final AtomicInteger successCounter = new AtomicInteger();
    private final AtomicInteger failCounter = new AtomicInteger();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter.getAndIncrement();
        } else {
            failCounter.getAndIncrement();
        }
    }

    @Override
    public void close() {
        System.out.println("successCounter:" + successCounter.get());
        System.out.println("failCounter:" + failCounter.get());
        // use floating-point division, otherwise integer division always yields 0 or 1
        System.out.println("Send success rate:" + (double) successCounter.get() / (failCounter.get() + successCounter.get()));
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Use the custom producer interceptors
 * The Kafka producer calls the interceptor's onSend method before message serialization and partition calculation; custom pre-send logic can be placed there. Generally do not modify the topic, key or partition of the ProducerRecord.
 * The producer calls the interceptor's onAcknowledgement method just before the message is acknowledged (ack) or when sending fails; it runs before the asynchronous Callback set by the user.
 * Keep the logic in onAcknowledgement as simple as possible, because it runs in the producer's I/O thread and affects sending performance.
 */
public class ProducerWithInterceptor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 24 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // build the interceptor chain (executed in list order)
        List<String> interceptors = new ArrayList<>();
        interceptors.add(ProducerInterceptor1.class.getName());
        interceptors.add(ProducerInterceptor2.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_test", "hello " + i);
            kafkaProducer.send(record);
        }

        // closing the producer triggers the interceptors' close methods
        kafkaProducer.close();
    }
}

kafka consumer client API

import dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>


Core classes:

KafkaConsumer: consumer object, used to consume data

ConsumerConfig: Set a series of configuration parameters for the consumer

KafkaConsumer#poll(Duration): The core method of the consumer client, that is, pulling (poll) messages from the server. Subsequent chapters will show how to write consumer client API code under different offset submission methods.

Consumer Partitioning Strategy

A consumer client can consume specific partitions of a topic: the assign(Collection<TopicPartition> partitions) method of KafkaConsumer specifies the exact set of partitions to consume. You can also set the partition.assignment.strategy configuration item to specify a custom partition assignment strategy.
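For example, a minimal sketch of manually assigning partitions (the topic name and partition numbers are illustrative, and props is a consumer configuration like the ones in the examples later in this section):

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// consume only partitions 0 and 1 of topic_test; manual assignment bypasses group-based rebalancing
consumer.assign(Arrays.asList(
        new TopicPartition("topic_test", 0),
        new TopicPartition("topic_test", 1)));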

If nothing is specified, Kafka uses a built-in assignment strategy: Range (the default) or RoundRobin. There is also the more sophisticated StickyAssignor strategy.

  • Range

    The Range assignment strategy works per topic. The partitions of a topic are sorted by number, and the consumers are sorted alphabetically. Suppose there are 10 partitions and 3 consumers: the sorted partitions are 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 and the sorted consumers are C1-0, C2-0, C3-0. Each consumer is assigned (number of partitions / number of consumers) partitions; if the division is not exact, the first few consumers each get one extra partition.

    For example, 10 / 3 = 3 with a remainder of 1, so consumer C1-0 is assigned one extra partition. The final assignment is as follows:

    consumer    consumed partitions
    C1-0        partitions 0, 1, 2, 3
    C2-0        partitions 4, 5, 6
    C3-0        partitions 7, 8, 9 (with 11 partitions, C1-0 would consume 0, 1, 2, 3; C2-0 would consume 4, 5, 6, 7; C3-0 would consume 8, 9, 10)

    Disadvantage of Range assignment: the example above covers only one topic, so C1-0 consuming one extra partition hardly matters. But the assignment is computed per topic, so with N such topics C1-0 ends up with N more partitions than the other consumers; the more topics there are, the more overloaded C1-0 becomes.

  • RoundRobin

    The RoundRobin strategy lists all partitions and all consumers, sorts them by hashcode, and then assigns partitions to consumers one by one using a polling algorithm.

    Round-robin assignment falls into two cases:

    1. All consumers in the consumer group subscribe to the same topics.

    2. Consumers in the same consumer group subscribe to different topics.

    In the first case, the RoundRobin strategy produces a uniform assignment. For example: in the same consumer group there are 3 consumers C0, C1 and C2, all subscribing to 2 topics t0 and t1, each with 3 partitions (p0, p1, p2), so the subscribed partitions can be identified as t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. The final assignment is as follows:

    consumer       consumed partitions
    Consumer C0    t0p0 and t1p0
    Consumer C1    t0p1 and t1p1
    Consumer C2    t0p2 and t1p2

    In the second case, RoundRobin does not produce a completely even assignment and the partitions may end up unevenly distributed. For example: in the same consumer group there are 3 consumers C0, C1 and C2 subscribing to 3 topics t0, t1 and t2 with 1, 2 and 3 partitions respectively (t0 has p0; t1 has p0 and p1; t2 has p0, p1 and p2), so all subscribed partitions can be identified as t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. Specifically, consumer C0 subscribes to t0, consumer C1 subscribes to t0 and t1, and consumer C2 subscribes to t0, t1 and t2. The final assignment is as follows:

    consumer       consumed partitions
    Consumer C0    t0p0
    Consumer C1    t1p0
    Consumer C2    t1p1, t2p0, t2p1, t2p2

    If you prefer the RoundRobin strategy, it is best to have every consumer in the group subscribe to exactly the same topics, which is also the common practice in real projects; try to avoid a group in which consumers subscribe to different sets of topics (see the configuration sketch below).
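    The assignment strategy is configured on the consumer via partition.assignment.strategy. A minimal sketch of switching to the RoundRobin strategy (props is the consumer Properties object used in the examples below; RoundRobinAssignor is Kafka's built-in org.apache.kafka.clients.consumer.RoundRobinAssignor):

    // use the round-robin assignor instead of the default range assignor
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());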

  • offset

    Every message in a Kafka partition has a unique offset that represents its position in the partition. The consumer also has an offset, which indicates the position up to which it has consumed in the partition. This consumer offset must be persisted rather than kept only in memory, otherwise the consumer would not know where to resume after a restart. Before Kafka 0.9 the consumer stored its offsets in ZooKeeper by default; the new consumer client stores them in Kafka's internal topic __consumer_offsets. Persisting the consumer offset is usually called "committing": after consuming messages, the consumer needs to commit its offset.

    How can the committed offsets be viewed? First set exclude.internal.topics=false in the configuration file consumer.properties.

    Read the offset using the kafka script:

    kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
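    Note: in Kafka 2.x the --zookeeper option has been removed from kafka-console-consumer.sh. Assuming a broker listening on localhost:9092, the equivalent command on newer versions is:

    kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning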

Consumer offset commit methods

  • Automatically commit offsets

    Automatic offset committing is done inside the poll() method: each time a pull request is about to be sent to the server, the client checks whether it is time to commit, and if so it commits the offsets returned by the previous poll. Automatic committing is very simple and keeps the application code concise, since no explicit commit logic is needed. But if the consumer crashes before the next automatic commit, consumption restarts from the last committed offset, which leads to repeated consumption. You can shorten the commit interval to reduce the window of repeated messages, but that makes commits more frequent.

    Code:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Automatically commit offsets
     */
    public class MyConsumer1 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            // consumers with the same group.id belong to the same consumer group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
            // automatically commit offsets, once every 1s
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("topic_test"));
            // the consumer polls in an infinite loop
            while (true) {
                // returns as soon as data is available, otherwise waits at most the given duration
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }
    }
    
  • Manually commit the offset

    • Synchronous way
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Manually commit offsets synchronously
     */
    public class MyConsumer2 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            // consumers with the same group.id belong to the same consumer group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
            // turn off auto-commit and commit offsets manually
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("topic_test"));
            // the consumer polls in an infinite loop
            while (true) {
                // returns as soon as data is available, otherwise waits at most the given duration
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });

                // synchronous commit: the thread blocks until the offsets of the current batch are committed
                kafkaConsumer.commitSync();
            }
        }
    }
    
    • Asynchronous way
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Manually commit offsets asynchronously
     */
    public class MyConsumer3 {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            // consumers with the same group.id belong to the same consumer group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
            // turn off auto-commit and commit offsets manually
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("topic_test"));
            // the consumer polls in an infinite loop
            while (true) {
                // returns as soon as data is available, otherwise waits at most the given duration
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });

                // asynchronous commit with a callback; the thread does not block
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.out.println("Commit failed: " + offsets);
                        }
                    }
                });
            }
        }
    }
    

  • custom offset

    In Kafka, offsets are stored in the broker's internal topic by default, but the storage location can be customized. For example, to make consuming a message and committing its offset succeed or fail together, a database transaction can be used and the offset stored in MySQL. The example below is only sample code; the getOffset and commitOffset methods should be implemented against whichever offset store (such as MySQL) you choose.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Custom offset storage
     * In Kafka, offsets are stored in the broker's internal topic by default, but the storage location can be customized.
     * For example, to make consuming a message and committing its offset succeed or fail together, a database transaction can be used and the offset stored in MySQL.
     * This is only sample code; getOffset and commitOffset should be implemented against the chosen offset store (such as MySQL).
     */
    public class MyConsumer4 {
        public static Map<TopicPartition, Long> currentOffset = new HashMap<>();

        public static void main(String[] args) {

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
            // consumers with the same group.id belong to the same consumer group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
            // turn off auto-commit and commit offsets manually
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("topic_test"), new ConsumerRebalanceListener() {
                // called before a rebalance
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    commitOffset(currentOffset);
                }

                // called after a rebalance
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    currentOffset.clear();
                    for (TopicPartition partition : partitions) {
                        // seek to the most recently committed offset of each partition and continue consuming from there
                        kafkaConsumer.seek(partition, getOffset(partition));
                    }
                }
            });

            // the consumer polls in an infinite loop
            while (true) {
                // returns as soon as data is available, otherwise waits at most the given duration
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
                });

                // commit the offsets to the custom store
                commitOffset(currentOffset);
            }
        }

        /**
         * Get the last committed offset of a partition from the custom store
         */
        private static long getOffset(TopicPartition partition) {
            return 0;
        }

        /**
         * Commit the offsets of all partitions assigned to this consumer to the custom store
         */
        private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        }
    }
    

consumer interceptor

The Kafka consumer calls the interceptor's onConsume() method just before poll() returns, where the fetched messages can be customized (for example filtered) before the application sees them. The consumer calls the interceptor's onCommit() method after offsets are committed.

Interceptor example: define one consumer interceptor and use it.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Custom consumer interceptor
 * The Kafka consumer calls the interceptor's onConsume method just before poll() returns, where the fetched messages can be customized in advance.
 * The consumer calls the interceptor's onCommit method after offsets are committed.
 * What this interceptor does:
 * - checks the timestamp of each message and filters out messages that are too old (expired)
 * - prints the committed offsets after each commit
 */
public class ConsumerInterceptor1 implements ConsumerInterceptor<String, String> {

    private static final long EXPIRE_INTERVAL = 10000;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        HashMap<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(64);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> recordsInPartition = records.records(partition);
            List<ConsumerRecord<String, String>> filteredRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : recordsInPartition) {
                // keep only messages whose timestamp is within the last EXPIRE_INTERVAL milliseconds
                if (System.currentTimeMillis() - record.timestamp() < EXPIRE_INTERVAL) {
                    filteredRecords.add(record);
                }
            }
            if (!filteredRecords.isEmpty()) {
                newRecords.put(partition, filteredRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) -> {
            System.out.println(tp + ":" + offset.offset());
        });
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Use the custom consumer interceptor
 */
public class ConsumerWithInterceptor1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092");
        // consumers with the same group.id belong to the same consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
        // automatically commit offsets, once every 1s
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // register the custom consumer interceptor
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor1.class.getName());

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("topic_test"));

        // the consumer polls in an infinite loop
        while (true) {
            // returns as soon as data is available, otherwise waits at most the given duration
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, timestamp = %d, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
            });
        }
    }
}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Test whether the timestamp filter in ConsumerWithInterceptor1 works
 * The timestamp of a ProducerRecord can be set manually when sending a message
 */
public class Producer4ConsumerWithInterceptor1 {
    private static final long EXPIRE_INTERVAL = 10000;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "vm1:9092,vm2:9092,vm3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record;
            if (i % 2 == 0) {
                record = new ProducerRecord<>("topic_test", "hello " + i);
            } else {
                // shift the timestamp of odd-numbered messages into the past to test whether the consumer interceptor filters them out
                record = new ProducerRecord<>("topic_test", null, System.currentTimeMillis() - EXPIRE_INTERVAL, null, "hello " + i);
            }
            kafkaProducer.send(record);
        }

        // closing the producer triggers the interceptors' close methods
        kafkaProducer.close();
    }
}

kafka transaction

For details , please subscribe to the column tutorial " rabbitmq/kafka practical tutorial " https://blog.csdn.net/zpcandzhj/category_10152842.html

SpringBoot integrates kafka

For details , please subscribe to the column tutorial " rabbitmq/kafka practical tutorial " https://blog.csdn.net/zpcandzhj/category_10152842.html

Kafka interview questions collection

For the complete content , please subscribe to the column tutorial " rabbitmq/kafka practical tutorial " https://blog.csdn.net/zpcandzhj/category_10152842.html


Welcome to follow the public account [Cheng Yuanwei Tan] for various tutorials.

insert image description here
