Post

Kafka를 이용한 티켓팅 대기열 시스템 구현 및 순번 처리

티켓팅 시스템은 수천, 수만 명이 동시에 접속하는 상황을 고려해야 한다.

특히 티켓팅 당일에는 막대한 트래픽이 몰리기 때문에, 이를 효율적으로 처리할 수 있는 대기열 시스템이 필요하다.

또한 실시간 대기열 관리와 순번 조회 기능이 필요하다.

대기열을 구현하기 위한 방법에는 SQS, RabbitMQ, Kafka와 같은 큐 서비스가 있다. 각각의 서비스의 장단점을 알아보고, 적합성을 판단하여 구현해보자!


SQS

  • AWS에서 제공하는 완전 관리형 서비스로 설정이 간단하고 확장성이 뛰어나다.
  • 메시지 중복 제거와 지연 메시지 처리가 가능하다.
  • 다만, 사용량에 따라 비용이 증가하며 메시지 처리 속도와 보장 수준은 Kafka보다 낮다.

RabbitMQ

  • 다양한 라우팅 옵션과 기능을 제공한다.(우선순위, TTL)
  • 클러스터링을 통한 고가용성 지원
  • 매우 큰 규모의 메시징 시스템에서 성능이 떨어질 수 있으며, 설정 및 운영이 복잡하다.
  • 초당 수 백만 개의 요청 처리에는 제한이 있다.

Kafka

  • 초당 수백만 개의 메시지를 처리할 수 있는 높은 처리량과 내구성이 좋고 장애 발생 시 데이터 손실 최소화한다.
  • 클러스터 확장이 용이하여 트래픽 급증에 유연하게 대응이 가능하다.
  • 설정이 다소 복잡할 수 있으며, 관리가 필요하다.

대량의 트래픽 처리에 최적화되어 있고, 장애 발생 시에도 데이터 손실을 최소화할 수 있어, 수 천 ~ 수 만 명이 동시에 몰리는 티켓 예매 시스템에 적합하다고 판단하였다.




그러면 이제 Kafka에 대해서 알아보자





Kafka

분산 메시지 큐 시스템으로 대규모 데이터를 빠르게 처리할 수 있도록 도와준다.

동작 방식 및 특징

Pub-Sub 모델의 메시지 큐 형태로 동작한다.

image

메시지 큐?

메시지 지향 미들웨어를 구현한 시스템, 프로세스 간 데이터를 교환할 때 사용하는 기술이다.

  • Queue : producer의 데이터를 임시 저장하고 Consumer에 제공한다.
  • Producer : 정보를 제공하는 역할로, 데이터를 대기열로 보낸다
  • Consumer : 정보를 제공받아 소비한다.
  • Topic : topic를 통해 데이터를 구분한다.
  • Partition : 병렬처리 지원한다.

메시지 큐의 장점?

우선 비동기 처리가 가능하여, Queue라는 임시 저장소를 통해 작업을 나중에 처리할 수 있다. 또한, 낮은 결합도를 유지할 수 있어 애플리케이션과 메시지 처리를 분리해 독립적으로 운영할 수 있으며, 확장성이 뛰어나 Producer와 Consumer 서비스를 필요에 따라 자유롭게 확장이 가능하다. 마지막으로, 탄력성과 보장성을 제공하여 Consumer 서비스가 중단되더라도 메시지가 MQ에 남아있어 손실되지 않아, MQ에 들어간 모든 메시지는 반드시 Consumer에게 전달된다.

메세지 브로커 / 이벤트 브로커

메세지 브로커 는 데이터를 즉시 처리하고 삭제한다. 생산자(Publisher) 가 데이터를 보내면, 소비자(Consumer) 가 가져갈 수 있도록 큐에 저장한다. 소비자가 데이터를 가져가면 바로 삭제하며, 시스템 간 데이터를 비동기로 주고 받는다. Redis, RabbitMQ, AWS SQS가 있다.

이벤트 브로커 는 데이터를 저장해서 나중에 처리가 가능하다. 메시지 브로커 기능 + 이벤트 저장 기능이으로 이벤트를 삭제하지 않고 저장해두고, 특정 시점에 다시 데이터를 가져올 수 있다. 따라서 장애가 발생해도 복구가 가능하며, 대용량 처리에 강하다. Kafka, AWS Kinesis가 있다.


일반적인 형태의 네트워크

image

보틍은 위와 같이 각 객체가 직접 연결하여 통신한다. 이 방식은 전송 속도가 빠르고 전송 결과를 빠르게 알 수 있지만, 중간 단계가 없기 때문에 문제가 발생하면 시스템 전체에 영향을 끼칠 수 있다. 한 부분에서 장애가 발생 시 전체 시스템에 영향을 미칠 수 있고, 참여하는 개체가 많아질 수록 시스템이 복잡해진다.

이를 극복하고자 나온 것이 아래 모델이다.


Pub/Sub 모델

image

비동기 메세징 전송 방식으로, 발행자(Publisher)는 메시지를 보내, 특정 주제(Topic)에 메시지를 게시한다. 그러면 구독자(Subscriber)는 관심 있는 주제를 구독하여, 해당 주제에 게시된 메시지를 수신한다.

발행자와 구독자가 서로 직접 연결되지 않고, 중간 채널을 통해 메시지를 주고 받는다. 이를 통해 구독자는 발행자가 누구인지 알 필요 없이 원하는 메시지만 수신할 수 있닫.

대표적으로 Kafka, Redis, RabbitMQ가 있다.





카프카의 동작 방식

링크드인에서 개발된 메시지 큐 방식의 분산 메시징 시스템이다.


구성요소

  • Event: Kafka에서 데이터를 주고받는 단위, 즉 메시지를 말한다.
  • Producer: Kafka에 이벤트(메시지)를 게시하는 클라이언트 애플리케이션
  • Consumer: 특정 Topic을 구독하고, 그 안의 이벤트를 가져와 처리하는 클라이언트 애플리케이션
  • Topic : 메시지가 저장되는 카테고리로, Producer는 메시지를 Topic에 게시하고 Consumer는 Topic을 구독해 처리한다.
  • Partition: Topic을 나눠 저장하는 단위로, 여러 Broker에 분산 저장되며 각 Partition은 리더와 팔로워를 갖는다.
  • Zookeeper: Kafka 클러스터의 메타데이터를 관리하고 브로커 간 협조를 지원하는 시스템.
  • Brokers : 클러스터 내 서버 역할로, 메시지를 Topic에 저장하고 Consumer가 읽을 수 있게 제공함.

동작 원리

Producer는 메시지를 생성하고 토픽에 전송한다. 메시지는 토픽의 특정 파티션에 저장되고, 이 파티션은 여러 브로커에 분산되어 저장된다. 브로커는 메시지를 저장하고, 이를 Consumer가 읽을 수 있도록 준비한다.

Consumer는 자신이 구독한 토픽의 메시지를 읽고 처리한다. 각 컨슈머는 메시지를 읽은 후, 마지막 메시지의 위치인 offset을 관리하여 이후 읽을 메시지를 추적한다.

Kafka는 데이터를 분산 저장하는 단위로, 각 파티션은 하나의 리더와 여러 개의 팔로워를 가질 수 있다.

리더 파티션은 데이터를 읽고 쓰는 역할을 담당하며, 팔로워 파티션은 리더의 데이터를 복제하여 장애 발생 시 데이터를 보호한다. 즉, 리더는 쓰기 작업을 담당하고, 팔로워는 읽기 작업을 처리한다.

여러 컨슈머가 하나의 그룹에 묶여, 각 컨슈머가 다른 파티션을 읽는다. 이를 통해 각 메시지가 한 번만 처리되도록 보장할 수 있다.

장단점

Kafka는 대규모 트래픽 처리와 분산 처리에 효과적인 메시징 시스템으로, 클러스터 구성, Fail-over, Replication 기능을 제공한다. 이 시스템은 약 100KB/sec의 빠른 속도로 메시지를 처리할 수 있어 다른 메시지 큐보다 효율적이고, 메시지는 디스크에 저장되어 영속성이 보장되며, 저장된 데이터는 일정 보관 주기 동안 유지되어 데이터 유실 위험이 적다. 또한, Consumer 장애 시 재처리가 가능하여 장애 복구에 유리하다.






티켓팅 대기열 로직 설계

우선 어떤 흐름으로 진행되는지 생각해보자.

1
2
3
4
5
6
7
8
1. 수 천 명의 유저가 경기 좌석 선택 페이지(/ticket-paage)에 진입한다.   
2. 수 천 명의 유저가 대기열에 쌓인다.   
3. 실시간으로 자신의 순번을 확인할 수 있다.   
4. 새로고침 등으로 화면 이탈시 대기열을 빠져나간다.   
5. 다시 좌석 선택 페이지에 접근하면 대기열의 맨 뒤로 이동한다.   
6. 일정시간마다 대기열의 일정사용자들이 페이지에 진입한다.   
7. 내 순번이 앞당겨진다.   
8. 내 차례가 되어 화면에 진입한다.   


요구 사항 분석

1. Kafka 대기열 분산 처리

  • 각 경기별로 토픽을 나누어 각 경기에 대한 대기열을 관리한다.
  • 유저가 대기열에 들어오면 produce하여 유저의 정보를 전송한다.
  • 자신의 순번이 될 때, consume하여 유자가 입장할 수 있도록 한다.
  • 하루에 최대 다섯 개의 경기가 열리며, 각 경기별로 토픽을 관리한다. 하루 최대 5경기라면 5개의 토픽이 생성된다. (예: game_1, game_2, …, game_5)

2. 대기열 관리

  • Redis ZSet을 이용하여 유저를 대기열에 추가하고 우선순위를 결정한다.
  • 유저가 대기열에 진입할 때, userId와 함께 타임스탬프, 우선순위를 설정하여 대기열의 끝에 추가한다.

3. 실시간 순번 업데이트

  • SSE를 통해 실시간으로 자신의 순번을 확인한다.
  • 대기열 상태가 변경되면, 서버가 SSE로 각 유저에게 업데이트를 보낸다.
  • 대기열에서 순번이 앞당겨지면 즉시 클라이언트로 해당 정보가 전송된다.

    SSE를 선택한 이유?
    서버 -> 클라이언트 단방향 클라이언트로 이벤트를 스트리밍할 수 있다.

4. 페이지 이탈 시

  • 새로고침, 브라우저 닫기, 타임아웃 등.
  • SSE 연결 종료 이벤트를 활용하여 Redis에서 유저 제거.
  • 다시 접속하면 대기열의 맨 뒤에 새로 추가한다.

5. 일정 시간마다 사용자 진입

  • 일정 시간 마다 Scheduler을 통해 대기열에서 좌석 선택 페이지로 입장시킨다.
  • 입장 가능한 사용자에게 SSE를 통해 알림을 보낸다.
  • 이 작업은 고정된 간격으로 진행된다.
  • 예: 1초에 100명씩 입장


구성요소

  • Kafka: 대기열을 관리하고 데이터를 순차적으로 처리할 수 있는 기능을 제공한다.
  • Redis: Redis는 대기열의 순번을 관리하기 위한 데이터 저장소 Redis의 Sorted Set(ZSet)을 사용하여 순번을 관리한다.
  • WebSocket/SSE: 실시간 상태 업데이트를 위해 웹소켓(SSE)을 사용하여 사용자에게 대기 상태 및 순번 정보를 실시간으로 전달한다.


대기열 처리 흐름

1. 사용자 요청

  • 사용자가 티켓팅 페이지에 접속하면, 사용자 요청을 받아 대기열에 순차적으로 추가한다.
  • 사용자에게 대기 중임을 알리고, 순번을 Redis의 ZSet에 추가합니다. 순번은 사용자 요청이 들어오는 순서대로 관리된다.

2. 대기열 관리

  • Kafka의 Producer는 각 사용자의 요청을 대기열에 넣음
  • Kafka Consumer는 대기열에 있는 요청을 하나씩 처리한다.

3. 실시간 상태 업데이트

  • edis를 통해 사용자 상태를 업데이트하고, 해당 정보를 WebSocket/SSE를 통해 사용자에게 실시간으로 전달한다.






Kafka Producer 및 Consumer 구현

ProducerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kafka에 메시지를 전송할 수 있는 KafkaTemplate을 생성하고, 메시지의 키와 값을 직렬화하고, 지정된 Kafka 클러스터로 메시지를 전송할 수 있게 한다.

ConsumerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "ticketing-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

KafkaProducer

스크린샷 2024-11-29 오후 11 20 50

  • create 메서드는 특정 경기에 해당하는 사용자를 대기열에 추가한다.
  • KafkaTemplate을 사용해 Kafka의 ticketing-queue라는 토픽에 메시지를 보낸다.
  • 메시지를 보내고 나서, 로그를 통해 사용자가 대기열에 추가되었음을 알려준다.

KafkaConsumer

스크린샷 2024-11-29 오후 11 22 00

  • @KafkaListener: ticketing-queue라는 토픽에서 메시지를 받기 위해 사용되며, ticketing_group이라는 소비자 그룹에 속한다.
  • receive(): 메시지를 받으면, ConsumerRecord를 통해 메시지 내용(email)과 오프셋(offset)을 추출하는데, 만약 메시지가 없다면 “대기열이 비어 있습니다.”라는 로그를 출력한다.
  • processUserInQueue(): 메시지의 이메일과 오프셋을 Redis 큐에 추가한다.

스크린샷 2024-11-29 오후 11 23 13

  • 작업 큐에 추가한다.






실시간 대기 순번

실시간으로 자신의 대기 순번을 확인할 수 있어야 한다. Redis와 SSE (Server-Sent Events)를 이용해 예매 대기열에서 유저를 관리하고, 예매 화면으로 진입할 시 해당 유저에게 실시간으로 알림을 보내도록 하였다.

주요 메서드를 살펴보자.

1. sendEvents 스크린샷 2024-11-29 오후 5 20 20

  • @Scheduled(fixedDelay = 1000)에 의해 1초마다 호출된다.
  • enterPageFromQueue(), getQueuePosition()가 차례로 호출된다.


2. enterPageFromQueue

스크린샷 2024-11-29 오후 5 11 26

  • 대기열에 있는 유저들을 예매 화면으로 진입시키는 메서드이다.
  • Redis의 ZSet에서 특정 범위(start ~ end)에 해당하는 유저들을 가져온다.
  • 차례가되면 집입 알림을 전송하고, Redis에서 해당 유저를 제거한다.
  • sendEntryNotificatiton(user.toString())를 호출하여 해당 유저에게 알림을 보낸다.


3. getQueuePosition 스크린샷 2024-11-29 오후 5 20 12

  • Redis에서 대기열에 있는 유저들을 조회하고, 특정 유저의 현재 순번을 찾아 반환하는 메서드이다.


4. sendEntryNotificatiton

스크린샷 2024-11-29 오후 5 11 09

  • 유저에게 예매 화면 진입 알림을 보내는 메서드로 SseEmitter를 통해 실시간 알림을 보내며, 예매 화면으로 진입할 준비가 되었음을 알려준다.
  • sseEmitters.get(email)로 유저의 SseEmitter 객체를 가져와, 해당 유저가 연결되어 있으면 emitter.send()로 실시간 알림을 보낸다.
  • emitter.complete()를 호출하여 알림 전송을 완료하고, sseEmitters.remove(email)로 해당 유저의 SseEmitter 객체를 제거한다.
  • 예외가 발생하면 emitter.completeWithError(e)로 에러를 처리하고, 에러 로그를 출력한다.


5. createEmitter 스크린샷 2024-11-29 오후 5 11 18

  • SseEmitter를 통해 각 유저와 실시간 연결을 유지하며, 알림을 전송한 후 연결을 종료한다.
  • onCompletion()과 onTimeout()은 연결이 완료되거나 타임아웃이 발생했을 때 해당 유저의 SseEmitter 객체를 제거한다.







테스트

스크린샷 2024-11-29 오전 1 41 01 Kafka 메시지 대기열 시스템을 멀티스레드 환경에서 검증하는 테스트로, 여러 사용자가 동시에 대기열에 메시지를 생성해도 Kafka 프로듀서가 정상적으로 작동하는지 확인한다.

100개의 스레드를 생성하여 각 스레드가 Kafka 프로듀서를 통해 특정 게임 ID와 이메일 기반으로 메시지를 대기열에 추가하는 작업을 테스트한다.
각 스레드는 producer.create(gameId, email)을 호출하며, 발생 가능한 예외를 로깅한다. 모든 요청이 정상적으로 처리되었는지는 CountDownLatch를 통해 확인하고, 작업 완료 후 모든 스레드가 정상 종료되었는지도 검증한다.

결과

스크린샷 2024-11-29 오후 10 41 33

스크린샷 2024-11-29 오후 10 41 00

This post is licensed under CC BY 4.0 by the author.