분산 시스템에서 데이터 처리(Queue, CDC, Outbox Pattern...) 본문

백앤드 개발일지/웹, 백앤드

분산 시스템에서 데이터 처리(Queue, CDC, Outbox Pattern...)

giron 2023. 9. 23. 17:43
728x90

일반적으로 분산 시스템에서 메시지를 주고받는 방법으로는 크게 2가지가 있다.

  1. API를 통한 통신
    1. 즉각적인 요청과 응답을 주고받는다.
    2. 간단한 개발 
  2. 메시지 큐를 통한 통신
    1. 비동기, 배치 처리와 함께 적용하기 좋다.
    2. 일반적으로 publisher가 데이터를 큐에 넣으면 consumer가 큐에서 데이터를 꺼내서 데이터를 가공한다.
    3. 복잡한 개발

분산 시스템에서는 모든 데이터가 네트워크를 타면서 이동하므로 지연, 유실 등의 문제가 발생할 수 있다. 따라서 아래의 3가지 방식을 통해 데이터 전달을 보장하는 방법이 있다.

데이터 전달 방법

1. At most once

  1. producer가 최대 한 번만 송신하고 consumer가 최대 한 번만 수신한다.
  2. 간단한 구현 & 개발이지만 데이터 유실 가능성이 높다.
  3. 대용량 메세지 전송할 때 편하다.

2. At least once

  1. 잘 받았다는 응답이 올때까지 메시지를 보낸다: Ack
  2. 장점은 producer가 메시지 발송을 보장한다.
  3. 단점으로는 consumer가 멱등성을 보장하도록 코딩해야 한다.
  4. 발신 메시지 상태 관리 필요
    1. consumer에서 잘 받았는지 확인하기 위함

3. Exactly once

  1. 메시지 유실과 중복이 없다.
  2. 어려운 구현 난이도 & 메시지 큐에 의존적
  3. 발신 & 수신 메시지 상태 관리 필요

따라서 비용대비 효율이 좋은 At least once를 보장하는 것을 선호하는 것 같다. 1번은 유실 가능성이 너무 크므로 신뢰성 있는 데이터를 보장하고자 한다면 피하는 것이 좋을 것 같고 3번은 최선이긴 하지만 비용이 너무 크다는 단점이 있기 때문이다.

일반적인 분산환경(MSA)

서비스별 분산 환경

크게 3가지 방법이 있다.

  1. RDB를 사용한 방법 (api)
  2. rabbit mq를 사용한 방법 (queue)
  3. kafka를 사용한 방법 (queue)

1. RDB를 사용한 방법 (Transactional Outbox & Polling Pattern)

일반적으로 @TransactionalEventListener을 통해서 DB 커밋 후, rest api로 발송할 수 있다. 이러면 db에 save 되는 것을 확신하고 api를 쏘기 때문에 걱정이 없을 수 있다. 하지만 아래처럼 api에서 실패가 발생할 수 있다.

문제 상황 1

이렇게 되면 DB에는 저장되지만 api발송에는 실패하게 되므로 데이터 정합성에 문제가 발생한다. 그러면 어떻게 처리해야 할까?

 

단순하게 아래의 사진처럼 @Retry(3번 시도하고, 간격마다 100ms 시간을 delay 함)로 해결할 수 있다.

retry

그런데 이러한 방법은 말 그대로 여러 번 시도하는 것이지 근본적인 해결은 되지 못한다. 예를 들면 외부 시스템의 장애로 인해 재시도한 것이 무의미할 수도 있기 때문이다. 또한 외부 시스템의 문제가 내부 시스템의 지연 및 장애로 전파될 수 있다.

문제 상황2

그렇다면 또 어떻게 처리할까?

Transactional Outbox Pattern과 Polling Publisher Pattern을 함께 사용하면 해결할 수 있다고 한다. 해당 방식은 고전적인 2PC를 사용하지 않는 방식이다. 간단하게 DB의 데이터를 주기적으로 읽고 publish 하는 패턴이라고 볼 수 있다.

 

Transactional Outbox Pattern

transactional outbox pattern

Outbox는 메일 송신함과 같다. 애플리케이션은 데이터베이스의 outbox 테이블에 메시지 내용을 저장한다. 그리고 다른 애플리케이션이나 프로세스는 outbox 테이블에서 데이터를 읽고 해당 데이터를 사용하여 작업을 수행할 수 있다.

코드 예시

여기서 outbox디비의 역할을 eventRepository가 한다고 생각하면 된다. 여기서 @Transactional로 묶여있으므로 task가 만약 실패하더라도 outbox에 event가 저장되지 않고 롤백되므로 원자성을 보장한다.

Polling Publisher Pattern

polling
polling

매 분 0초부터 5초 간격으로 작업 Event 상태가 READY인 객체를 찾아서 restTemplate으로 이벤트를 발송하고 event를 발송했다는 상태로 표시해 주는 로직이다.

 

아래는 DB 테이블을 설계할 때, 넣으면 좋은 컬럼들을 추천해 주신 것이라고 한다

outbox column

 

다시 본문으로 돌아와서, 이러한 Transactional Outbox Pattern과 Polling Publisher Pattern을 통해 At least once를 보장함으로써 restAPI에서 데이터 유실 문제를 해결할 수 있다. 

하지만 단점으로는 아래와 같다.

  • polling후 publisher 과정에 의한 지연 처리
  • 데이터베이스 부하 & 디비 비례한 처리속도

또한 At least once에서 항상 멱등한 처리를 해줘야하기에 아래와 같은 멱등 방법도 고려해줘야 한다.

  1. 메시지의 고유 식별자를 이용해 이미 처리한 메시지는 다시 처리하지 않도록 하기
  2. 메시지를 수신할 때마다 상품의 현재 상태를 확인함으로써 여러 번 처리해도 같은 결과가 나오도록 하기

1-1. 트랜잭션 로그 테일링 패턴(Transaction log tailing Pattern)

로그 테일링

해당 방식은 DB 트랜잭션 로그(커밋 로그)를 테일링(tailing)하는 방법이다. 애플리케이션에서 커밋된 업데이트는 각 DB의 트랜잭션 로그 항목(log entry, 로그 엔트리)으로 남는다. 트랜잭션 로그 마이너(transaction log miner)로 트랜잭션 로그를 읽어 변경분을 하나씩 메시지로 메시지 브로커에 발행하는 방법이다. (aws cloud trailing처럼 모든 활동을 읽는다고 생각했습니다.)

 

RDBMS의 outbox 테이블에 출력된 메시지 또는 NoSQL DB에 레코드에 추가된 메시지를 발행할 수 있다.

mysql의 경우 mysqlbinlog, PostgreSQL WAL, Oracle redolog 등을 활용하여 변경사항을 읽어서 구현할 수 있지만 구현 난이도가 높아 관련 툴을 사용하는 경우가 많다고 한다. 관련 툴은 디비지움(Debezium), 링크드인 데이터 버스(LinkdIn Databus), DynamoDB 스트림즈, 이벤추에이트 트램 등이 있다.

장점으로는 2PC를 사용하지 않고 따로 메시지를 위한 로직도 필요하지 않아서 장점만 있어 보였다. 하지만 단점은 아래와 같다.

  1. 변경 로그만 존재하므로 어떤 이벤트에 의해 발행되었는지 알 수 없다.
  2. 데이터베이스별 솔루션이 필요하다.(추상화 X)
  3. 멱등성을 보장해야 함

이러한 로그를 읽고 처리하는 것을 CDC(Change Data Capture)라고 한다. 

2. Rabbit MQ를 사용한 방법

ack를 메시지 처리 응답 메커니즘

  • producer confirm - producer에서 메시지 잘 넣었을 때
  • consumer Ack - consumer가 응답 잘 보냈을 때

큐 구조

2-1. Producer Confirm

만약 라우팅에 실패한다면? exchange가 Ack or NACK를 응답함으로써 확인할 수 있다.

producer confirm

Producer Confirm을 어떻게 확인하는가?

Producer Confirm을 확인하는 방법은 CorrelationData.java로 확인할 수 있다. rabbitTemplate을 사용하여 메시지를 발행할 때, CorrelationData를 같이 넘겨서 확인할 수 있다.

예시

메시지를 발행 시점에, CorrelationData에 UUID를 넣으면 메시지 확인 가능하다. 콜백을 받는 방법은 아래와 같다.

  1. ack = boolean으로 성공, 실패 확인
  2. cause = 실패일 때 원인을 알 수 있다.
  3. 콜백 메서드라 실시간 대응은 어렵지만, 어떤 메시지가 실패했는지 알 수 있다.

설정도 따로 해줘야 한다. 스프링부트를 사용한다면 아래처럼 설정한다.

  1. 스프링부트
  2. 바닐라 스프링 사용할 때

설정

2-2. Consumer Acknowledge

Consumer Acknowledge

Consumer Acknowledge을 어떻게 확인하는가?

Consumer Acknowledge을 확인하는 방법은Channel.java로 확인할 수 있다.

 

최신 RabbitMQ버전부터는 자동으로 ack를 지원을 해준다. 따라서 아래와 같은 방식을 적용하면 ack를 두 번 보내서 - unknown delivery tag 1, 이와 같은 에러를 계속 마주할 것이다..

@RabbitListener 사용

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 수동으로 동작하도록 한다.

Auto를 사용하지 않고 수동으로 적용하는 이유?

일반적으로 Auto인 경우는 queue에서 메시지를 받자마자 ack를 보낸다. ack를 받은 큐는 데이터를 바로 삭제한다. 하지만 우리가 메시지를 처리하는 시간에 예외가 발생할때, Auto이면 이미 ack를 보내서 데이터가 유실이 된다. 따라서 메시지로부터 받은 데이터가 정상 처리 되었는지 확인하고 Ack를 보내도록 manual한 설정을 가져가서 수동으로 보내는 것이다. 이외에도 아래 3가지로 정리할 수 있다.

  1. 예외가 발생한 경우에도 메시지를 수동으로 NACK 표시하여 다시 큐로 반환하거나, 재처리를 요청하도록 커스텀할 수 있다.
  2. 메시지 처리 시간이 작업을 수행할 때는 바로 ACK 보내기 전에 메시지를 처리하고 완료 여부에 따라 ACK 또는 NACK 전송할 있다.
  3. 메시지 처리 순서를 제어해야 하는 경우, 일부 메시지를 우선적으로 처리하고 나머지 메시지를 보류해야 하는 경우, ACK 보내기 전에 메시지를 선택적으로 처리하고 ACK 전송할 있다.

계속해서 NACK가 온 경우

계속해서 NACK응답이 오면 큐가 쌓일 것이다. 이럴 땐, Dead Letter Queue로 넘겨서 처리할 수 있다.

  1. reQueue가 fulls고 basic.NACK 나 basic.reject로 처리하는 경우
  2. Queue에 메시지가 ttl을 넘어서 오래 있는 경우
  3. Queue가 가득 찼을 때

Dead Letter Queue

Retry 후 DeadLetter로 이동

예시 코드

 

  • 최대 3번 실행 후
  • retry간격으로 1000ms, 2의 배수로 최대 간격은 2000ms
  • 원래 있던 requeue에 계속하지 말고 DeadLetter로 빠지도록 한다.

RabbitListener로 만든 deadLetter

3. Kafka를 사용한 방법

3-1. Producer Confirm

예시 코드

ListenableFuture에 2개의 콜백 가능, 첫 번째는 성공 콜백, 두 번째는 실패했을 때 콜백이다.

3-2. Consumer Acknowledge

예시 코드
onMessage 예시 코드

2번째 인자로 받은 acknowledgment.acknowledge()를 호출하면 성공한 Consumer ACK이 나간다. 아래의 설정값도 반드시 필요하다.

설정 필수

분산 시스템 환경에서 메시지는 땔 수 없는 선택지인 것 같다. 성능에 따라 최대 한번 or 최소 한번으로 설정하면서 분산 환경에서 효율적으로 데이터를 송수신하도록 해야겠다.

 

 

*Scheduled cron

참고로 아래는 scheduled의 cron 표현식을 사용하여 스케줄링을 정의하는 방법입니다. 이 표현식은 초, 분, 시간, 일, 월, 요일 순서로 구성되며, 각각에 해당하는 위치에 어떤 동작을 수행할지를 지정한다.

  • 초 (0-59)
  • 분 (0-59)
  • 시간 (0-23)
  • 일 (1-31)
  • 월 (1-12 또는 JAN-DEC)
  • 요일 (0-7 또는 SUN-SAT, 0과 7은 일요일)
  • */5 * * * * *: 5초 간격으로 실행
  • 0 0 * * * *: 매 시간 정각에 실행
  • 0 0 12 * * ?: 매일 정오(12시)에 실행
  • 0 15 10 ? * *: 매일 10:15에 실행
  • 0 15 10 * * ?: 매일 10:15에 실행
  • 0 15 10 * * ? 2019: 2019년에 매일 10:15에 실행

cron 이외에 @Scheduled(fixedRate=1000) // 1초마다 실행이라는 설정도 있다.

Reference

https://microservices.io/patterns/data/transactional-outbox.html

 

Microservices Pattern: Transactional outbox

First, write the message/event to a database OUTBOX table as part of the transaction that updates business objects, and then publish it to a message broker.

microservices.io

https://blog.gangnamunni.com/post/transactional-outbox/

 

분산 시스템에서 메시지 안전하게 다루기

Transactional Outbox Pattern을 이용한 결과적 일관성 확보 by 강남언니 블로그

blog.gangnamunni.com

https://youtu.be/uk5fRLUsBfk?si=Pz2QeBCben4BQcEH 

 

https://www.youtube.com/watch?v=VIbMOSciFhg 

https://stackoverflow.com/questions/63953773/rabbitmq-does-basic-ack-also-needs-to-be-confirmed-by-the-server

728x90
Comments