Kafka-Deployment

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Download

Start

  • Startup scripts:
    • Linux: the bin/ directory
    • Windows: the bin/windows directory
  • Start ZooKeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start Kafka: bin/kafka-server-start.sh config/server.properties

Spring for Kafka

Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Config

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
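
Each of the properties above corresponds to a native Kafka client setting (for example, `spring.kafka.consumer.group-id` corresponds to `group.id`). The sketch below only illustrates that naming correspondence for the seven keys listed here; the class `KafkaPropertyNames` and the helper `toClientKey` are hypothetical names made up for this example, and Spring Boot itself binds these properties through its own configuration classes rather than by string manipulation.

```java
import java.util.List;

// Illustration only: shows how the spring.kafka.* keys used above line up
// with native Kafka client setting names. Not how Spring Boot binds them.
class KafkaPropertyNames {

    private static final String PREFIX = "spring.kafka.";

    // Hypothetical helper: "spring.kafka.consumer.group-id" -> "group.id"
    static String toClientKey(String springKey) {
        if (!springKey.startsWith(PREFIX)) {
            throw new IllegalArgumentException("not a spring.kafka property: " + springKey);
        }
        String rest = springKey.substring(PREFIX.length());
        // Drop the "consumer." / "producer." scope if present.
        if (rest.startsWith("consumer.") || rest.startsWith("producer.")) {
            rest = rest.substring(rest.indexOf('.') + 1);
        }
        // Spring uses kebab-case; the Kafka clients use dot-separated names.
        return rest.replace('-', '.');
    }

    public static void main(String[] args) {
        List<String> springKeys = List.of(
                "spring.kafka.bootstrap-servers",
                "spring.kafka.consumer.auto-offset-reset",
                "spring.kafka.consumer.group-id",
                "spring.kafka.consumer.key-deserializer",
                "spring.kafka.producer.key-serializer");
        for (String key : springKeys) {
            System.out.println(key + " -> " + toClientKey(key));
        }
    }
}
```

The mapping is only checked against the keys in this post; other Spring Boot Kafka properties may not follow the same pattern.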

auto-offset-reset

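  • earliest: if a partition has a committed offset, consumption resumes from that offset; if there is no committed offset, consumption starts from the beginning of the partition
  • latest: if a partition has a committed offset, consumption resumes from that offset; if there is no committed offset, only data newly produced to that partition is consumed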
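
The two rules above can be sketched as a small decision function. This is a simplified model in plain Java, not Kafka client code: the class `OffsetResetSketch`, the enum `Policy`, and the parameters `logStart` and `logEnd` (the first and next-to-be-written offsets of a partition) are names assumed for illustration, and the model ignores the case where a committed offset no longer exists on the broker.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting position
// for one partition. Names here are illustrative, not Kafka APIs.
class OffsetResetSketch {

    enum Policy { EARLIEST, LATEST }

    static long startPosition(OptionalLong committed, long logStart, long logEnd, Policy policy) {
        // A committed offset always wins, regardless of the policy.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: the policy decides where to start.
        return policy == Policy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Partition currently holds offsets 0..99, so the next write lands at 100.
        System.out.println(startPosition(OptionalLong.of(42), 0, 100, Policy.LATEST));
        System.out.println(startPosition(OptionalLong.empty(), 0, 100, Policy.EARLIEST));
        System.out.println(startPosition(OptionalLong.empty(), 0, 100, Policy.LATEST));
    }
}
```

The policy therefore matters only for a consumer group that has never committed an offset for the partition; once an offset is committed, both settings behave the same.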

SendMessage

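import jakarta.annotation.Resource; // assumption: Spring Boot 3+; use javax.annotation.Resource on earlier versions
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    // KafkaTemplate configured from the spring.kafka.* properties
    @Resource
    private KafkaTemplate<Object, Object> template;

    // Send a message to the given topic
    public void send(String topic, String message) {
        template.send(topic, message);
    }

    // Send a message to the default topic defined in AppConstant
    public void send(String message) {
        template.send(AppConstant.TOPIC_NAME, message);
    }
}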

ReceiveMessage

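import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    // Called for each record received from the topic; the value arrives as a String
    @KafkaListener(id = "001", topics = AppConstant.TOPIC_NAME, groupId = AppConstant.GROUP_ID)
    public void receive(String message) {
        // TODO: process the message
    }
}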

References