-
카프카 프로듀서(producer), 컨슈머(consumer) (2)Kafka 2023. 7. 2. 21:40
스프링 카프카
스프링 카프카는 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리다.
기존 카프카 클라이언트 라이브러리를 래핑하여 만든 스프링 카프카 라이브러리는 카프카 클라이언트에서 사용하는 여러 가지 패턴을 미리 제공한다.
스프링 카프카 라이브러리를 사용하기 위해서 다음과 같이 build.gradle에 디펜던시를 추가한다.
dependencies { implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE' implementation 'org.springframework.boot:spring-boot-starter:2.4.0' }
1. 스프링 카프카 프로듀서
스프링 카프카 프로듀서는 '카프카 템플릿'이라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다.
카프카 템플릿을 사용하는 방법은 크게 두가지가 있다. 첫 번째는 스프링 카프카에서 제공하는 기본 카프카 템플릿을 사용하는 방법이고,
두번째는 직접 사용자가 카프카 템플릿을 프로듀서 팩토리로 생성하는 방법이다.
application.yaml 내용
spring: kafka: producer: bootstrap-servers: my-kafka:9092 acks: all
다음은 스프링 부트 애플리케이션을 실행하고 test0부터 test9까지 메시지 값을 클러스터로 보내는 프로듀서 애플리케이션을 작성해보자.
package com.example; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.core.KafkaTemplate; @SpringBootApplication public class SpringProducerApplication implements CommandLineRunner { private static String TOPIC_NAME = "test"; @Autowired private KafkaTemplate<Integer, String> template; public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringProducerApplication.class); application.run(args); } @Override public void run(String... args) { for (int i = 0; i < 10; i++) { template.send(TOPIC_NAME, "test" + i); } System.exit(0); } }
여기서 send() 메서드를 사용해서 토픽 이름과 메시지 값을 넣어 전송한다.
카프카 프로듀서의 send()메서드와 유사한 것을 확인할 수 있다.
for (int i = 0; i < 10; i++) { template.send(TOPIC_NAME, "test" + i); }
1.1 커스텀 카프카 템플릿
커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다.
프로듀서에 필요한 각종 옵션을 선언하여 사용할 수 있으며, 한 스프링 카프카 애플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다.
예를 들어, A클러스터로 전송하는 카프카 프로듀서와 B클러스터로 전송하는 카프카 프로듀서를 동시에 사용하고 싶다면,
커스텀 카프카 템플릿을 사용하여 2개의 카프카 템플릿을 빈으로 등록하여 사용할 수 있다.
package com.example; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaTemplateConfiguration { @Bean public KafkaTemplate<String, String> customKafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.ACKS_CONFIG, "all"); ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props); return new KafkaTemplate<>(pf); } }
package com.example; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.core.KafkaProducerException; import org.springframework.kafka.core.KafkaSendCallback; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; @SpringBootApplication public class SpringProducerApplication implements CommandLineRunner { private static String TOPIC_NAME = "test"; @Autowired private KafkaTemplate<String, String> customKafkaTemplate; public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringProducerApplication.class); application.run(args); } @Override public void run(String... args) { ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test"); future.addCallback(new KafkaSendCallback<String, String>() { @Override public void onSuccess(SendResult<String, String> result) { } @Override public void onFailure(KafkaProducerException ex) { } }); System.exit(0); } }
2. 스프링 카프카 컨슈머
스프링 카프카 컨슈머는 기존 카프카 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다.
우선, 타입은 레코드 리스너(MessageListener)와 배치 리스너(BatchMessageListener) 가 있다.
리스너 종류에 따라 한번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다.
레코드 리스너
- 단 1개의 레코드를 처리한다.
배치 리스너
- 기존 카프카 클라이언트 라이브러리의 poll() 메서드로 리턴받은 ConsumerRecords처럼 한 번에 여러 개 레코드들을 처리할 수 있다.
스프링 카프카 컨슈머의 기본 리스너 타입은 레코드 리스너이다. 레코드 리스너와 배치 리스너 외에도 각 리스너에서 파생된 형태로 다음과 같은 형태가 있다.
RECORD Type: Record 인스턴스 단위로 프로세싱
리스너 이름 생성 메서드 파라미터 설명 MessageListener onMessage(ConsumerRecord<K, V> data)
onMessage(V data)오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 AcknowledgingMessageListener onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
onMessage(V data, Acknowledgment acknowledgment)매뉴얼 커밋을 사용하는 경우 ConsumerAwareMessageListener onMessage(ConsumerRecord<K, V> data, Consumer<K, V> consumer)
onMessage(V data, Consumer<K, V> consumer)컨슈머 객체를 활용하고 싶은 경우 AcknowledgingConsumerAwareMessageListener onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer)
onMessage(V data, Acknowledgment acknowledgment, Consumer<K, V> consumer)매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 BATCH Type: Records 인스턴스 단위로 프로세싱
리스너 이름 파라미터 설명 BatchMessageListener onMessage(ConsumerRecords<K, V> data)
onMessage(List<V> data)오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우 BatchAcknowledgingMessageListener onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment)
onMessage(List<V> data, Acknowledgment acknowledgment)매뉴얼 커밋을 사용하는 경우 BatchConsumerAwareMessageListener onMessage(ConsumerRecords<K, V> data, Consumer<K, V> consumer)
onMessage(List<V> data, Consumer<K, V> consumer)컨슈머 객체를 활용하고 싶은 경우 BatchAcknowledgingConsumerAwareMessageListener onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
onMessage(List<V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우 카프카 컨슈머에서 커밋을 직접 구현할 때는
오토 커밋, 동기 커밋, 비동기 커밋 크게 세가지로 나뉘지만 실제 운영환경에서는 다양한 종류의 커밋을 구현해서 사용한다.
예를 들어, 특정 타이밍마다 커밋을 하거나 레코드 개수에 따라 커밋을 하는 규칙을 적용할 경우에는 로직을 새로 작성해야 한다.
스프링 카프카에서는 커밋이라고 부르지 않고 'AckMode'라고 한다.
스프링 카프카 컨슈머의 AckMode 기본값은 BATCH이고 컨슈머의 enable.auto.commit 옵션은 false로 지정된다.
AckMode 종류와 설명
RECORD 레코드 단위로 프로세싱 이후 커밋 BATCH poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋
스프링 카프카 컨슈머의 AckMode 기본값TIME 특정 시간 이후에 커밋
이 옵션을 사용할 경우 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다.COUNT 특정 개수만큼 레코드가 처리된 이후 커밋
이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다.COUNT_TIME TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋 MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 한다. 매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다. 이 옵션을 사용할 경우에는
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다.
이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는
BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.기본 리스너 컨네이너
# application.yaml spring: kafka: consumer: bootstrap-servers: my-kafka:9092 listener: type: RECORD
레코드 리스너를 활용한 코드
SpringConsumerApplication.java
package com.example; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } @KafkaListener(topics = "test", groupId = "test-group-00") public void recordListener(ConsumerRecord<String,String> record) { logger.info(record.toString()); } @KafkaListener(topics = "test", groupId = "test-group-01") public void singleTopicListener(String messageValue) { logger.info(messageValue); } @KafkaListener(topics = "test", groupId = "test-group-02", properties = { "max.poll.interval.ms:60000", "auto.offset.reset:earliest" }) public void singleTopicWithPropertiesListener(String messageValue) { logger.info(messageValue); } @KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3") public void concurrentTopicListener(String messageValue) { logger.info(messageValue); } @KafkaListener(topicPartitions = { @TopicPartition(topic = "test01", partitions = {"0", "1"}), @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")) }, groupId = "test-group-04") public void listenSpecificPartition(ConsumerRecord<String, String> record) { logger.info(record.toString()); } }
스프링 카프카 배치 리스너
# application.yaml spring: kafka: consumer: bootstrap-servers: my-kafka:9092 listener: type: BATCH
package com.example; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import java.util.List; @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } @KafkaListener(topics = "test", groupId = "test-group-01") public void batchListener(ConsumerRecords<String, String> records) { records.forEach(record -> logger.info(record.toString())); } @KafkaListener(topics = "test", groupId = "test-group-02") public void batchListener(List<String> list) { list.forEach(recordValue -> logger.info(recordValue)); } @KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3") public void concurrentBatchListener(ConsumerRecords<String, String> records) { records.forEach(record -> logger.info(record.toString())); } }
* 출처: 아파치 카프카 애플리케이션 프로그래밍 with 자바 책