Kafka-Config

Consumer

Name Description Default
bootstrap.servers 服务地址
key.deserializer 消息 Key 的反序列化方式
value.deserializer 消息 Value 的反序列化方式
group.id 消费组 ID “”
auto.offset.reset offset 重置方式
- earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
- latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
- none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- anything else:抛出异常
latest
enable.auto.commit offset 提交方式
- true:自动提交
- false:手动提交
true
max.poll.interval.ms 使用消费组管理时的的拉取时间间隔 300000
max.poll.records 一次拉取时的最大记录数 500
sasl.mechanism 用于客户端连接的 SASL 机制(一般用 PLAIN) GSSAPI
security.protocol 连接 broker 的协议
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
PLAINTEXT

Spring-Kafka-Listener

Name Description Default
spring.kafka.listener.ack-mode 提交 offset 方式
spring-kafak负责提交:
- record:每处理一个消息,就提交一次
- batch:将上一次poll得到消息进行提交
- time:达到指定时间间隔,就提交
- count:达到指定次数,就提交
- count_time:达到指定次数和间间隔,就提交
手动提交:
- manual:调用后先存放至本地缓存,在下一次poll之前取出批量提交
- manual_immediate:调用后立即提交

Authentication using SASL/PLAIN

Broker

  1. 在每一个 Kafka broker 的 config 目录中, 添加一个 JAAS 文件:kafka_server_jaas.conf

    1
    2
    3
    4
    5
    6
    7
    KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
    };
  2. 将 JAAS 配置文件的路径作为 JVM 的参数, 并传递到每一个 Kafka broker

    1
    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
  3. server.properties 中配置 SASL 端口和 SASL 机制

    1
    2
    3
    4
    listeners=SASL_SSL://host.name:port
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN

    Consumer

    1
    2
    3
    4
    5
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN

Reference