프로듀서 개요# 카프카에 메시지를 써야 하는 상황에 따라 요구사항이 다양하다.메시지 유실이 용납되지 않는지 중복이 허용되도 상관없는지 반드시 지켜야할 지연이나 처리율이 있는지 이처럼 서로 다른 요구 조건은 카프카에 메시지를 쓰기 위해 프로듀서 API를 사용하는 방식과 설정에 영향을 미친다. 프로듀서 메시지 전송 과정ProducerRecord
객체를 생성한다. 여기서는 레코드가 저장될 토픽과 밸류는 필수사항이지만, 키와 파티션 지정은 선택사항이다.ProducerRecord
를 전송하는 API를 호출했을 때, 키와 값 객체가 네트워크 상에 전송될 수 있도록 직렬화해서 바이트 배열로 변환한다.만약 파티션을 명시적으로 지정하지 안핬다면 해당 데이터를 파티셔너에게로 보낸다. 파티셔너는 파티션을 결정하는 역할을 하는데, 그 기준은 보통 ProducerRecord
객체의 키의 값이다. 파티션이 결정되어 메시지가 전송될 토픽과 파티션이 확장되면 프로듀서는 이 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치에 추가한다. 그러면 별도의 스레드가 이 레코드 배치를 적절한 카프카 브로커에게 전송한다. 메시지가 성공적으로 전송되었을 경우 브로커는 토픽, 파티션, 그리고 해당 파니션 안에서의 레코드의 오프셋을 담은 RecordMetadata
객체를 리턴한다. 메시지가 저장에 실패했을 경우에는 에러가 리턴되고, 에러를 수신하면 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기 전까지 몇 번 더 재전송을 시도할 수 있다. 카프카 프로듀서 생성하기# 카프카 프로듀서의 3개의 필수 속성값bootstrap.servers
카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록 이 값에 모든 브로커를 포함할 필요는 없는데, 프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 때문이다. 다만 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상을 지정할 것을 권장한다. key.serializer
카프카에 쓸 레코드의 키의 값을직렬화하기 위해 사용하는 시리얼라이저 클래스 key.serializer
에는 org.apache.kafka.common.serialization.Serializer
인터페이스를 구현하는 클래스의 이름을 지정되어야 한다.카프카의 client 패키지에는 ByteArraySerializer
, StringSerializer
, IntegerSerializer
등등이 포함되어 있으므로 자주 사용되는 타입을 사용할 경우 시리얼라이저를 직접 구현할 필요는 없다. 키값 없이밸류값만 보낼 때도 key.serializer
설정을 해 줘야 하지만, VoidSerialzier
를 새용해서 키 타입으로 Void 타입을 설정할 수 있다. value.serialzier
카프카에 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름이다. 프로듀서 생성 예시 코드 메시지 전송 방법 3가지파이어 앤 포겟(fire and forget)메시지를 서버에 전송만 하고 성공 혹은 실패 여부에는 신경 쓰지 않는다. 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도하기 때문에 대부분의 경우 메시지는 성공적으로 전달된다. 다만, 재시도를 할 수 없는 에러가 발생하거나 타임아웃이 발생했을 경우 메시지는 유실되며 애플리케잇녀은 여기에 대해 아무런 정보나 예외를 전달받지 않게 된다. 동기적 전송(synchronous send)카프카 프로듀서는 언제나 비동기적으로 작동한다. 즉, 메시지를 보내면 send()
메서드는 Future
객체를 리턴한다. 하지만 다음 메시지를 전송하기 전 get()
메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야 한다. 비동기적 전송(asynchronous send)콜백 함수와 함께 send()
메서드를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출된다. 카프카로 메시지 전달하기# ProduceRecord
를 생성한다.ProducerRecord
클래스의 생성자는 여러 개 있다.위의 예시에서 사용한 생성자는 토픽 이름과 키, 밸류값을 사용한 것이다. ProducerRecord
를 전송하기 위해 프로듀서 객체의 send
메서드를 사용한다.send()
메서드는 RecordMetadata
를 포함한 자바 Future
객체를 리턴하지만, 여기서는 리턴값을 무시하기 때문에 메시지 전송의 성공 여부를 알아낼 방법은 없다.카프카 브로커에 메시지를 전송할 떄 발생하는 에러 혹은 브로커 자체에서 발생한 에러를 무시하더라도 프로듀서가 카프카로 메시지를 보내기 전 에러가 발생할 경우 여전히 예외가 발생할 수 있다.메시지를 직렬화하는 데 실패할 경우 SerializationException
버퍼가 가득찰 경우 TimeoutException
실제로 전송 작업을 수행하는 스레드에 인터럽트가 걸리는 경우 InterruptException
이 발생한다. 동기적으로 메시지 전송하기# 동기적으로 메시지를 전송할 경우 전송을 요청하는 스레드는 이 시간 동안 아무것도 안 하면서 기다려야 한다. 결과적으로 성능이 크게 낮아지기 때문에 동기적 전송은 실제로 사용되는 애플리케이션에서는 잘 사용되지 않는다. KafkaProducer
에는 두 종류의 에러가 있다.재시도 가능한 에러예로 연결 에러는 연결이 회복되면 해결될 수 있다. 이런 류 에러가 발생했을 때 자동으로 재시도하도록 KafkaProducer
를 설정할 수 있기 떄문에 이 경우 재전송 횟수가 소진되고서도 에러가 해결되지 않은 경우에 한해 재시도 가능한 예외가 발생한다. 재시도 불가능한 에러예로메시지 크기가 너무 클 경우 이러한 경우, KafkaProducer
는 재시도 없이 바로 예외를 발생시킨다. 비동기적으로 메시지 전송하기# 메시지를 비동기적으로 전송하고도 여전히 에러를 처리하는 경우를 위해 프로듀서는 레코드를 전송할 때 콜백을 지정할 수 있도록 한다. 콜백을 사용하려면 org.apache.kafka.clients.producer.Callback
인터페이스를 구현하는 클래스가 필요하다. 이 인터페이스에는 onCompletion()
단 하나의 메서드만 정의되어 있다. 만약 카프카가 에러를 리턴한다면 onCompletion()
메서드가 null이 아닌 Exception 클래스를 받게 된다. 콜백은 프로듀서의 메인 스레드에서 실행된다.만약 우리가 두 개의 메시지를 동일한 파티션에 전송한다면, 콜백 역시 우리가 보낸 순서대로 실행된다. 전송되어야 할 메시지가 전송 안되고 프로듀서가 지연되는 상황을 막기 위해서는 콜백이 충분히 빨라야 한다는 의미이기도 하다. 콜백 안에서 다른 블로킹 작업을 수행하는 것 역시 권장되지 않는다. 대신, 블로킹 작업을 동시에 수행하는 다른 스레드를 사용해야 한다. 프로듀서 설정하기# client.id# 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자 브로커가 로그 메시지를 출력하거나 성능 메트릭 값을 집계할 때, 그리고 클라이언트별로 사용량을 할당할 때 사용된다. acks# acks 매개변수는 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지를 결정한다. 카프카 3.0 기준으로 기본값은 acks=all
acks 매개변수 설정 가능한 3가지 값acks=0
: 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않는다.acks=1
: 프로듀서는 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.만약 리더에 메시지를 쓸 수 없다면 프로듀서는 에러 응답을 받을 것이고 데이터 유실을 피하기 위해 메시지 재전송을 시도하게 된다. 하지만 리더에 크래시가 난 상태에서 해당 메시지가 복제가 안 된 채로 새 리더가 선출될 경우에는 여전히 메시지가 유실될 수 있다. acks=all
: 프로듀서는 메시지가 모든 인-싱크 레플리카에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받는다.이것은 가장 안전한 형태인데, 최소 2개 이상의 브로커가 해당 메시지를 가지고 있으며, 이는 크래시가 났을 경우에도 유실되지 않기 때문이다. 그러나 acks=1
인 경우, 단순히 브로커 하나가 메시지를 받는 것보다 더 기다려야 하기 때문에 지연 시간은 더 길어질 것이다. acks 설정은 신뢰성과 프로듀서 지연 사이의 트레이드 오프 관계이다.하지만 종단 지연의 3가지 설정 모두 같은 값이다. 종단 지연: 레코드가 생성되어 컨슈머가 읽을 수 있을 때 까지의 시간 카프카는 일관성을 유지하기 위해서 모든 인-싱크 래플리카에 복제가 완료된 뒤에야 컨슈머가 레코드를 읽어 갈 수 있게 하기 때문이다. 메시지 전달 시간# 카프카 2.1부터 개발진은 ProducerRecord
를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리할 수 있도록 했다.send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간: 이 시간 동안 send()를 호출한 스레드는 블록된다. send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때 까지 걸리는 시간 max.block.ms
: 프로듀서가 얼마나 오랫동안 블록되는지 결정프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록된다. delivery.timeout.ms
: 이 설정은 레코드 전송 준비가 완료된 시점(즉, send()
가 문제없이 리턴되고 레코드가 배치에 저장된 시점)에서부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간을 결정한다.이 값은 linger.ms
와 request.timeout.ms
보다 커야 한다. 만약 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms
가 넘어가버린다면, 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출된다. 레코드 배치가 전송 기다리는 와중에 delivery.timeout.ms
가 넘어가버리면 타임아웃 예외와 함께 콜백이 호출된다. request.timeout.ms
: 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지를 결정한다.응답 없이 타입아웃이 발생할 경우, 프로듀서는 재전송을 시도하거나 아니면 TimeoutException
과 함께 콜백을 호출한다. retries
: 프로듀서가 메시지 전송을 포기하고 에러를 발생시키 때까지 메시지를 재전송하는 횟수를 결정한다.retry.backoff.ms
: 재시도 사이에 대기 시간(기본값은 100ms)현재 버전의 카프카에서 retries
와 retry.backoff.ms
를 조정하는 것을 권장하지 않는다.대신, 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 deliver.timeout.ms
매개변수를 더 길게 잡아 주는 것이 좋다. linger.ms# 현재 배치를 전송하기 전까지 대기하는 시간을 결정한다. KafkaProducer
는 현재 배치가 가득 차거나 linkger.ms
에 설정된 제한 시간이 되었을 때 메시지 배치를 전송한다.기본적으로, 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있다. linkger.ms
를 높이면 지연은 좀 증가하는 대신 처리율을 크게 증대시킨다.buffer.memory# 프로듀서가 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기를 결정한다. 버퍼 메모리가 가득 찬 경우, 추가로 호출되는 send()
는 max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생기기를 기다리게 되는데, 해당 시간 동안 대기하고서도 공간이 확보되지 않으면 예외를 발생시킨다. compression.type# 기본적으로 메시지는 압축되지 않은 상태로 전송된다. 하지만 이 매개변수를 snappy, gzip, lz4, zstd 중 하나로 설정하면 해당 압축 알고리즘을 사용해서 메시지를 압축한뒤 브로커로 전송된다. batch.size# 같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송한다. 이 매개변수는 각각의 배치에 사용될 메모리 바이트를 결정한다. 배치가 가득 차면 해당 매치에 들어 있는 모든 메시지가 한꺼번에 전송된다. 하지만 그렇다고 프로듀서가 배치가 가득 찰 때까지 기다리는 의미는 아니다. max.in.flight.requests.per.connection# 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수 이 값을 올려잡아 주면 메모리 사용량이 증가하지만 처리량 역시 증가한다. retries
매개변수를 0보다 큰 값을 설정한 상태에서 max.in.flight.requests.per.connection
을 1 이상으로 잡아줄 경우 메시지의 순서가 뒤집어질 수 있다.브로커가 첫 번째 배치를 받아서 쓰려다 실패했는데, 두 번째 배치를 쓸 때는 성공한 상황에서 다시 첫 번째 배치가 재전송 시도되어 성공한 경우 max.request.size# 프로듀서가 전송하는 쓰기 요청의 크기를 결정한다. 이 값은 메시지의 최대 크기를 제한하기도 하지만, 한 번의 요청에 보낼 수 있는 메시지의 최대 개수 역시 제한한다. 브로커에는 브로커가 받아들일 수 잇는 최대 메시지 크기를 결정하는 message.max.bytes
매개변수가 있다. 이 두 매개변수를 동일하게 맞춤으로써 프로듀서가 브로커가 받아들이지 못하는 크기의 메시지를 전소앟려 하지 않게 하는 것이 좋다. receive.buffer.bytes, send.buffer.bytes# 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정한다. 각각의 값이 -1일 경우에는 운영체제의 기본값이 사용된다. 프로듀서나 컨슈머가 다른 데이터센터에 위치한 브로커와 통신할 경우 네트워크 대역폭은 낮고 지연은 길어지는 것이 보통이기 떄문에 이 값들을 올려잡아 주는 것이 좋다. enable.indepotence# acks=all으로 잡고 실패가 나더라도 충분히 재시도하도록 delivery.timeout.ms
는 꽤 큰 값으로 잡는다. 이 경우 메시지는 최소 한 번 카프카에 쓰여지게 된다.브로커가 프로듀서로부터 레코드를 받아서 로컬 디스크에 쓰고, 다른 브로커에도 성공적으로 복제되었다고 하자. 여기서 첫 번째 브로커가 프로듀서로 응답을 보내기 전에 크래시가 나면, 프로듀서는 request.timeout.ms
만큼 대기한 뒤 재전송을 시도하게 된다. 이때 새로 보내진 메시지는 이미 메시지를 받은 바 있는 새 리더 브로커로 전달되게 된다. 메시지가 중복되어 저장되는 것이다. enable.idempotence=true
설정을 잡아 주는 것은 바로 이러한 사태를 방지하기 위함이다.멱등적 프로듀서 기능이 활성화된다면, 프로듀서는 레코드를 보낼 때 마다 순차적인 번호를 붙여서 보내게된다. 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게 되며, 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException
을 받게 된다. 멱등성 프로듀서 기능을 활성화하기 위해서는 max.in.flight.requests.per.connection
매개변수는 5이하로, retires
는 1이상으로 그리고 acks=all
로 잡아 주어야 한다. 만약 이 조건을 만족하지 않는 설정값을 지정하면 Config.Exception
이 발생한다. 시리얼라이저# 커스텀 시리얼라이저# 카프카로 전송해야 하는 객체가 단순한 문자열이나 정숫값이 아닐 경우에는 두 가지의 선택지가 있다.Avro, Thrift, Protobuf와 같은 범용 직렬화 라이브러리 사용 사용하고 있는 객체를 직렬화하기 위한 커스텀 직렬화 로직 작성 범용 직렬화 라이브러리를 사용하는 방안을 강력하게 권장한다. 커스텀 시리얼라이저의 경우, 직렬화 로직이 바뀐담녀 같은 회사의 다른 팀에서 호환성을 위해 직렬화 코드를 모두 동시에 변경해야 되는 상황이 발생한다. 아파치 에이브로를 사용해서 직렬화하기# 아파치 에이브로는 언어 중립적인 스키마의 형태로 기술된다. 이 스키마는 보통 JSON 형식으로 정의되며, 주어진 데이터를 스키마에 따라 직렬화하면 이진 파일 형태로 결과물이 뽑혀 나오는 것이 보통이다. 에이브로는 직렬화된 결과물에 스키마 정보가 주어진다고 가정하고, 보통은 에이브로 파일 자체에 스키마를 내장하는 방법을 쓴다. 스키마 예시 에이브로의 이점데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경하더라도 어떠한 예외나 에러가 발생하지 않으며, 기존 데이터를 새 스키마에 맞춰 업데이트하는 작업을 할 필요도 없다. 에이브로 사용시 주의 점데이터를 쓸 때 사용하는 스키마와 읽을 때 기대하는 스키마가 호환되어야 한다. 역직렬화를 할 때는 데이터를 쓸 때 사용했던 스키마에 접근이 가능해야 한다. 설령 그 스키마가 읽는 쪽 애플리케이션에서 기대하는 스키마와 다른 경우에도 마찬가지다. 카프카에서 에이브로 레코드 사용하기# 에이브로는 레코드를 읽을 때 스키마 전체를 필요로 하기 때문에 어딘가에 스키마를 저장해 두기를 해야 한다.이 문제를 해결하기 위해 스키마 레지스트리라 불리는 아키텍처 패턴을 사용한다. 스키마 레지스트리는 아파치 카프카의 일부가 아니며 여러 오픈소스 구현체 중 하나를 골라서 사용하면 된다. 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 레지스트리에 저장한다.카프카에 쓰는 레코드에는 사용된 스키마의 고유 식별자만 심어줌녀 된다. 컨슈머는 이 식별자를 사용해서 스키마 레지스트리에서 스키마를 가져와서 데이터를 역직렬화 할 수 있다. 여기서 중요한 점은 이 모든 작업은 시리얼라이저와 디시리얼라이저 내부에서 수행된다는 점이다. 에이브로를 사용해서 생성한 객체를 카프카에 쓰는 예시Customer
클래스는 POJO가 아니라 에이브로의 코드 생성 기능을 사용해서 스키마로부터 생성된 에이브로 특화 객체다.에이브로 시리얼라이저는 POJO 객체가 아닌 에이브로 객체만 직렬화할 수 있다. 에이브로 클래스를 생성하는것은 avro-tools.jar
를 사용하거나 에이브로 메이븐 플러그인을 사용해서 가능하다. 2번 코드 라인을 보면 스키마 레지스트리 url을 명시한 것을 볼 수 있다. 에이브로를 사용하면 키-밸류 맵 형태로 사용할 수 있는 제네릭 에이브로 객체 역시 사용할 수 있다.제네릭 에이브로 객체를 사용할 떄는 에이브로 스키마를 지정한다. 더 이상 에이브로가 자동 생성한 객체를 사용하지 않기 때문이다. 파티션# 키의 역할메시지에 저장되는 추가적인 정보 토픽에 속한 여러 개의 파티션 중 해당 메시지가 저장될 파티션을 결정짓는 기준점 같은 키값은 가진 모든 메시지는 같은 파티션에 저장된다. 기본 파티셔너키값이 null인 레코드가 주어질 경우각 파티션별로 저장되는 메시지 개수의 균형을 맞추기 위해 라운드 로빈 알고리즘이 사용된다. 아파치 카프카 2.4 프로듀서부터는 접착성 처리가 있는 라운드 로빈 알고리즘을 사용하여, 더 적은 요청으로 같은 수의 메시지를 전송하게 하여 지연 시간을 줄이고 브로커의 CPU 사용량도 줄인다. 키값이 지정된 상황인 경우, 카프카는 키값을 해시한 결과를 기준으로 메시지를 저장할 파티션을 특정한다.이때 파티셔너는 자체적인 해싱알고리즘을 사용하기 때문에 자바 버전이 업그레이드되어도 해시값은 변하지 않는다. RoundRobinPartitioner와 UniformStickyPartitioner키값을 포함하고 있을 경우에도 랜덤파티션 할당과 접착성 랜덤 파티션 할당을 수행한다. 컨슈머 쪽에서만 키값이 중요한 경우에 사용할 수 있다. (카프카에 저장된 데이터를 RDBS로 보낼 때 카프케 레코드의 키값을 기본 키로 사용하는 등) 기본 파티셔너에서 키값 분포가 불균형해서 특정한 키값을 갖는 레코드가 많을 경우 부하가 몰릴 수 있는데 이때 UniformStickyPartitioner를 사용할 수 있다. 레코드 해더는 카프카 레코드의 키/밸류값을 건드리지 않고 추가 메타데이터를 심을 때 사용한다. 헤더의 주된 용도 중 하나는 메시지의 전달 내역을 기록하는 것이다.즉, 데이터가 생성된 곳의 정보를 헤더에 저장해 두면, 메시지를 파싱할 필요없이 헤더에 심어진 정보만으로 메시지를 라우팅하거나 출처를 추적할 수 있다. 헤더는 순서가 있는 키/밸류 쌍의 집합으로 구현되어 있다.키값은 언제나 String 타입이어야 하지만 밸류값은 아무 직렬화된 객체라도 상관없다. 인터셉터# ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
프로듀서가 레코드를 브로커에 보내기 전, 직렬화되기 직전에 호출된다. 이 메서드를 재정의할 때는 보내질 레코드에 담긴 정보를 볼 수 있을 뿐만 아니라 고칠 수도 있다. 이 메서드에서 유효한 ProducerRecord를 리턴하도록 주의하기만 하면 된다. 이 메시지가 리턴한 레코드가 직렬화되어 카프카로 보내질 것이다. void onAcknowledgement(RecordMetadata metadata, Exception exception)
카프카 브로커가 보낸 응답을 클라이언트가 받았을 때 호출한다. 브로커가 보낸 응답을 변경할 수는 없지만, 그 안에 담긴 정보는 읽을 수 있다. 인터셉터의 일반적인 사용 사례로는 모니터링, 정보 추적, 표준 헤더 삽입 등이 있다. 쿼터, 스로틀링# 카프카 브로커에는 한도를 설정해주면 쓰기/읽기 속도를 제한할 수 있다. 3가지 쿼터 타입 쓰기/읽기 쿼터는 클라이언트가 데이터를 전송하거나 받는 속도를 초당 바이트 수 단위로 제한한다. 요청 쿼터의 경우 브로커가 요청을 처리하는 시간 비율 단위로 제한한다. 쿼터는 기본값을 설정하거나, 특정한 client.id 값에 대해 설정하거나, 특정한 사용자에 대해 설정할 수 있다.사용자에 대해 설정된 쿼터는 보안 기능과 클라이언트 인증 기능이 활성화되어 있는 클라이언트에서만 작동한다. 모든 클라이언트에 적용되는 쓰기/읽기 쿼터의 기본값은 카프카 브로커를 설정할 때 함께 설정해줄 수 있다. (quata.producer.default=2M
) 권장되는 것은 아니지만, 브로커 설정 파일에 특정 클라이언트에 대한 쿼터값을 정의해서 기본값을 덮어쓸 수도 있다. (`quota.producer.override=“clientA:4M,clientB:10M”) 카프카의 설정 파일에 정의된 쿼터값은 고정되더 있기 때문에 이 값을 변경하고 싶다면 설정 파일을 변경한 뒤에 모든 브로커를 재시작하는 방법밖에 없다.이러한 이유 때문에 특정한 클라이언트에 쿼터를 적용할 때는 kafka-configs.sh
또는 AdminClient API에서 제공하는 동적 설정 기능을 사용하는 것이 보통이다. 클라이언트가 할당량을 다 채웠을 경우, 브로커는 클라이언트의 요청에 대한 스로틀링을 시작하여 할당량을 초과하지 않도록 한다.이는 브로커가 클라이언트 요청에 대한 응답을 늦게 보내준다는 의미이기도 한데, 대부분의 경우 클라이언트는 이 상황에서 자동으로 요청 속도를 줄이는 것이 보통이기 때문에 해당 클라이언트의 메시지 사용량이 할당량 아래로 줄어들게 된다. 스로틀되는 와중에도 오작동하는 클라이언트가 추가 요청을 쏟아낼 경우 브로커는 해당 클라이언트와의 커뮤니케이션 채널을 일시적으로 무시함으로써 정해진 할당량을 맞추고 브로커를 보호한다. 클라이언트 입장에서는 다음 JMX 메트릭을 통해서 스로틀링의 작동 여부를 확인할 수 있다. 각각은 쓰기/읽기 요청이 스로틀링 때문에 지연된 평균/최대 시간을 나타낸다.produce-throttle-time-avg produce-throttle-time-max fetch-throttle-time-avg fetch-throttle-time-max Please enable JavaScript to view the comments powered by Disqus. comments powered by