Programming/Spring

[Spring Boot] Spring Boot + InfluxDB 연동

kmindev 2025. 2. 6. 18:45

1. 개요

Spring Boot로 시계열 데이터베이스인 InfluxDB 2와 연동하는 방법에 대해서 알아보자.

 

2. InfluxDB 서버 환경 구성

먼저 InfluxDB를 도커로 띄워보자.

 

https://docs.influxdata.com/influxdb/v2/install/use-docker-compose/

 

Install InfluxDB using Docker Compose | InfluxDB OSS v2 Documentation

Thank you for your feedback! Let us know what we can do better:

docs.influxdata.com

docker-compose.yml

version: '3.8'

services:
  influxdb:
    image: influxdb:2
    environment:
      - TZ=Asia/Seoul
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=user1      # 관리자 계정
      - DOCKER_INFLUXDB_INIT_PASSWORD=password1  # 관리자 비밀번호
      - DOCKER_INFLUXDB_INIT_ORG=org1            # 조직 이름
      - DOCKER_INFLUXDB_INIT_BUCKET=testbucket   # 버킷 이름
    volumes:
      - influxdb-data:/var/lib/influxdb2
    ports:
      - "8086:8086"  # InfluxDB HTTP API 포트
    networks:
      - test-network

volumes:
  influxdb-data:

networks:
  test-network:
    driver: bridge

 

3. API Token 발급

  • InfluxDB는 RestfulAPI 기반으로 통신한다. API로 통신하기 위해서 API Token이 필요하다. 
  • http://localhost:8086/ 에 접속하면 InfluxDB UI 화면이 나온다.
  • Load Data - API TOKENS로 이동하여 GENERATE API TOKEN 버튼을 눌려 API 토큰을 생성하자. 
    • TOKEN을 생성할 때 권한을 설정할 수 있다. => 보안이 중요한 환경에서 활용하자
    • TOKEN 값은 생성할 때만 볼 수 있으므로 따로 기록해두자.

 

4. 의존성 추가

  • Spring Boot에서 InfluxDB API를 사용하기 위해 의존성을 추가한다.
  • com.influxdb:flux-dsl:7.2.0 은 FluxDSL을 사용하기 위해 추가한 것이다. 사용하지 않는다면 생략하자.
// Influx DB
implementation "com.influxdb:influxdb-client-java:7.2.0"
implementation "com.influxdb:flux-dsl:7.2.0"

 

5. 연결 설정 및 WriteApi 정의

InfluxdbConfig.java

  • Influxdb는 앞서 언급했듯이, Restful API 기반으로 통신하다.
  • WriteAPI는 비동기식, WriteApiBlocking은 동기식으로 동작한다.
  • 성능이 중요한 환경에서는 WriteAPI를 사용하도록 하자.
@Configuration
public class InfluxdbConfig {
    /**
     * 비동기 동작 (응답 대기 x)
     * 데이터를 모아서 한번에 write
     */
    @Bean
    public WriteApi writeApi(InfluxDBClient influxDBClient) {
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(1000) // 한 번에 처리할 데이터 개수
                .flushInterval(1000) // 설정 시간(초)마다 저장
                .bufferLimit(10000) // 버퍼 사이즈 조정
                .build();
        return influxDBClient.makeWriteApi(writeOptions);
    }

    /**
     * 동기식 (응답 대기)
     */
    @Bean
    public WriteApiBlocking writeApiBlocking(InfluxDBClient influxDBClient) {
        return influxDBClient.getWriteApiBlocking();
    }

    /**
     * 쿼리 API (조회용)
     */
    @Bean
    public QueryApi queryApi(InfluxDBClient influxDBClient) {
        return influxDBClient.getQueryApi();
    }

    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create(influxDBClientOptions());
    }

    @Bean
    public InfluxDBClientOptions influxDBClientOptions() {
        return InfluxDBClientOptions.builder()
                .url("http://127.0.0.1:8086")
                .authenticateToken("api token 값을 입력해주세요".toCharArray())
                .org("org1")
                .bucket("sensor")
                .logLevel(LogLevel.BASIC) // NONE, BASIC, HEADERS, BODY
                .build();
    }
}

 

6. InfluxDB 저장 예제 코드

Temperature.java

  • @Measurement를 사용하여 정의한다.
  • Measurement는 RDBMS에서 테이블과 유사한 개념으로 보면 된다.
  • InfluxDB는 UTC 시간대를 기준으로 데이터를 저장한다. 
    • LocalDateTime 대신 Instant 를 사용하는 이유이기도 하다.
@ToString
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Measurement(name = "temperature")
public class Temperature {
    @Column(tag = true) private String location;
    @Column private float value;
    @Column(timestamp = true) private Instant time;

    public static Temperature of(String location, float value) {
        return new Temperature(location, value, Instant.now());
    }
}

 

TemperatureService.java

  • Measurement를 저장하는 코드이다. 
  • Measurement 말고도, Point, Recode 타입으로도 write 할 수 있다.
@RequiredArgsConstructor
@Service
public class TemperatureService {
    private final WriteApi writeApi; // 비동기식: 데이터를 모아서 한번에 write
    private final WriteApiBlocking writeApiBlocking; // 동기식: 즉시 요청 - 응답

    public void addTemperature(TemperatureRequest request) {
        Temperature temperature = Temperature.of(request.location(), request.value());
        writeApi.writeMeasurement(WritePrecision.S, temperature); // 정밀도 설정(S: 초)
    }

    public void addTemperatureBlocking(TemperatureRequest request) {
        Temperature temperature = Temperature.of(request.location(), request.value());
        writeApiBlocking.writeMeasurement(WritePrecision.S, temperature); // 정밀도 설정(S: 초)
    }
}

 

저장 성공!

 

 

7. 데이터 조회 예제

InfluxDB 2 에서 flux라는 쿼리 언어를 사용한다. 필요하다면, 아래 사이트를 이용하길 바란다.

 

https://docs.influxdata.com/influxdb/v2/query-data/flux/

 

Query data with Flux | InfluxDB OSS v2 Documentation

Thank you for your feedback! Let us know what we can do better:

docs.influxdata.com

 

TemperatureService.java

  • flux 쿼리를 문자열로 직접 작성할 수 있지만, 문자열로 작성하면 오타나 에러를 잡기 힘들다.
  • 이런 문제를 해결하기 위해 FluxDSL API를 제공한다.
@RequiredArgsConstructor
@Service
public class TemperatureService {
    private final QueryApi queryApi; // 쿼리 API(조회용)

    public List<TemperatureResponse> getTemperatures(String location, LocalDateTime start, LocalDateTime stop) {
        String query = createQuery(location, start, stop);
        List<FluxTable> tables = queryApi.query(query);
        return tables.stream()
                .flatMap(table -> table.getRecords().stream())
                .map(TemperatureResponse::from)
                .toList();
    }

    /*
    FluxDSL을 사용하여 빌드할 경우 아래와 같은 flux 쿼리가 생성된다.
    
    import "timezone"
    option location = timezone.fixed(offset: 9h)

    from(bucket:"sensor")
	|> range(start:2025-03-05T01:00:00.000000000Z, stop:2025-03-07T01:00:00.000000000Z)
	|> filter(fn: (r) => (r["_measurement"] == "temperature" and r["_field"] == "value" and r["location"] == "거실"))
	|> aggregateWindow(every:1s, fn:mean, column:"_value", timeSrc:"_start", timeDst:"_time", createEmpty:false)
     */
    private String createQuery(String location, LocalDateTime start, LocalDateTime stop) {
        // LocalDateTime을 Instan 타입으로 변환
        Instant startInstant = start.atZone(ZoneId.systemDefault()).toInstant();
        Instant stopInstant = stop.atZone(ZoneId.systemDefault()).toInstant();

        return Flux.from("sensor")
                .withLocationFixed("9h") // 타임존 설정(서울=UTC+9)
                .range(startInstant, stopInstant) // 시간 범위 지정
                .filter(Restrictions.and(
                        Restrictions.measurement().equal("temperature"),
                        Restrictions.field().equal("value"),
                        Restrictions.tag("location").equal(location))
                )
                .aggregateWindow()
                .withEvery("1s")
                .withAggregateFunction("mean") // 평균값
                .withColumn("_value")
                .withTimeSrc("_start")
                .withTimeDst("_time")
                .withCreateEmpty(false)
                .toString();
    }

}

 

조회 성공!

 

예제 코드

https://github.com/kmindev/boot-influxdb

 

GitHub - kmindev/boot-influxdb: spring boot - influxdb 연동 예제

spring boot - influxdb 연동 예제. Contribute to kmindev/boot-influxdb development by creating an account on GitHub.

github.com

 

참고자료

https://github.com/influxdata/influxdb-client-java

 

GitHub - influxdata/influxdb-client-java: InfluxDB 2 JVM Based Clients

InfluxDB 2 JVM Based Clients. Contribute to influxdata/influxdb-client-java development by creating an account on GitHub.

github.com

 

https://docs.influxdata.com/influxdb/v2/

 

InfluxDB OSS v2 Documentation

Thank you for your feedback! Let us know what we can do better:

docs.influxdata.com