Kafka-Config
Consumer
Name | Description | Default |
---|---|---|
bootstrap.servers | 服务地址 | |
key.deserializer | 消息 Key 的反序列化方式 | |
value.deserializer | 消息 Value 的反序列化方式 | |
group.id | 消费组 ID | “” |
auto.offset.reset | offset 重置方式 - earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 - latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 - none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 - anything else:抛出异常 |
latest |
enable.auto.commit | offset 提交方式 - true:自动提交 - false:手动提交 |
true |
max.poll.interval.ms | 使用消费组管理时的的拉取时间间隔 | 300000 |
max.poll.records | 一次拉取时的最大记录数 | 500 |
sasl.mechanism | 用于客户端连接的 SASL 机制(一般用 PLAIN) | GSSAPI |
security.protocol | 连接 broker 的协议 - PLAINTEXT - SSL - SASL_PLAINTEXT - SASL_SSL |
PLAINTEXT |
Spring-Kafka-Listener
Name | Description | Default |
---|---|---|
spring.kafka.listener.ack-mode | 提交 offset 方式 spring-kafak负责提交: - record:每处理一个消息,就提交一次 - batch:将上一次poll得到消息进行提交 - time:达到指定时间间隔,就提交 - count:达到指定次数,就提交 - count_time:达到指定次数和间间隔,就提交 手动提交: - manual:调用后先存放至本地缓存,在下一次poll之前取出批量提交 - manual_immediate:调用后立即提交 |
Authentication using SASL/PLAIN
Broker
在每一个 Kafka broker 的 config 目录中, 添加一个 JAAS 文件:
kafka_server_jaas.conf
1
2
3
4
5
6
7KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};将 JAAS 配置文件的路径作为 JVM 的参数, 并传递到每一个 Kafka broker
1
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
在
server.properties
中配置 SASL 端口和 SASL 机制1
2
3
4listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAINConsumer
1
2
3
4
5sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN