개발이 취미인 사람

[Kafka] - Kafka Consumer 메시지 소비 본문

백앤드(Back-End)/Kafka

[Kafka] - Kafka Consumer 메시지 소비

RyanSin 2024. 8. 2. 13:31
반응형

개요

안녕하세요. 이번 시간에는 Kafka Consumer의 대해 알아보겠습니다.

 

지난 시간에는 Kafka Producer의 대해 알아봤습니다. 혹시 놓치고 오신 분들은 꼭 학습을 하고 오시는 걸 추천드리겠습니다.

[Kafka] - Kafka Producer 메시지 생성

 

[Kafka] - Kafka Producer 메시지 생성

개요안녕하세요. 이번 시간에는 Kafka Producer의 대해 알아보겠습니다. 지난 시간에 로컬환경에 Kafka를 설치한 후 간단한 메시지를 생성하고 소비하는 방법에 대해 알아봤습니다.혹시 놓치고 오신

any-ting.tistory.com

 

기본 개념

Kafka는 메시지 브로커 소프트웨어입니다.

메시지 브로커 소프트웨어 구조

 

Consumer는 기본적으로 특정 토픽 안에 생성된 파티션에 저장되어 있는 메시지를 가져와서 처리하는 역할을 합니다.

단순히 메시지를 가져와서 처리하는 것보단 내부적으로 Consumer Group, Consumer Rebalancing 등 여러 동작을 통해 빠르게 메시지를 소비하고 장애에 대응합니다.

 

아무리 Producer가 Queue에 메시지를 빠르게 전송한다고 해도 Consumer가 빠르게 가져갈 수 없다면 이것 또한... 큰 문제입니다.

결국 지연이 발생하고 올바른 Kafka 퍼포먼스를 보장할 수 없습니다.

 

Consumger Group

Consumer Group은 Consumer를 그룹화해서 관리하기 위해 사용된다고 생각하시면 됩니다.

 

왜 그룹을 만들어서 관리할까요? 하나의 파티션에 하나의 Consumer가 연결되어 있다면 Consumer Group은 의미가 없습니다. 하지만 대량에 데이터를 여러 파티션에 저장하고 해당 파티션에 여러 Consumer가 연결되어 있는 상황에서 Consumer Gorup이 효과를 봅니다.

 

이슈는 Rebalancing 기능을 통해 하나의 Consumer가 장애가 발생하면 다른 Consumer가 해당 작업을 처리합니다.

Consumer Rebalancing

 

간혹 그럼 파티션 수 보다 Consumer를 수를 더 늘려서 운영하면 좋겠다고 생각할 수 있지만 파티션 수와 컨슈머 수는 1:1 매핑되는 것이 이상적입니다.

 

컨슈머 수가 많다고 해서 그 만큼 일을 하는 게 아니라 단순히 대기 상태로 아무것도 하지 않기 때문입니다.

 

컨슈머 주요 옵션

옵션 설명
bootstrap.servers 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트와 포트 정보입니다.
fetch.min.bytes 한 번에 가져올 수 있는 최소 데이터 크기입니다. 만약 지정한 크기보다 작은 경우 요청에 응답하지 안고 데이터가 누적될 때까지 기다립니다.
fetch.max.bytes 한 번에 가져오기 요청으로 가져올 수 있는 최대 크기입니다.
fetch.max.wait.ms fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간입니다
group.id 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자 키 입니다. 
heartbeat.interval.ms 컨슈머가 브로커에게 현재 살아있고 작업을 하고 있다 라는 헬스체크 시간을 설정하는 값 입니다.
session.timeout.ms와 밀접한 관계가 있으며, session.timeout.ms 보다 낮은 값으로 설정해야 합니다. 일반적으로 session.timeout.ms 의 1/3로 설정합니다.
max.partition.fetch.bytes 파티션당 가져올 수 있는 최대 크기를 의미합니다.
session.timeout.ms  해당 설정 값을 통해 컨슈머가 종료된 것인지를 판단합니다.
컨슈머는 주기적으로 브로커에게 하트비트를 보내야하고, 만약 설정한 시간동안 하트비트를 보내지 않으면 해당 컨슈머는 종료된 것으로 간주하고 컨슈머 그룹에서 제외하고 리벨런싱을 시작합니다.
enable.auto.commit 백그라운드 주기적으로 오프셋을 커밋합니다.
auto.offset.reset 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우에 다음 옵션으로 reset합니다.
- earliest: 가장 초기의 오프셋 값으로 설정합니다.
- latest: 가장 마지막의 오프셋 값으로 설정합니다.
- none: 이전 오프셋 값을 찾지 못하면 에러를 나타냅니다.
group.instance.id 컨슈머의 고유한 식별자입니다. 만약 설정한다면 static 맴버로 간주되어 불필요한 리벨런싱을 하지 않습니다.
isolation.level 트랜잭션 컨슈머에서 사용되는 옵션으로, read_uncommitted는 기본값으로 모든 메시지를 읽고, read_committed는 트랜잭션이 완료된 메시지만 읽습니다.
max.poll.records 한번에 poll() 요청시 가져오는 최대 메시지 수를 정의합니다.
partition.assignment.strategy 파티션 할당 전략이며, 기본값은 range입니다.

 

 

위 옵션중에 여러가지 설정이 있지만 데이터를 실시간으로 처리하는 방식과 어느정도 모았다가 처리하는 방식에 따라 Consumer 전략이 달라집니다.

 

실시간 처리 방식

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "real-time-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 자동 커밋 설정
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 자동 커밋 간격 설정
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // 한 번에 최대 하나의 레코드만 가져오기
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // 최소 가져오기 바이트 수 설정
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // 최대 대기 시간 설정

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

배치 처리 방식

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋 설정
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 최소 1MB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 최대 50MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 최대 500ms 대기

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); // 배치 리스너 설정
        return factory;
    }
}

 

위 옵션은 제가 설정한 부분이기 때문에 꼭 팀내 의논을 통해서 설정 하시는 걸 추천드리겠습니다.