1. Kafka 설치 및 실행
a. Kafka 설치
아래의 링크에서 Kafka 2.8.2 를 설치했다.
https://kafka.apache.org/downloads
b. Zookeeper 실행
Kafka는 Zookeeper를 내부적으로 사용한다. Kafka를 실행하기 전에 아래 명령어로 Zookeeper를 먼저 실행시키도록 하자.
path\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
c. Kafka 실행
아래 명령어로 Kafka를 실행시키자.
path\bin\windows\kafka-server-start.bat config\server.properties
2. Spring Boot Kafka
a. 의존성 추가
Spring Boot 프로젝트를 생성하고 Spring for Apach Kafka를 추가하자.
implementation 'org.springframework.kafka:spring-kafka'
b. application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092 # Kafka 클러스터에 대한 주소
group-id: foo # 컨슈머 그룹의 고유 식별자인 그룹 ID를 설정
auto-offset-reset: earliest
# Kafka 메시지의 key를 역직렬화할 때 사용할 클래스를 지정
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Kafka 메시지의 value를 역직렬화할 때 사용할 클래스를 지정
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# Kafka 클러스터에 대한 주소
bootstrap-servers: localhost:9092
# Kafka 메시지의 key를 직렬화할 때 사용할 클래스를 지정
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Kafka 메시지의 value를 직렬화할 때 사용할 클래스를 지정
value-serializer: org.apache.kafka.common.serialization.StringSerializer
c. KafkaProducer
### KafkaProducer
```java
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage1(String message) {
System.out.println("Producer topic: topic1, message: " + message);
kafkaTemplate.send("topic1", message);
}
public void sendMessage2(String message) {
System.out.println("Producer topic: topic2, message: " + message);
kafkaTemplate.send("topic2", message);
}
}
KafkaTemplate을 통해 Topic과 Message를 Kafka 서버로 전송
d. KafkaConsumer
@Service
public class KafkaConsumer {
@Bean
public NewTopic topic1() {
return TopicBuilder.name("topic1")
.partitions(10) // 10개의 파티션으로 분할
.replicas(1) // 1개의 레플리카로 복
.build();
}
@KafkaListener(topics = "topic1", groupId = "foo")
public void consume1(String message) throws IOException {
System.out.println("Consumer topic: topic1, message: " + message);
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic2")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(topics = "topic2", groupId = "foo")
public void consume2(String message) throws IOException {
System.out.println("Consumer topic: topic2, message: " + message);
}
}
Consumer의 Topic을 설정하고, @KafkaListener를 통해 Kafak 서버로부터 Message를 전달 받습니다.
e. KafkaController
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducer producer;
@PostMapping("/kafka1")
public void sendMessage1(@RequestBody String message) {
producer.sendMessage1(message);
}
@PostMapping("/kafka2")
public void sendMessage2(@RequestBody String message) {
producer.sendMessage2(message);
}
}
엔드포인트 요청을 통해 Kafka를 테스트 해보았다.
f. 결과
간단한 Kafka 예제를 정리해봤다. Kafka는 확장으로 인한 흩어져있는 여러 데이터를 Kafka로 관리함으로써 데이터를 효유율적으로 수집, 저장, 처리할 수 있어 확장하기에 매우 적합하다.
'Programming > Spring' 카테고리의 다른 글
[Spring] 순환참조 문제 해결 (1) | 2023.10.04 |
---|---|
[Spring Boot] Spring Boot + ES 예제 (0) | 2023.09.28 |
[Spring Boot] Spring Data JPA + PostgreSQL 예제 (0) | 2023.09.25 |
[Spring Boot] Spring Boot + Redis CRUD 예제 (0) | 2023.09.24 |
[Spring Boot] Spring Integration MQTT 예제 (0) | 2023.09.24 |