Programming/Spring

[Spring Boot] Spring Boot + Kafka 예제

kmindev 2023. 9. 26. 22:32

1. Kafka 설치 및 실행

a. Kafka 설치

아래의 링크에서 Kafka 2.8.2 를 설치했다.

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

b. Zookeeper 실행

Kafka는 Zookeeper를 내부적으로 사용한다. Kafka를 실행하기 전에 아래 명령어로 Zookeeper를 먼저 실행시키도록 하자.

path\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

 

c. Kafka 실행

아래 명령어로 Kafka를 실행시키자.

path\bin\windows\kafka-server-start.bat config\server.properties

 

 

2. Spring Boot Kafka 

a. 의존성 추가

Spring Boot 프로젝트를 생성하고 Spring for Apach Kafka를 추가하자.

implementation 'org.springframework.kafka:spring-kafka'

 

b. application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092 # Kafka 클러스터에 대한 주소
      group-id: foo    # 컨슈머 그룹의 고유 식별자인 그룹 ID를 설정
      auto-offset-reset: earliest
			
      # Kafka 메시지의 key를 역직렬화할 때 사용할 클래스를 지정
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

	  # Kafka 메시지의 value를 역직렬화할 때 사용할 클래스를 지정
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

	producer:
	  # Kafka 클러스터에 대한 주소
      bootstrap-servers: localhost:9092

	  # Kafka 메시지의 key를 직렬화할 때 사용할 클래스를 지정
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

	  # Kafka 메시지의 value를 직렬화할 때 사용할 클래스를 지정
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

c. KafkaProducer

@Service
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage1(String message) {
    	System.out.println("Producer topic: topic1, message: " + message);
        kafkaTemplate.send("topic1", message);
    }
    
    public void sendMessage2(String message) {
    	System.out.println("Producer topic: topic2, message: " + message);
        kafkaTemplate.send("topic2", message);
    }
}

KafkaTemplate을 통해 Topic과 Message를 Kafka 서버로 전송

 

d. KafkaConsumer

@Service
public class KafkaConsumer {
	
	@Bean
    public NewTopic topic1() {
    	return TopicBuilder.name("topic1")
    			.partitions(10) // 10개의 파티션으로 분할
    			.replicas(1) // 1개의 레플리카로 복
    			.build();
    }

    @KafkaListener(topics = "topic1", groupId = "foo")
    public void consume1(String message) throws IOException {
    	System.out.println("Consumer topic: topic1, message: " + message);
    }
    
    @Bean
    public NewTopic topic2() {
    	return TopicBuilder.name("topic2")
    			.partitions(10)
    			.replicas(1)
    			.build();
    }
    
    @KafkaListener(topics = "topic2", groupId = "foo")
    public void consume2(String message) throws IOException {
    	System.out.println("Consumer topic: topic2, message: " + message);
    }
}

Consumer의 Topic을 설정하고, @KafkaListener를 통해 Kafak 서버로부터 Message를 전달 받습니다.

 

 

e. KafkaController

@RestController
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducer producer;

    @PostMapping("/kafka1")
    public void sendMessage1(@RequestBody String message) {
        producer.sendMessage1(message);
    }
    
    @PostMapping("/kafka2")
    public void sendMessage2(@RequestBody String message) {
        producer.sendMessage2(message);
    }
}

엔드포인트 요청을 통해 Kafka를 테스트 해보았다.

 

f. 결과

 

 

간단한 Kafka 예제를 정리해봤다. Kafka는 확장으로 인한 흩어져있는 여러 데이터를 Kafka로 관리함으로써 데이터를 효유율적으로 수집, 저장, 처리할 수 있어 확장하기에 매우 적합하다.