• 7장까지는 ‘최소 한 번’ 전달에 초점을 맞췄다. 하지만 메시지 중복의 가능성은 여전히 있다.
  • 현실에서 사용되는 대부분의 애플리케이션들은 메시지를 읽는 애플리케이션이 중복 메시지를 제거할 수 있도록 메시지에 고유한 식별자를 포함한다.
  • 카프카 ‘정확히 한 번’ 의미 구조는 두 개의 핵심 기능의 조합로 이루어진다.
    • 멱등성 프로듀서: 프로듀서 재시도로 인해 발생하는 중복을 방지한다.
    • 트랜잭션 의미 구조: 스트림 처리 애플리케이션에서 ‘정확히 한 번’ 처리를 보장한다.

멱등성 프로듀서

  • 멱등성: 작업을 여러 번 실행해도 한 번 실행한 것과 결과가 같은 성질
  • 프로듀서에서 메시지가 중복으로 발생하는 일반적인 시나리오
      1. 파티션 리더가 프로듀서로부터 레코드를 받아서 팔로워들에게 성공적으로 복제한다.
      1. 프로듀서에게 응답을 보내기 전, 파티션 리더가 있는 브로커에 크래시가 발생한다.
      1. 프로듀서 입장에서는 응답을 받지 못한 채 타임아웃이 발생하고, 메시지를 재전송한다.
      1. 재전송된 메시지가 새 리더에 도착한다. 하지만 이 메시지는 이미 저장되어 있다.

멱등적 프로듀서의 작동 원리

  • 멱등적 프로듀서 기능을 켜면 모든 메시지는 고유한 프로듀서 ID와 시퀀스 넘버를 가지게 된다.
    • 토픽 + 파티션 + 프로듀서 ID + 시퀀스 넘버를 합치면 고유한 식별자가 된다.
  • 각 브로커는 해당 브로커에 할당된 모든 파티션들에 쓰여진 마지막 5개 메시지들을 추적하기 위해 이 고유 식별자를 사용한다.
    • 파티션 별로 추적되어야 하는 시퀀스 넘버의 수를 제한하고 싶다면 프로듀서의 max.in.flights.requests.per.connection 설정값이 5이하로 잡혀 있어야 한다. (기본값: 5)
  • 브로커가 예전에 받은 적이 있는 메시지를 받게 될 경우, 적절한 에러를 발생시켜 중복 메시지를 거부한다.
    • 이 에러는 프로듀서에 로깅도 되고 지푯값에도 반영되지만, 예외가 발생하는 것은 아니기 때문에 사용자에게 경보를 보내지는 않는다.
      • 프로듀서 클라이언트에서는 record-error-rate 지푯값을 확인함으로써 에러를 확인할 수 있다.
      • 브로커의 경우 RequestMetrics 유형의 ErrorsPerSec 지푯값에 기록된다.
  • 만약 브로커가 예상보다 높은 시퀀스 넘버를 받게 된다면, 브로커는 ‘out of order sequence number’ 에러를 발생시킨다.
    • 하지만 트랜잭션 기능 없이 멱등성 프로듀서만 사용하고 있다면 이 에러는 무시해도 좋다.
    • ‘out of order sequence number’ 에러가 발생한 뒤에도 프로듀서가 정상 작동한다면, 이 에러는 보통 프로듀서와 브로커 사이에 메시지 유실이 있었음을 의미한다. 프로듀서와 브로커 설정을 재검토하고 프로듀서 설정이 고신뢰성을 위해 권장되는 값으로 잡혀 있는지, 아니면 언클린 리더 선출이 발생했는지 여부를 확인해볼 필요가 있다.
  • 각 실패 상황에 따른 멱등적 프로듀서의 동작 방식
    • 상황1: 프로듀서 장애
      • 새 프로듀서를 생성해서 장애가 난 프로듀서를 대체하고, 프로듀서는 초기화 과정에서 카프카 브로커로부터 프로듀서 ID를 생성 받는다.
      • 트랜잭션 기능을 켜지 않았을 경우, 프로듀서는 초기화할 때마다 완전시 새로운 ID가 생성된다.
        • 즉, 프로듀서에 장애가 발생해서 대신 투입된 새 프로듀서가 기존 프로듀서가 이미 전송한 메시지를 다시 전송할 경우 ,브로커는 메시지에 중복이 발생했음을 알아차리지 못한다.
    • 상황2: 브로커 장애
      • 컨트롤러는 장애가 난 브로커가 리더를 맡고 있었던 파티션들에 대해 새 리더를 선출한다. 새로 선출된 브로커도 최근 5개의 시퀀스 넘버를 가지고 있다.
        • 이유: 리더는 새 메시지가 쓰여질 때마다 인-메모리 프로듀서 상태에 저장된 최근 5개의 시퀀스 넘버를 업데이트하고, 팔로워 레플리카는 리더로부터 새로우 메시지를 복제할 때마다 자체적인 인-메모리 버퍼를 업데이트한다.
      • 추가 상황: 예전 리더가 다시 돌아온다면?
        • 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트함으로써 최신 상태를 복구한다.

멱등적 프로듀서의 한계

  • 카프카의 멱등적 프로듀서는 프로듀서의 내부 로직으로 인한 재시도가 발생할 경우 생기는 중복만을 방지한다.
    • 예시: 동일한 메시지를 가지고 producer.send()를 두 번 호출함녀 멱등적 프로듀서가 개입하지 않는 만큼 중복된 메시지가 생기게 된다.
  • 여러 개의 인스턴스를 띄우거나 하나의 인스턴스에서 여러 개의 프로듀서를 띄우는 애플리케이션들 역시 흔하다. 만약 이러한 프로듀서들 중 두 개가 동일한 메시지를 전송하려 시도할 경우, 멱등적 프로듀서는 중복을 잡아내지 못한다.

멱등적 프로듀서 사용법

  • enable.idempotence=true를 추가해주면 끝이다.
  • 만약 프로듀서에 acks=all 설정이 이미 잡혀 있다면, 성능에는 차이가 없을 것이다.
  • 멱등적 프로듀서 기능을 활성화시키면 다음과 같은 것들이 바뀐다.
    • 프로듀서 ID를 받아오기 위해 프로듀서 시동 과정에서 API를 하나 더 호출한다.
    • 전송되는 각각의 레코드 배치에는 프로듀서 ID와 배치 내 첫 메시지의 시퀀스 넘버가 포함된다. 이 새 필드들은 각메시지 배치에 96비트를 추가한다. 따라서 대부분의 경우는 작업부하에 어떠한 오버헤드도 되지 않는다.
    • 브로커들은 모든 프로듀서 인스턴스에서 들어오는 레코드 배치의 시퀀스 넘버를 검증해서 메시지 중복을 방지한다.
    • 장애가 발생하더라도 각 파티션에 쓰여지는 메시지들의 순서는 보장된다.

트랜잭션

  • 트랜잭션 기능은 카프카 스트림즈를 사용해서 개발된 애플리케이션에 정확성을 보장하기 위해 도입되었다.

트랜잭션 활용 사례

  • 트랜잭션은 정확성이 중요한 스트림 처리 애플리케이션이라면 도움이 되며, 스트림 처리 로직에 집적이나 조인이 포함되어 있는 경우 특히 그렇다.

트랜잭션이 해결하는 문제

  • 단순 스트림 처리 애플리케이션은 원본 토픽으로부터 이벤트를 읽어서, 처리한 다음, 결과를 다른 토픽에 쓴다.
  • 스트림 처리에는 여러 가지 문제가 발생할 수 있는데 아래는 대표 예 2가지다.
  • 예시1: 애플리케이션 크래시로 인한 재처리
    • 배경 지식: 원본 클러스터로부터 메시지를 읽어서 처리한 뒤, 애플리케이션은 하는 두 가지 일
      • 결과를 출력 토픽에 쓰는 것
      • 우리가 읽어 온 메시지의 오프셋을 커밋하는 것
    • 결과: 만약 출력 토픽에는 이미 썼는데 입력 오프셋은 커밋되기 전에 애플리케이션이 크래시 나면, 레코드에 중복이 발생할 수 있다.
  • 예시2: 좀비 애플리케이션에 의해 발생하는 재처리
    • 상황 설명: 애플리케이션이 카프카로부터 레코드 배치를 읽어온 직후 뭔가를 하기전에 멈추거나, 카프카로의 연결이 끊어진 경우
    • 결과:
      • 하트비트가 끊어지면서 애플리케이션이 죽은 것으로 간주되고, 컨슈머 그룹 내 다른 컨슈머들에게 재할당될 것이다.
      • 파티션을 재할당 받은 컨슈머가 레코드 배치를 다시 읽어서 처리하고, 출력 토픽에 결과를 쓰고 잡업을 계속한다.
    • 추가 상황: 그사이 애플리케이션의 첫 번째 인스턴스가 다시 작동한다면?
      • 이러한 상태의 컨슈머를 좀비 라고 부르고, 추가적인 보장이 없을 경우, 좀비는 출력 토픽으로 데이터를 쓸 수 있어 중복된 겨로가가 발생할 수 있다.

트랜잭션은 어떻게 ‘정확히 한 번’을 보장하는가?

  • 읽어 온 원본 메시지의 오프셋이 커밋되고 결과가 성공적으로 쓰여지거나, 아니면 둘 다 안 일어나거나, 우리는 부분적인 결과가 결코 발생하지 않을 거라는 보장이 필요하다.
  • 이러한 작동을 지원하기 위해, 카프카 트랜잭션은 원자적 다수 파티션 쓰기(atomic multipartition write) 기능을 도입했다.
    • 이 아이디어는 오프셋을 커밋하는 것과 결과를 쓰는 것은 둘 다 파티션에 메시지를 쓰는 과정을 수반한다는 점에 착안했다.
      • 결과는 출력 토픽에, 오프셋은 _consumer_offsets 토픽에
  • 트랜잭션을 사용해서 원자적 다수 파티션 쓰기를 수행하려면 ‘트랜잭션적 프로듀서’를 사용해야 한다.
    • 보통 프로듀서와 차이점: transactoinal.id 설정이 잡혀있고, initTransactions()를 호출해서 초기화해준다.
    • producer.id는 브로커에 의해 자동으로 생성되지만, transactional.id는 프로듀서 설정의 일부이며, 재시작하더라도 값이 유지된다.
    • transactional.id의 주 용도: 프로듀서가 재시작 후에도 동일한 프로듀서를 식별하는 것
      • 동일한 프로듀서로 식별해주는 이유: 브로커에서 transactional.id 에서 producer.id 로의 대응 관계를 가지고 있다가, 만약 이미 있는 transactional.id 프로듀서가 initTransactions()를 호출하면 새로운 랜덤값이 아닌 producer.id 값을 할당해 준다.
    • 주의점: 메시지를 쓰고나서 커밋하기 전에 다른 애플리케이션이 응답하기를 기다리는 패턴은 반드시 피해야 한다.
      • 다른 애플리케이션은 트래잭션이 커밋될 때까지 메시지를 받지 못할 것이기 때문에 결과적으로 데드락이 발생한다.
  • 좀비 펜싱: 좀비 애플리케이션이 출력 스트림에 중복된 결과를 쓰는 것을 방지하는 것
    • 구현 방법: 에포크(epoch)를 사용하는 방식
      • 브로커는 트랜잭션적 프로듀서가 초기화를 위해 initTransaction()을 호출하면 transactional.id에 해당하는 에포크 값을 증가시킨다.
      • 같은 transactional.id를 가지고 있지만 에포크 값은 낮은 프로듀서가 메시지 전송, 트랜잭션 커밋, 트랜잭션 중단 요청을 보낼 경우 FencedProducer 에러가 발생하면서 거부된다.
  • 컨슈머에 올바른 격리 수준을 설정하지 않을 경우, 트랜잭션적 프로듀서로 메시지를 써도 ‘정확히 한 번’이 보장되지 않는다.
    • 이유: 트랜잭션 기능을 사용해서 쓰여진 레코드는 비록 결과적으로 중단된 트랜잭션에 속할지라도 다른 레코드들과 마찬가지로 파티션에 쓰여진다.
    • 격리 수준 설정 방법: isolation.level 설정
      • read_committed: consumer.poll()을 호출하면 커밋된 트랜잭션에 속한 메시지나 처음부터 트랜잭션에 속하지 않는 메시지만 리턴된다.
      • read_uncommitted(기본값): 중단된 트랜잭션에 속하는 것들 포함, 모든 레코드가 리턴된다.
    • read_committed로 설정한다고 해서 특정 트랜잭션에 속한 모든 메시지가 리턴된다고 보장되는 것도 아니다.
      • 트랜잭션에 속하는 노픽의 일부만 구독했기 때문에 일부 메시지만 리턴받을 수도 있다.
  • 메시지의 읽는 순서를 보장하기 위해 read_committed 모드에서는 아직 진행중인 트랜잭션이 처음으로 시작된 시점 이후에 쓰여진 메시지는 읽지 않는다.
    • 이 메시지들은 트랜잭션이 프로듀서에 의해 커밋되거나 중단될 때까지, 혹은 transaction.time.ms 설정값(기본값: 15분)만큼 시간이 지나 브로커가 트랜잭션을 중단시킬 때까지 보류된다.
    • 이렇게 트랜잭션이 오랫동안 닫히지 않고 있으면 컨슈머들이 지체되면서 종단 지연이 길어진다.

트랜잭션으로 해결할 수 없는 문제들

  • 트랜잭션 기능과 관련해서 자주 하는 실수 2가지
    • 카프카에 대한 쓰기 이외의 작동에서도 보장된다고 착각한다.
    • 컨슈머가 항상 전체 트랜잭션을 읽어 온다고 착각한다.
  • 카프카의 트랜잭션 기능이 ‘정확히 한 번’ 보장에 도움이 되지 않은 경우
    • 스트림 처리에 있어서 사이드 이펙트: 카프카 외 사이드 이펙트를 일으키는 작업(REST API 호출, 파일 쓰기)
    • 카프카 토픽에서 읽어서 데이터베이스에 쓰는 경우
      • 상황: 레코드는 JDBC와 같은 데이터베이스 드라이버를 통해 데이터베이스에 쓰여지고, 오프셋은 컨슈머에 의해 카프카에 커밋된다.
      • 문제가 되는 이유: 하나의 트랜잭션에서 외부 데이터베이스에 결과를 쓰고 카프카에는 오프셋을 커밋할 수 있도록 해주는 메커니즘 같은 건 없다.
      • 보완 방법: 아웃 박스 패턴
        • ‘아웃박스’라고 불리는 카프카 토픽에 메시지를 쓰는 작업까지만 하고, 별도의 메시지 중계 서비스가 카프카로부터 메시지를 읽어와서 데이터베이스를 업데이트한다.
        • 아웃박스로부터 컨슘해서 데이터베이스에 업데이트하는 작업은 멱등적이어야 한다.
        • 아웃박스는 카프카 토픽 대신에 데이터베이스 테이블을 사용할 수도 있다.
    • 데이터베이스에서 읽어서, 카프카에 쓰고, 여기서 다시 다른 데이터베이스에 쓰는 경우
      • 카프카 트랜잭션은 이러한 종류의 종단 보장에 필요한 기능을 가지고 있지 않다.
      • 문제가 되는 이유:
        • 컨슈머가 일부 토픽에서 랙이 발생했을 때 이미 데이터베이스에는 커밋된 레코드들을 모두 봤을 거라는 보장이 없다.
        • 트랜잭션의 경계도 알 수 있는 방법이 없기 때문에 언제 트랜잭션이 시작되었는지, 끝났는지, 레코드 중 어느 정도를 읽었는지도 알 수 없다.
    • 한 클러스터에서 다른 클러스터로 데이터 복제
      • 하나의 카프카 클러스터에 다른 클러스터로 데이터를 복사할 때 ‘정확히 한 번’을 보장할 수 있다.
        • 가능한 이유: 미러메이커 2.0에 ‘정확히 한 번’ 기능을 추가하는 KIP-656에서 볼 수 있다.
      • 하지만 이것은 트랜잭션의 원자성을 보장하지 않는다.
    • 발행/구독 패턴
      • 원인: 컨슈머들은 메시지를 한 번 이상 처리할 수 있다.

트랜잭션 사용법

  • 트랜잭션 기능을 사용하는 가장 일반적이고도 권장되는 방법은 카프카 스트림즈에서 exactly-once 보장을 활성화하는 것이다.
    • 이렇게 하면 트랜잭션 기능을 직접적으로 사용할 일은 전혀 없지만, 카프카 스트림즈가 대신 해당 기능을 사용해서 우리가 필요로 하는 보장을 제공해 준다.
    • 카프카 스트림즈 애플리케이션에서 ‘정확히 한 번’ 보장 기능을 활성화하려면 그냥 processing.guarantee 설정을 exactly_once이나 exactly_once_beta로 잡아주면 된다.
  • 카프카 스트림즈를 사용하지 않고 ‘정확히 한 번’ 보장을 사용하고 싶다면 트랜잭션 API를 직접 사용한다.

트랜잭션 ID와 팬싱

  • 카프카 2.5 이전
    • 트랜잭션 상태 토픽(__transaction_state)이 있는데, 트랜잭션 ID를 기준으로 해시값을 구해서 한 파티션에만 기록되도록 동작한다.
      • 예시: 트랜잭션 “tx-123"은 파티션 5에만 기록된다.
    • 트랜잭션의 상태를 변경할 때, 펜싱이 가능하다.
    • 문제점: 이 방법은 트랜잭션 상태를 기록할 때만 펜싱이 동작하고, offset 커밋 등에서는 좀비 프로듀서가 메시지를 생성할 수 있는 구멍이 존재한다.
  • 카프카 2.5 이후
    • offset 커밋 등의 동작을 할 때, 아래의 정보를 브로커에 함께 넘긴다.
        1. 트랜잭션 ID
        1. 컨슈머 그룹 ID, 제네레이션 번호, 인스턴스 ID
    • 이전 제네레이션의 컨슈머 그룹에서 온 호출은 무시되어서, 좀비 프로듀서의 동작들에 대해서 펜싱 가능하다.
      • 메시지 보내기
      • 오프셋 커밋
      • 메시지 커밋

트랜잭션의 작동 원리

  • 카프카 트랜잭션 기능의 기본적인 알고리즘은 찬디-램포드 스냅샷 알고리즘의 영향을 받았다.
    • 이 알고리즘은 통신 채널을 통해 ‘마커’라 불리는 컨트롤 메시지를 보내고, 이 마커의 돡을 기준으로 일관적인 상태를 결정한다.
  • 프로듀서가 트랜잭션을 커밋하기 위해 트랜잭션 코디네이터에 ‘커밋’ 메시지를 보내면 트랜잭션 코디네이터가 트랜잭션에 관련된 모든 파티션에 커밋 마커를쓴다.
    • 문제 상황: 일부 파티션에만 커밋 메시지가 쓰여진 상태에서 프로듀서가 크래시 나면 어떻게 될까?
      • 카프카 트랜잭션은 2단계 커밋과 트랜잭션 로그를 사용해서 이 문제를 해결한다.
  • 트랜잭션 커밋 알고리즘의 과정
      1. 현재 진행중인 트랜잭션이 존재함을 로그에 기록한다. 연관된 파티션 역시 함께 기록한다.
      1. 로그에 커밋 혹은 중단 시도를 기록한다.
      1. 모든 파티션에 트랜잭션 마커를 쓴다.
      1. 트랜잭션이 종료되었음을 로그에 쓴다.
  • 트랜잭션 로그는 __transactoin_state라는 내부 토픽에 기록한다.
  • 각 트랜잭션 ID의 트랜잭션 코디네이터는 트랜잭션 ID에 해당하는 트랜잭션 로그 파티션의 리더 브로커가 맡는다.
  • beginTransaction()을 호출하면 브로커에 있는 트랜잭션 코디네이터는 트랜잭션이 시작되었다는 사실을 모른다.
    • 하지만 프로듀서가 레코드 전송을 시작하면, 프로듀서는 새로운 파티션으로 레코드를 전송하게 될 때마다 브로커에 AddPartitionToTxn 요청을 보냄으로써 현재 이 프로듀서에 진행중인 트랜잭션이 있으며 레코드가 추가되는 파티션들이 트랜잭션의 일부임을 알린다.
    • 이 정보는 트랜잭션 로그에 기록된다.
  • 오프셋 커밋을 위해 sendOffsetsToTrnasaction()를 호출하면 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 ID가 포함된 요청이 전송된다.
    • 트랜잭션 코디네이터는 컨슈머 그룹 ID를 사용해서 컨슈머 그룹 코디네이터를 찾은 뒤, 컨슈머 그룹이 보통 하는 것과 같은 방식으로 오프셋을 커밋한다.
  • 트랜잭션 커밋 또는 중단을 위해 commitTransaction()abortTransaction()를 호출하면 트랜잭션 코디네이터에 ExdTxn 요청이 전송된다.
      1. 트랜잭션 코디네이터는 트랜잭션 로그에 커밋 혹은 중단 시도를 기록한다.
      1. 트랜잭션 코디네이터는 우선 트랜잭션에 포함된 모든 파티션에 커밋 마커를 쓴 다음 트랜잭션 로그에 커밋이 성공적으로 완료되었음을 기록해 넣는다.
      1. 만약 커밋 시도는 로그에 기록되었지만 전체 과정이 완료되기 전에 트랜잭션 코디네이터가 종료되거나 크래시날 경우, 새로운 트랜잭션 코디네이터가 선출되어 트랜잭션 로그에 대한 커밋 작업을 대신 마무리 짓는다.
  • 만약 트랜잭션이 transactoin.timeout.ms에 설정된 시간 내에 커밋되지도, 중단되지도 않는다면, 트랜잭션 코디네이터는 보통 자동으로 트랜잭션을 종료한다.

트랜잭션 성능

  • 트랜잭션은 프로듀서에 약간의 오버헤드를 발생시킨다.
    • 이유: 트랜잭션 초기화와 커밋 요청은 동기적으로 작동하기 때문에 성공적으로 완료되거나, 실패하거나, 타임아웃되거나 할 때까지 어떤 데이터도 전송되지 않는다.
  • 하지만 프로듀서에 있어서 오버헤드는 트랜잭션에 포함된 메시지의 수와 무관하다.
    • 이유: 트랜잭션마다 많은 수의 메시지를 집어넣는 쪽이 상대적으로 오버헤드가 적을 뿐 아니라 동기적으로 실행되는 단계의 수도 줄어든다. 오히려 결과적으로 전체 처리량이 올라간다.
  • 컨슈머에 대해서는, 커밋 마커를 읽어오는 작업에 있어서 종단 지연이 길어지는 현상이 있다.
    • 이유: read_committed 모드 컨슈머에서는 아직 완료되지 않은 트랜잭션의 레코드들이 리턴되지 않는다. 트랜잭션 커밋 사이의 간격이 길어질수록 컨슈머는 메시지가 리턴될 때까지 더 오랫동안 기다린다.
  • 컨슈머가 아직 완료되지 않은 트랜잭션에 속하는 메시지들을 버퍼링하지는 않는다.
    • 이유: 브로커는 컨슈머가 보낸 읽기 요청을 받는다고 해서 이 메시지들을 리턴하지 않는다.