[Kafka] - Producer 정확히 한번 전송
개요
안녕하세요. 이번 시간에는 Kafka Producer에서 정확히 한번 발송하는 방법에 대해 알아보겠습니다.
혹시 지난 시간에 내용을 놓치고 오신 분들은 학습하고 오시는 걸 추천드리겠습니다.
트랜잭션 API
이전 시간에는 멱등성 옵션을 활용해서 중복 없는 전송에 대해 알아봤습니다. PID(Producer Id) 값과 메시지 번호를 통해 메시지를 비교해 중복 메시지 처리했습니다.
중복 없는 전송은 한번만 전송하는 게 아니라 중복해서 전송할 수 있습니다. 카프카에서 정확히 한번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미하며, 중복 없는 전송은 정확히 한번 전송의 일부 기능이라 할 수 있습니다.
전체적인 프로세스를 관리하기 위해 카프카에서는 정확히 한번 처리를 담당하는 별도에 프로세스가 있는데 이를 트랜잭션 API라고 부릅니다.
프로듀서가 카프카 브로커로 정확히 한번 메시지를 전송할 때, 프로듀서가 보내는 메시지를 원자적(atomic)으로 처리되어 전송에 성공하거나 실패하게 됩니다.
ACID에서 A는 원자성을 뜻 하며, 트랜잭션 내의 모든 작업이 완전히 수행되거나 전혀 수행되지 않음을 보장하는 속성을 말합니다.
카프카에서는 프로듀서의 전송을 위해 컨슈머 그룹 코디네이터와 동일한 개념으로 트랜잭션 코디네이터(Transaction coorinator)라는 것이 서버 측에 존재합니다.
트랜잭션 코디네이터는 프로듀서 메시지를 관리하며, 커밋 또는 중단 등을 표시합니다. 내부 오프셋을 관리하듯 트랜잭션 정보도 브로커 내부 토픽인 _transaction_state에 저장합니다.
_transaction_state는 카프카의 내부 토픽이지만 이 역시 토픽이므로 파티션 수와 리플리케이션 팩터 수가 존재하며, 브로커의 설정을 통해 관리자가 설정할 수 있습니다. 기본 값은 다음과 같습니다.
- transaction.state.log.num.partitions = 50
- transaction.state.log.replication.factor = 3
중요한 내용으로 프로듀서가 해당 토픽에 트랜잭션 로그를 직접 기록하는 것이 아닌 트랜잭션 코디네이터가 관리하고 기록합니다.
카프카에서는 트랜잭션 정보를 올바르게 커밋하기 위해 컨트롤러 메시지 라고 불리는 특별한 타입에 메시지를 추가합니다.
컨트롤러 메시지는 메시지에 포함되거나 노출되지 않습니다. 오직 브로커와 클라이언트 통신에서만 사용됩니다.
트랜잭션 API 프로세스
트랜잭션 API에 대한 개념을 토대로 어떻게 동작하는지 설명 하도록 하겠습니다.
기본적으로 중복 없는 전송과 다른 옵션은 TRANSACTIONAL_ID_CONFIG 입니다. 프로듀서의 TRANSACTIONAL_ID_CONFIG 옵션은 실행하는 프로듀서 프로세스마다 고유한 아이디로 설정해야 합니다. (중복 설정 X)
전체적인 프로세스는 다음 순서로 동작합니다.
- 트랜잭션 코디네이터 찾기
- 프로듀서 초기화
- 트랜잭션 시작
- 트랜잭션 상태 추가
- 메시지 전송
- 트랜잭션 종료 요청
- 사용자 토픽에 표시 요청
- 트랜잭션 완료
1. 트랜잭션 코디네이터 찾기
프로듀서는 브로커에게 FindCoordinatorRequest를 보내서 트랜잭션 코디네이터의 위치를 찾습니다.
트랜잭션 코디네이터는 브로커에 위치하며 PID(Producer Id)와 trancation.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것입니다.
만약 트랜잭션 코디네이터가 존재하지 않는다면 신규 트랜잭션 코디네이터가 생성됩니다.
또한 _transaction_state 토픽의 파티션 번호는 transactional.id를 기반으로 해시하여 결정되며, 리더 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정이 됩니다.
2. 프로듀서 초기화
다음 동작으로 프로듀서 초기화 동작입니다. 프로듀서는 initTransactions() 메서드를 통해 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보냅니다.
이때 TID(transactional.id)가 설정된 경우에는 InitPidRequest와 함께 TID가 트랜잭션 코디네이터에 기록합니다.
트랜잭션 코디네이터는 TID, PID를 맵핑하고 해당 정보를 트랜잭션 로그에 기록하며, PID 에포트(Epoch)를 한 단계 올리는 동작을 하게 됩니다.
PID 에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시됩니다.
3. 트랜잭션 시작
트랜잭션을 시작할 때는 프로듀서 쪽에서 beginTransaction() 메서드를 이용해서 새로운 트랜잭션을 시작을 알립니다.
프로듀서 내부적으로는 트랜잭션을 시작했음을 기록하지만, 트랜잭션 코디네이터 관점에서는 첫 번째 레코드가 전송될 때까지 트랜잭션이 시작된 것은 아닙니다.
4. 트랜잭션 상태 추가
프로듀서쪽에서 트랜잭션 시작을 기록한 뒤 다음으로는 트랜잭션 코디네이터에게 해당 내용을 전송합니다.
해당 내용을 전달받은 트랜잭션 코디네이터는 트랜잭션 상태를 추가하고 관리합니다. 위 그림과 같이 TID와 PO(파티션 0)의 정보가 트랜잭션 로그에 기록되며, 현재 상태는 Ongoing으로 표시합니다.
첫 번째 파티션이라면 트랜잭션 코디네이터는 해당 트랜잭션에 대한 타이머를 시작하며, 기본값으로는 1분 동안 트랜잭션 상태에 대한 업데이트가 없다면 해당 트랜잭션은 실패로 처리됩니다.
5. 메시지 전송
트랜잭션 코디네이터에게 상태를 알린 다음으로는 실제 메시지를 전송합니다. 메시지 내용으로는 PID, 에포크, 시퀀스 번호가 함께 전송됩니다.
여기서 브로커가 두개 있는 이유는 트랜잭션 코디네이터가 있는 브로커와 프로듀서가 전송하는 메시지를 받는 브로커가 서로 다르기 때문입니다.
6. 트랜잭션 종료 요청
메시지를 전송한 다음으로는 트랜잭션 종료 요청 동작입니다. 메시지 전송을 완료 한 프로듀서는 commitTransaction() 메서드 또는 abortTransaction() 메서드 중 하나를 반드시 호출해야 합니다.
위 두 메서드를 통해 트랜잭션이 완료되었다고 트랜잭션 코디네이터에게 반드시 알려야 합니다.
트랜잭션 코디네이터는 두 단계 커밋 과정을 시작하게 되며, 첫 번째 단계로 트랜잭션 로그에 해당 트랜잭션에 대한 PrepareCommit 또는 PrepareAbort를 기록합니다.
7. 사용자 토픽에 표시 요청
트랜잭션 코디네이터는 두 번째 단계로서 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록합니다. 여기서 기록하는 메시지가 바로바로 컨트롤 메시지입니다.
예를 들어, 트랜잭션 프로듀서가 파티션 0에 메시지를 전송했고 해당 메시지의 오프셋이 1이 라고 가정한다면, 트랜잭션 코디네이터는 파티션0에 트랜잭션 커밋 표시 메시지를 기록하고, 이 추가 메시지(컨트롤 메시지)로 인해 파티션 0의 마지막 오프셋은 2로 증가합니다.
해당 메시지는 해당 PID의 메시지가 제대로 전송됐는지 여부를 컨슈머에게 나타내는 용도로도 사용됩니다.
트랜잭션 커밋이 끝나지 않은 메시지는 컨슈머에게 반환하지 않으며, 오프셋의 순서 보장을 위해 트랜잭션 성공 또는 실패를 나타내는 LSO(last stable offset)라는 오프셋을 유지하게 됩니다.
7. 트랜잭션 완료
마지막 단계로 트랜잭션 코디네이터는 완료됨(committed)이라고 트랜잭션 로그에 기록하며, 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리로 마무리합니다.
트랜잭션을 이용하는 컨슈머는 read_committed설정을 하면 트랜잭션 성공한 메시지만 읽을 수 있습니다.
예시 코드
- Producer Config
package com.ryan.springkafkaexample.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.enable-idempotence_config}")
private String enableIdempotenceConfig;
@Value("${spring.kafka.producer.acks-config}")
private String acksConfig;
@Value("${spring.kafka.producer.max-in-flight-requests-per-connection}")
private String maxInFlightRequestsPerConnection;
@Value("${spring.kafka.producer.retries-config}")
private String retriesConfig;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, Object> exactlyOnceProducerFactory() {
Map<String, Object> exactlyOnceConfigProps = new HashMap<>();
exactlyOnceConfigProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
exactlyOnceConfigProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
exactlyOnceConfigProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
exactlyOnceConfigProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotenceConfig);
exactlyOnceConfigProps.put(ProducerConfig.ACKS_CONFIG, acksConfig);
exactlyOnceConfigProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
exactlyOnceConfigProps.put(ProducerConfig.RETRIES_CONFIG, retriesConfig);
exactlyOnceConfigProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "peter-transaction-01");
return new DefaultKafkaProducerFactory<>(exactlyOnceConfigProps);
}
@Bean
public KafkaTemplate<String, Object> exactlyOnceKafkaTemplate() {
return new KafkaTemplate<>(exactlyOnceProducerFactory());
}
}
- 발송 코드
/**
* @author Ryan
* @description 정확히 한번 발송을 위한 트랜잭션 API 사용
*/
public void exactlyOnceMessage(String topic, String key, Object value) {
exactlyOnceKafkaTemplate.executeInTransaction(operations -> {
try {
operations.send(topic, key, value).get();
log.info("Message sent successfully to topic: {}", topic);
} catch (Exception e) {
log.error("Error sending message: {}", e.getMessage());
}
return null;
});
}
위 코드를 실행하면 실제 다음과 같은 토픽이 생성됩니다.
이번 시간에는 정확히 한번 발송에 대해 알아봤습니다. 꼭 실습을 하시는 걸 추천드리겠습니다.
소스코드
https://github.com/Ryan-Sin/spring-kafka-example