이전 글에서는 Kafka와 RabbitMQ의 이론에 대해서 알아보았다.
이번에는 프로젝트에서 사용한 Spring boot + Python을 Kafka와 연결하여 사용하는 방법에 대해서 설명하고자 한다.
해당 포스트는 이론적인 부분은 모두 제외되었기 때문에, 이론부터 공부를 하고자 한다면 이전 글을 참고하길 바랍니다.
해당 관련 코드들은 모두 Github에서 확인하실 수 있습니다.
Docker
Docker Desktop 설치
Docker + GUI 환경을 제공하는 Docker Desktop을 설치한다. 이 부분에서는 각 운영체제에 맞춰 설치하면 되기 때문에 따로 설명을 하지 않겠습니다.
Kafka + Kafka-ui 설치
Kafka와 Kafka의 상태를 모니터링하기 위한 Kafka-ui를 Docker를 통해 설치하겠습니다.
docker-compose.yml
전체 코드 보기
networks:
kafka_network:
driver: bridge
volumes:
kafka01:
driver: local
kafka02:
driver: local
kafka03:
driver: local
services:
kafka01:
image: bitnami/kafka:latest
container_name: kafka01
networks:
- kafka_network
volumes:
- kafka01:/bitnami/kafka
ports:
- 10000:9094
environment:
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka01:9092,EXTERNAL://127.0.0.1:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
kafka02:
image: bitnami/kafka:latest
container_name: kafka02
networks:
- kafka_network
volumes:
- kafka02:/bitnami/kafka
ports:
- 10001:9094
environment:
- KAFKA_CFG_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka02:9092,EXTERNAL://127.0.0.1:10001
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
kafka03:
image: bitnami/kafka:latest
container_name: kafka03
networks:
- kafka_network
volumes:
- kafka03:/bitnami/kafka
ports:
- 10002:9094
environment:
- KAFKA_CFG_BROKER_ID=3
- KAFKA_CFG_NODE_ID=3
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka03:9092,EXTERNAL://127.0.0.1:10002
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
networks:
- kafka_network
depends_on:
- kafka01
- kafka02
- kafka03
ports:
- 8090:8080
environment:
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_NAME=kafka-clusters
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka01:9092,kafka02:9092,kafka03:9092
환경 변수에 대한 설명
컨테이너 | 설정 이름 | 설명 |
kafka | KAFKA_CFG_BROKER_ID | Kafka 인스턴스 ID 반드시, 고유한 ID를 가져야한다. |
KAFKA_CFG_NODE_ID | Kafka에서 KRaft를 사용하기 위해서는 NODE_ID를 반드시 지정해야 한다. | |
KAFKA_ENABLE_KRAFT | Kraft 사용 여부 | |
KAFKA__KRAFT_CLUSTER_ID | Cluster의 고유한 ID | |
ALLOW_PLAINTEXT_LISTENER | PLAINTEXT를 사용한 통신 여부 암호화가 되지 않은 통신 |
|
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE | 토픽 자동 생성 여부 | |
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | 메타데이터, 각종 정보를 저장 관리 하기 위한 Controller 선출 목록 | |
KAFKA_CFG_LISTENERS | 내부, 외부 통신 주소 + 포트 설정 | |
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | 통신할 때 사용할 프로토콜을 지정 현재, 모든 값은 암호화 되지 않은 형식으로 진행 (PLAINTEXT) |
|
KAFKA_CFG_PROCESS_ROLES | 해당 Kafka 서버가 무슨 역할을 할지 정하는 설정 KRaft를 사용하기 위해 반드시 필요하다. controller : 메타데이터 관리, 리더와 팔로우 선출 관리 등 broker : 일반 데이터 관리로써, 메시지를 주고 받고 파티션에 저장하는 역할 |
|
KAFKA_CFG_CONTROLLER_LISTENER_NAMES | 다른 브로커나 컨트롤러와 통신할 때 사용할 리스너 이름 KRaft를 사용하기 위해 반드시 필요하다. |
|
kafka-ui | KAFKA_CLUSTERS_0_NAME | 표시할 클러스터 이름 |
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS | 클러스터에 등록된 Kafka 서버의 목록 | |
DYNAMIC_CONFIG_ENABLED | 브로커의 설정이 바뀌면 Kafka 클러스터의 브로커 재시작 없이 동적으로 설정을 바꿔주는 역할 |
docker-compose.yml 실행
cmd를 열고 docker-compose.yml이 있는 곳까지 이동하고, 다음과 같은 명령어를 작성한다.
docker-compose -f docker-compose.yml up -d
이러한 창이 뜨면 성공한겁니다.
http://localhost:8090/ui/clusters/local/brokers를 입력 후 들어가면 Broker1이 리더 브로커로 승격되어 있는 모습을 볼 수 있다.
Code
코드를 최대한 단순화 시키기 위해 다음과 같은 로직을 만들겠습니다.
Spring boot
build.gradle
dependencies {
// ... 이전 코드
implementation 'org.springframework.kafka:spring-kafka',
}
RequestKafkaDto
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class RequestKafkaDto {
private Long id;
private String name;
}
ResponseKafkaDto
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ResponseKafkaDto {
private Long id;
private String message;
}
application.properties
spring.application.name=kafka-spring
# Kafka
spring.kafka.bootstrap-servers=localhost:10000,localhost:10001,localhost:10002
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
키 | 설명 |
bootstrap-servers | Kafka 서버의 주소 목록 |
consumer.group-id | Consumer의 기본 그룹 ID |
consumer.auto-offset-reset | Kafka 서버 구동 시 Consumer의 메시지를 어디서부터 읽을지 설정하는 변수 earliest : 앞에서부터 읽는다. latest : 뒤에서부터 읽는다. |
KafkaProducerConfig
package seungyong20.tistory.kafkaspring.config;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import seungyong20.tistory.kafkaspring.dto.RequestKafkaDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* 메시지를 보내기 위한 ProducerFactory를 생성합니다.
* Key는 String, Value는 RequestKafkaDto로 설정합니다.
*/
@Bean
public ProducerFactory<String, RequestKafkaDto> producerFactory() {
log.info("Creating producer factory");
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* KafkaTemplate을 생성합니다.
* Key는 String, Value는 RequestKafkaDto로 설정합니다.
*/
@Bean
public KafkaTemplate<String, RequestKafkaDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig
package seungyong20.tistory.kafkaspring.config;
import com.fasterxml.jackson.core.JsonParseException;
import seungyong20.tistory.kafkaspring.dto.ResponseKafkaDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
private final Long interval = 1000L;
private final Long maxAttempts = 3L;
@Bean
public ConsumerFactory<String, ResponseKafkaDto> consumerFactory() {
log.info("Creating consumer factory");
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 각 Consumer 인스턴스를 식별하기 위한 고유한 ID를 생성합니다.
configProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, UUID.randomUUID().toString());
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// JSON 데이터를 ResponseKafkaDto로 역직렬화하기 위한 JsonDeserializer를 생성합니다.
JsonDeserializer<ResponseKafkaDto> deserializer = new JsonDeserializer<>(ResponseKafkaDto.class);
// ErrorHandlingDeserializer를 사용하여 예외가 발생했을 때 처리를 수행합니다.
ErrorHandlingDeserializer<ResponseKafkaDto> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(deserializer);
// Kafka ConsumerFactory를 생성합니다. Key는 String, Value는 ResponseKafkaDto로 설정하고, 에러 발생 시 처리를 위해 ErrorHandlingDeserializer를 사용합니다.
return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), errorHandlingDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ResponseKafkaDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ResponseKafkaDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setCommonErrorHandler(errorHandler());
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
log.error("Error occurred while processing: {} and value : {}", e, consumerRecord.value());
}, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
errorHandler.addNotRetryableExceptions(JsonParseException.class);
errorHandler.addNotRetryableExceptions(SerializationException.class);
return errorHandler;
}
}
설명이 필요한 코드들은 하나하나씩 알아보겠습니다.
configProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, UUID.randomUUID().toString());
각 Consumer 인스턴스를 식별하기 위해 고유한 UUID를 생성합니다.
JsonDeserializer<ResponseKafkaDto> deserializer = new JsonDeserializer<>(ResponseKafkaDto.class);
ErrorHandlingDeserializer<ResponseKafkaDto> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(deserializer);
ResponseKafkaDto를 Json 형태로 바꾸고, 역직렬화 단계에서 에러가 발생할 시 오류를 핸들링하기 위해 ErrorHandlingDeserializer로 래핑합니다.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
MANUAL_IMMEDIATE는 Acknowledgement.acknowledge() 메소드를 호출하면 즉시, 메시지를 정상적으로 처리했다라는 것을 알리게 합니다.
factory.setConcurrency(1);
Consumer가 동시에 몇 개로 운영될지를 정합니다. 즉, 1로 설정하였다면 Consumer는 1개의 스레드로 메시지를 처리합니다.
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
log.error("Error occurred while processing: {} and value : {}", e, consumerRecord.value());
}, fixedBackOff);
interval 초마다 최대 maxAttempts만큼 재시도를 합니다.
log 부분에서 실패 토픽에 메시지를 보내기, 또 다른 처리를 할 수 있지만, 지금은 log만 남기도록 하겠습니다.
errorHandler.addRetryableExceptions(SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
errorHandler.addNotRetryableExceptions(JsonParseException.class);
errorHandler.addNotRetryableExceptions(SerializationException.class);
SocketTimeoutException은 일시적인 네트워크 문제일 수 있기 때문에 재시도를 하고, NullPointer, JsonParse, Serialization은 단순 데이터 문제이기 때문에 즉시 오류를 발생시킵니다.
KafkaProducer
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, RequestKafkaDto> kafkaTemplate;
public void sendMessage(String topic, String key, RequestKafkaDto requestKafkaDto) {
log.info("Producing {} message: {}", key, requestKafkaDto);
kafkaTemplate.send(topic, key, requestKafkaDto);
}
}
KafkaConsumer
@Slf4j
@Getter
@Setter
@Component
public class KafkaConsumer {
// spring_topic을 가입하여, 메시지를 기다림
@KafkaListener(topics = "spring_topic")
public void listen(ConsumerRecord<String, ResponseKafkaDto> record, Acknowledgment acknowledgment) {
try {
// 전달 받은 값의 key 가져오기
String key = record.key();
// 전달 받은 값의 value 가져오기
ResponseKafkaDto payload = record.value();
log.info("Received message: key={}, id={}, message={}", key, payload.getId(), payload.getMessage());
} catch (Exception e) {
log.error("Error occurred while consuming message: {}", e.getMessage());
} finally {
// 메시지 처리가 되었음 알림
acknowledgment.acknowledge();
}
}
}
KafkaController
원래는 Service, Repository 등을 통해 데이터를 검증 및 가공하여 보내지만, 여기에서는 단순하게 메시지 전송만 하겠습니다.
@RequiredArgsConstructor
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducer kafkaProducer;
@PostMapping
public ResponseEntity<?> sendMessage() {
kafkaProducer.sendMessage("python_topic", "test_key", new RequestKafkaDto(1L, "seungyong"));
return ResponseEntity.noContent().build();
}
}
Python
python 관련 코드들은 코드들이 상당히 직관적이기 때문에 설명이 따로 필요하지 않을 거 같아서 생략하겠습니다.
kafka 라이브러리 설치
pip install kafka-python
# .env를 가져오기 위한 라이브러리
pip install python-dotenv
.env
bootstrap-servers=localhost:10000,localhost:10001,localhost:10002
group-id=test-group
consumer-topic=python_topic
producer-topic=spring_topic
log.py
Log 관련은 자세히 다루지 않겠습니다.
import logging
import os
class Logger:
def __init__(self, name):
self.directory = os.getcwd() + "/log/"
if not os.path.exists(self.directory):
os.makedirs(self.directory)
self.logger = logging.getLogger(name)
if not self.logger.hasHandlers():
self.logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Stream handler
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
self.logger.addHandler(stream_handler)
# File handler
file_handler = logging.FileHandler(f'{self.directory}/{name}.log')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
consumer.py
import json
import os
from kafka import KafkaConsumer
from log import Logger
from producer import Producer
from dotenv import load_dotenv
load_dotenv()
consumer_topic = os.environ.get("consumer-topic")
group_id = os.environ.get("group-id")
bootstrap_servers = os.environ.get("bootstrap-servers").split(",")
class Consumer:
broker = ""
topic = ""
group_id = ""
logger = None
consumer = None
producer = None
def __init__(self):
self.logger = Logger(name='consumer').logger
self.broker = bootstrap_servers
self.topic = consumer_topic
self.group_id = group_id
self.consumer = KafkaConsumer(
bootstrap_servers=self.broker,
group_id=self.group_id,
auto_offset_reset='latest',
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
self.producer = Producer()
self.consumer.subscribe([self.topic])
def run(self):
self.logger.info('Starting Consumer...')
for message in self.consumer:
self.logger.info('Receive Message')
key = str(message.key, 'utf-8') if message.key else 'default_key'
is_json = isinstance(message.value, dict)
if not is_json:
self.logger.error('Not Json type')
continue
self.logger.info(f"Receving {key} message id : {message.value['id']} value : {message.value['name']}")
self.producer.send_message('python_key', {
'id': message.value['id'],
'message': f"Hello, {message.value['name']}"
})
producer.py
import json
import os
from dotenv import load_dotenv
from kafka import KafkaProducer
from log import Logger
load_dotenv()
producer_topic = os.environ.get("producer-topic")
bootstrap_servers = os.environ.get("bootstrap-servers")
class Producer:
broker = ""
topic = ""
producer = None
logger = None
def __init__(self):
self.logger = Logger(name='producer').logger
self.broker = bootstrap_servers
self.topic = producer_topic
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
key_serializer=lambda x: bytes(x, encoding='utf-8'),
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
retries=3
)
def send_message(self, key, message):
try:
self.producer.send(
topic=self.topic,
key=key,
value=message
)
self.producer.flush()
except Exception as e:
self.logger.error(f"Error sending message to {self.topic}: {message} cause {e.__str__()}")
사용
이제 실제로 구동해서 서로 데이터를 주고받는지 확인을 해보자.
구동
Spring boot
consumer의 spring_topic 가입과 동시에, consumer가 등록되었다는 로그가 뜨면 성공한겁니다.
Python
단순하게, Starting Consumer...라는 로그가 찍히면 성공한겁니다.
실행
Spring boot
Producer가 메시지를 보냈다는 문구가 뜨면 됩니다.
또한, Python 측에서 데이터를 가공 후 보낸 값도 잘 받아졌는지 봐야합니다.
Python
Python 측 또한 내가 보낸 값들이 받아졌습니다.
만약, Spring에서 10000개의 메시지가 발행된다면?
스레드 1개
자바도 스레드 1개, python도 스레드 1개로 처리해서 순차적으로 처리되는 것을 볼 수 있습니다.
스레드 5개
Spring은 다음과 같이 바꿨습니다.
// KafkaConsumerConfig.java
factory.setConcurrency(5);
Python은 다음과 같이 바꿨습니다.
import concurrent.futures
# ...
class Consumer:
# ...
def __init__(self):
# ...
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def run(self):
self.logger.info('Starting Consumer...')
try:
for message in self.consumer:
self.executor.submit(self.process_message, message)
except Exception as e:
self.logger.error(f'Failed process message : {e}')
finally:
self.executor.shutdown(wait=True)
def process_message(self, message):
self.logger.info('Receive Message')
key = str(message.key, 'utf-8') if message.key else 'default_key'
is_json = isinstance(message.value, dict)
if not is_json:
self.logger.error('Not Json type')
return
self.logger.info(f"Receving {key} message id : {message.value['id']} value : {message.value['name']}")
self.producer.send_message('python_key', {
'id': message.value['id'],
'message': f"Hello, {message.value['name']}"
})
마무리
졸업 작품 하면서 Spring boot와 Python 서버 간의 비동기 작업 및 통신할 수 있는 솔루션이 필요했고, 최근에 많이 사용한다는 Kafka를 사용해보았습니다. 물론 오버 엔지니어링이라 효율은 떨어지지만 새로운 기술을 사용해보고자라는 마음이 컸습니다.
또한, Kafka를 클러스터링 해 여러 대를 두지 않았지만, 포스팅할 때만큼은 여러 대로 두고, 스레드도 5개로 늘려 처리해보았습니다. 컴터가 안 좋아서 렉이 걸리네요...
References
'Project 하면서 알아가는 것들' 카테고리의 다른 글
Kafka와 RabbitMQ를 알아보자 (0) | 2024.08.20 |
---|---|
[Nextjs] Tiptap 사용법과 커스텀마이징 기능 구현 (1) | 2024.02.28 |
IoC, DI, DPI 확실히 개념 잡기 (0) | 2023.11.12 |
REST API 확실히 개념 잡기 (0) | 2023.11.02 |
백엔드에서 이미지 업로드는 어떻게 하면 좋을까? (6) | 2023.11.01 |