1. MQTT Broker 설치
실습에 앞서 먼저 MQTT Broker를 설치해야 한다. 간단한 예제를 하기 때문에 로컬환경에 설치했다.
MQTT Broker 중 Mosquitto를 설치하여 사용하였고, 아래 링크로 접속해서 설치를 진행했다.
window일 경우 아래 사진을 참고해서 설치하면 된다.
https://mosquitto.org/download/
Download
Source mosquitto-2.0.17.tar.gz (GPG signature) Git source code repository (github.com) Older downloads are available at https://mosquitto.org/files/ Binary Installation The binary packages listed be
mosquitto.org
2. MQTT Broker 구동
설치한 Mosquitto를 구동하자.
CMD에 path\mosquitto -v 를 입력하면 Broker가 구동된다.
3. MQTT Explorer 클라이언트
MQTT Explorer 설치
MQTT Explorer를 설치했다. Broker로 데이터를 전달(발행)하기 위해 사용했다.
MQTT Explorer
An all-round MQTT client that provides a structured topic overview
mqtt-explorer.com
MQTT Broker 연결
name, host, port를 확인하고 CONNECT 버튼을 클릭하면 된다.
“test/topic”이라는 topic으로 발행
4. Spring Boot Mqtt
시작하기에 앞서 EAI 패턴을 구현한 프레임워크인 Spring Integration에 대해서 조금 공부할 필요가 있다.
a. Spring Integration
Spring Integration에 대해서 간략히 정리한 내용이다.
Spring Integration
- 시스템 통합에 대해 프로그램 간에 빠르고 신뢰할 수 있는 Messaging을 통해 해결해 왔으며 이를 패턴화한 것이 EAI(기업 통합 패턴)이라 한다.
- EAI 패턴을 구현한 프레임워크가 Spring Integration이다.
Spring Integration 구성
- Message: 전송할 데이터가 담긴 Wrraper Class로 데이터 전송 단위이다.
- MessageHeaders: 메타 데이터(key - value)
- Payload: 전송할 가치가 있는 실제 데이터(JSON 등)
- Channel: Message가 한 시스템에서 다른 시스템으로 전달되는 파이프 역할을 수행
- Point to Point Channel : 1:1 전송 용도
- Publish/Subscribe Channel : 1:N 전송 용도
- Service Activator: Message를 처리할 때 사용된다.
- Adapter: 외부 시스템과의 통신을 단순화하고 메시지를 주고받을 수 있게 한다.
- inbound Adapter: 외부 시스템으로 메시지를 가져올 때 사용
- outbound Adapter: 메시지를 외부로 보내는 데 사용
![](https://blog.kakaocdn.net/dn/csa2zB/btsxtlYDRgk/hegE5GqLeKiUeKDxnm5iak/img.png)
b. MQTT 실습 코드
ⅰ. 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-mqtt'
참고로 필자는 아래 버전으로 진행했다.
- Spring Boot 2.7.15
- JAVA 8
- Spring Integraion mqtt 5.5.18
ⅱ. outBound 예제
MqttConfig
- 여기서 아웃바운드 채널은 (Spring Application → 외부시스템(MQTT Broker)) 메시지 전달을 위한 채널
@Configuration
public class MqttConfig {
@Value("${spring.mqtt.broker-url}")
private String brokerUrl;
@Value("${spring.mqtt.client-id}")
private String clientId;
@Value("${spring.mqtt.topic}")
private String topic;
// 클라이언트 팩토리
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
factory.setConnectionOptions(options);
return factory;
}
// 채널 구성
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(topic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
// 메시지 전송 채널 지정
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
MqttService
- process() 메서드가 호출될 때 mqtt broker로 메시지를 전송한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttService {
private final MqttConfig.MyGateway myGateway;
public void process() {
String msg = "Hello MQTT!";
myGateway.sendToMqtt(msg);
}
}
ⅲ. Inbound 예제
MqttConfig
- 여기서 인바운드 채널은 Topic을 구독을 위한 채널 구성(MQTT Broker → Spring Application)
@Configuration
public class MqttConfig {
private static final String BROKER_URL = "tcp://localhost:1883"; // 브로커 주소
private static final String MQTT_CLIENT_ID = MqttAsyncClient.generateClientId(); // 클라이언트 고유식볋자
private static final String TOPIC_FILTER = "test/topic"; // Topic
@Bean
public MessageChannel mqttInputChannel() {
// Point to Point Channel 중 가장 기본
// messageHandler에게 Message를 전송
// DirectChannel을 구독하는 핸들러 하나에게만 브로드 캐스트
// PublishSubscribeChannel은 Publish/Subscribe Channel로
// 해당 채널을 구독한 모든 핸들러에게 브로드 캐스트
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
// 메시지 수신을 위한 채널을 구성
// 생성자를 통해 topic을 여러개 추가할 수 있음.
// addTopic() 메서드도 존재함.
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, MQTT_CLIENT_ID, TOPIC_FILTER);
adapter.setCompletionTimeout(5000); // 메시지 처리가 5초를 초과하면 타임아웃으로 간주
adapter.setConverter(new DefaultPahoMessageConverter()); // MQTT 메시지를 Spring Integration 메시지로 변환
adapter.setQos(1); // Qos 설정
adapter.setOutputChannel(mqttInputChannel()); // Spring Integration 채널 설정
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler messageHandler() {
// MQTT 클라이언트에 의해 수신된 페이로드는
// Broker -> inbound ➤ mqttInputChannel ➤ MessageHandler
// 순으로 이동되어 MessageHandle에서 메시지를 확인할 수 있음.
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// TODO Auto-generated method stub
System.out.println(message.getHeaders());
System.out.println("Topic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
System.out.println("Payload: " + message.getPayload());
}
};
}
}
message.getHeaders 메타정보에는 아래와 같은 정보가 포함되어 있다.
- mqtt_receivedRetained=false, => Retained flag : true이면 메시지가 브로커에 보관
- mqtt_id=0, => 패킷 식별자
- mqtt_duplicate=false, => true이면 메시지가 중복 전송된 것
- id=230df40a-46b7-3695-9776-ab1f9bed887b, => 메시지 식별자
- mqtt_receivedTopic=test/topic, => MQTT Topic
- mqtt_receivedQos=0, => QoS 레벨
- timestamp=1695012377433 => 수신된 시간
message.getPayload
실제 데이터가 포함되어 있다.
'Programming > Spring' 카테고리의 다른 글
[Spring Boot] Spring Data JPA + PostgreSQL 예제 (0) | 2023.09.25 |
---|---|
[Spring Boot] Spring Boot + Redis CRUD 예제 (0) | 2023.09.24 |
[Spring] Bean 생명주기 콜백 (0) | 2023.09.16 |
[Spring] 의존관계 자동으로 주입하는 방법 (0) | 2023.09.07 |
[Spring] 컴포넌트 스캔(Component Scan) (0) | 2023.09.07 |