개발이 취미인 사람

[Kafka] - Kafka Producer 메시지 생성 본문

백앤드(Back-End)/Kafka

[Kafka] - Kafka Producer 메시지 생성

RyanSin 2024. 8. 1. 13:41
반응형

개요

안녕하세요. 이번 시간에는 Kafka Producer의 대해 알아보겠습니다.

 

지난 시간에 로컬환경에 Kafka를 설치한 후 간단한 메시지를 생성하고 소비하는 방법에 대해 알아봤습니다.

혹시 놓치고 오신 분들은 이전 내용을 학습하고 오시는 걸 추천드리겠습니다.

[Kafka] - 로컬 개발 환경 구축

 

[Kafka] - 로컬 개발 환경 구축

개요안녕하세요 이번 시간에는 로컬 PC 환경에 Kafka 환경을 구축하고 토픽을 생성해서 콘솔 환경에서 메시지를 전송하고 메시지를 읽는 실습 시간을 가져보겠습니다. 혹시 이전 시간에 Kafka에

any-ting.tistory.com

 

Producer Flow

Kafka에서 메시지를 전송하는 방법은 Producer를 구현해서 메시지를 전송하는 방법이 있습니다.

그럼 내부적으로 프로듀서는 어떤 식으로 동작하는지 우리는 제대로 알고 사용해야 합니다. 아래 그림은 Producer 실행 흐름을 보여줍니다.

 

출처: https://dzone.com/articles/take-a-deep-dive-into-kafka-producer-api

 

 

ProducerRecord는 우리가 Kafka Broker로 메시지 보내는 실제 데이터이며, 레코드는 토픽, 파티션, 키, 벨류로 구성됩니다.

  1. Topic(토픽): 메시지를 보내기 위한 대상토픽 (필수)
  2. Partition(파티션): 토픽 안에 있는 파티션을 지정, 지정하지 않다면 Kafka는 옵션에 따라 메시지를 파티션을 설정합니다. (미필수)
  3. Key(키): 동일한 파티션에 메시지의 순서를 보장하기 위해 설정하는 값이다.(미필수)
  4. Value: 실제 전송하려는 데이터 

 

Send() 메서드를 호출하면 프로듀서 내부적으로 Serializer와 Partitioner를 거치게 됩니다. 여기서 중요한 부분은 Paritioner입니다. Serializer는 우리가 생각하는 데이터를 직렬화 작업을 진행합니다. Paritioner는 ProducerRecord에서 Partition 설정에 따라 내부 동작이 달라집니다.

 

파티션 정보를 지정 했다면 파티셔너는 아무 동작도 하지 않고 지정된 파티션으로 레코드를 전달합니다.

파티션 정보를 지정하지 않았다면 키를 가지고 파티션을 선택해 레코드를 전달합니다.

 

마지막으로 프로듀서 내부적으로 Send() 메서드 호출 이후 레코드들을 파티션별로 잠시 모아둡니다. 그러는 이유는 카프카로 전송하기 전, 배치 전송을 하기 위함입니다. 전송이 실패하면 재시도 동작이 이뤄지고, 지정된 횟수만큼의 재시도가 실패하면 최종 실패로 처리합니다.

 

만약 메시지를 성공적으로 전송하면 메타데이터를 리턴 받게 됩니다.

 

프로듀서 주요 옵션

옵션 설명
bootstrap.servers 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트와 포트 정보입니다.
client.dns.lookup 하나의 호스트에 여러 IP를 매핑해 사용하는 일부 환경에서 클라이언트가 하나의 IP와 연결하지 못할 경우에 다른 IP로 시도하는 설정입니다.
acks 프로듀서가 카프카 토픽의 리더 측에 메시지를 전송한 후 요청을 완료하기를 결정하는 옵셥입니다. 0, 1, -1(all)로 설정할 수 있습니다.

acks = 0: 프로듀서가 브로커의 응답을 기다리지 않습니다. (가장 빠르지만, 메시지 손실될 수 있습니다.)
acks = 1: 리더 브로커가 메시지를 기록하면 응답합니다. (가장 보편적인 설정입니다.)
acks = -1: 모든 ISR(In-Sync Replica) 복제 브로커가 메시지를 기록해야 응답을 보냅니다.
                  가장 높은 신뢰성을 보장하지만, 성능이 저하도리 수 있습니다.
buffer.memory 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트입니다.
compression.type 프로듀서가 메시지 전송시 압축 타입을 설정합니다. none, gzip, snappy, lz4, zstd 중에 설정할 수 있습니다.
enalbe.idempotence 해당 옵션을 true로 설정하면 중복 없는 전송이 가능합니다.
이와 동시에 max.in.flight.requests.per.connection은 5이하, retries는 0 acks는 -1(all)로 설정해야합니다.
max.in.flight.requests.per.connection 하나의 커넥션에서 프로듀서가 최대한 ACK 없이 전송할 수 있느 요청 수입니다.
메시지의 순서가 중요하다면 1로 설정할 것을 권장하지만, 성능 저하가 발생할 수 있습니다.
retries 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수입니다.
batch.size 프로듀서는 동일한 파티션으로 보내는 여러 데이터를 한번에 배치로 처리할 때 설정하는 배치 크기 입니다.
linger.ms 데이터를 배치 처리하기 전 기다리는 시간입니다.
batch.size만큼 데이터가 쌓이지 않았다면 linger.ms 설정한 제한 시간까지 기다렸다가 메시지를 전송합니다.
transaction.id "정확히 한 번 전송"을 위해 사용하는 옵션이며, 동일한 TranscationId에 한해 정확히 한 번을 보장합니다.
해당 옵션을 사용하기 전 enalbe.idempotence 값을 true 설정해야합니다.

 

 

예시 코드

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

Producer 설정 정보는 위 코드를 보시는 것처럼 옵션들을 추가할 수 있습니다.

 

필요한 설정에 맞게 옵션을 설정해서 처리하는 게 중요합니다. 팀 내부적으로 내용을 검토하고 진행하는 걸 추천드립니다.

메시지 발송 방식

Kafka Producer는 메시지를 전송하는 방식이 다양합니다.

  1. 동기식 메시지 발송
  2. 비동기식 메시지 발송
  3. 콜백 메시지 발송
  4. 정확히 한 번만 발송

정확히 한 번만 발송이라는 방식은 다른 글에서 설명하도록 하겠습니다.

 

동기 및 비동기에 대한 개념은 어느 정도 아시겠지만, 모르시는 분들은 아래 링크를 통해 학습하고 오시는 걸 추천드리겠습니다.

[CS] Blocking/Non-blocking & Sync/Async에 대해서

 

[CS] Blocking/Non-blocking & Sync/Async 에 대해서

개요 안녕하세요. 이번 시간에는 Blocking/Non-blocking과 Sync/Async에 대해 알아보겠습니다. 이전에 공부하면서 이해하고 어느 정도 실무에 잘 반영하고 있다고 생각하고 있었는데 다시 공부하는 시점

any-ting.tistory.com

 

기본적으로 Kafka Producer는 상황에 맞는 방식으로 전송을 하면 되지만 비동기식 메시지 발송 방식을 사용해서 메시지를 전송합니다.

예시 코드

동기식 메시지 발송

/**
 * @author Ryan
 * @description 동기 전송
 */
public void sendSynchronousMessage(String topic, String key, Object message) {
    try {
        kafkaTemplate.send(topic, key, message).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

 

send() 메서드를 호출하고 메시지 체이닝 방식을 통해 get()을 호출하면 메시지를 발송하고 해당 메시지에 대한 결과 값을 확인하겠다는 뜻입니다. 즉 동기식으로 결과를 기다리게 됩니다.

 

비동기식 메시지 발송

/**
 * @author Ryan
 * @description 비동기 전송
 */
public void sendAsynchronousMessage(String topic, String key, Object message) {
    kafkaTemplate.send(topic, key, message);
}

 

동기식 발송과 다르게 get() 메서드를 호출하지 않고 있죠. 그렇기 때문에 비동기 방식으로 메시지를 발송할 수 있습니다.

 

콜백 메시지 전송

/**
 * @author Ryan
 * @description 콜백 전송
 */
public void sendMessageWithCallback(String topic, String key, Object value) {
    CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, value);
    future.thenAccept(result -> {
        RecordMetadata metadata = result.getRecordMetadata();
        log.info("Message sent to topic: {} partition: {} offset: {}", metadata.topic(), metadata.partition(), metadata.offset());
    }).exceptionally(ex -> {
        ex.printStackTrace();
        return null;
    });
}

 

메시지 발송은 비동기 방식으로 처리합니다. send() 메서드를 호출한 후 CompletableFuture 객체를 반환해서 나중에 결과 여부를 확인합니다.

 

thenAccept 메서드가 호출되면 메시지가 정상적으로 처리됐다는 뜻입니다. 반대로 exceptionally 메서드가 호출되면 실패를 했다고 처리됩니다.

 

이번 시간에는 Kafka에서 메시지를 전송하는 Producer에 대해 알아봤습니다. 꼭 직접 실습을 해보시는 걸 추천드리겠습니다.

 

Spring boot에서는 실제 Kafka 서버를 실행하지 않고 자체 Kafka 서버를 실행해 테스트가 가능합니다. 해당 내용은 아래 소스코드에 있습니다. 확인하시면 좋을 것 같아요!!

 

소스코드

https://github.com/Ryan-Sin/spring-kafka-example

 

GitHub - Ryan-Sin/spring-kafka-example

Contribute to Ryan-Sin/spring-kafka-example development by creating an account on GitHub.

github.com