- 이전 장에서는 REST를 사용한 동기화 통신을 알아봤다.
- 비동기 메시징은 애플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법이다.
JMS로 메시지 전송하기
- JMS(Java Message Service): 두 개 이상의 클라이언트 간에 메시지 통신을 위한 공통 API를 정의하는 자바 표준
- JMS가 나오기 전에는 메시지 브로커들이 각자 나름의 API를 갖고 있어서, 애플리케이션의 메시징 코드가 브로커 간에 호화되지 않았다.
- 메시지 브로커: 클라이언트 간에 메시지 통신을 중개하는 역할
- JMS를 사용하면 이것을 준수하는 모든 궇녀 코드가 공통 인터페이스를 통해 함께 작동할 수 있다.
- 스프링은
JmsTemplate
이라는 템플릿 기반의 클래스를 통해 JMS를 지원한다.JmsTemplate
을 사용하면 프로듀서가 큐와 토픽에 메시지를 전송하고, 컨슈머는 그 메시지들을 받을 수 잇다.- POJO도 지원한다.
- JMS는 자바 명세이므로 자바 애플리케이션에서만 사용할 수 있다. JVM 외의 다른 플랫폼에서 사용하려면 RabbitMQ 등을 사용할 수 있다.
JMS 설정하기
- 아파치 ActiveMQ 또는 더 최신의 아파치 ActiveMQ Artemis 중 어느 브로커를 사용할지 결정해야 한다.
- ActiveMQ를 사용하는 경우: pom.xml과 applicatoin.yml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
spring:
activemq:
broker-url: tcp://activemq.tacocloud.com
user: tacoweb
password: l3tm31n
- Artemis를 사용하는 경우: pom.xml과 applicatoin.yml
<dependecy>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
spring:
artemis:
host: artemis.tacocloud.com
port: 61617
user: tacoweb
password: l3tm31n
JmsTemplate을 사용해서 메시지 전송하기
JmsTemplate
가 메시지 브로커와의 연결 및 세션하는 코드, 메시지를 전송하는 도중 발생하는 예외를 처리하는 코드를 대신 처리하고 있다.- 메시지를 전송하기 위해서
send()
와convertAndSend()
두 개의 메서드가 있으며, 각 메서드는 서로 다른 매개변수를 지원하기 위해 오버로딩되어 있다. - 3개의 부류로 나눌 수 있다.
send()
메서드 3개는Message
객체를 생성하기 위해MessgaeCreator
를 필요로 한다.convertAndSend()
메서드 3개는Object
타입 객체를 인자로 받아 내부적으로Message
타입으로 변환한다.- 다른
convertAndSend()
메서드 3개는Object
타입 객체를Message
타입으로 변환한다. 그러나 메시지가 전송되기 전에Message
의 커스터마이징을 할 수 있도록MEssagePostProcessor
도 인자로 받는다.
- 3개의 메서드 부류는 각각 3개의 오버로딩된 메서드로 구성되며, 이 메서드들은 JMS 메시지의 도착지, 즉 메시지를 쓰는 곳 (큐 또는 토픽)을 지정하는 방법이 다르다.
- 첫 번째 메서드는 도착지 매개변수가 없으며, 해당 메시지를 기본 도착지로 전송한다.
- 두 번째 메서드는 해당 메시지의 도착지를 나타내는
Destination
객체를 인자로 받는다. - 세 번째 메서드는 해당 메시지의 도착지를 나타내는 문자열을 인자로 받는다.
- 기본 도착지 설정 방법: application.yml 파일에서 지정할 수 있다.
spring:
jms:
template:
default-destination: tacocloud.order.queue
Destination
객체는 빈으로 선언하고 메시지 전송을 수행하는 빈에 주입한다.
@Bean
fun orderQueue(): Destination {
return ActiveMQQueue("tacocloud.order.queue")
}
메시지 변환하고 전송하기
convertAndSend()
메서드는 인자로 전달받은 객체를Message
객체로 변환되어 전송된다.Message
객체로 변환하는 일은MessageConverter
를 구현하여 처리할 수 있다.
메시지 변환기 구현하기
MessageConverter
는 스프링에 정의된 인터페이스며, 두 개의 메서드만 정의되어 있다.
public interface MessageConverter {
Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
Object fromMessage(Message message) throws JMSException, MessageConversionException;
}
- 스프링에서 기본적으로 4가지의
MessageConverter
구현체를 제공하고 있다. - 기본적으로
SimpleMessageConverter
를 사용하며, 이 경우 전송될 객체가Serializable
인터페이스를 구현해야된다. Serializable
인터페이스를 구현해야 한다는 제약을 피하기 위해MappingJackson2MessageConverter
와 같은 다른 메시지 변환기를 사용할 수도 있다.- 다른 메시지 변환기를 사용하기 위해서는 빈으로 선언하면 된다.
setTypeIdPropertyName()
: 수신된 메시지의 변환 타입을 메시지 수신자가 알기위해서 타입 정보를 전달할 프로퍼티 이름setTypeIdMapping()
: 기본적으로 패키지 전체 경로가 포함된 클래스 이름이 전달되는데, 리팩토링할 때마다 수신자에서도 수정이 필요하므로 유연성을 늘리기위해서 크래스명 대신 ‘order’을 전달하도록 한다.
@Bean
fun messageConverter(objectMapper: ObjectMapper): MappingJackson2MessageConverter {
return MappingJackson2MessageConverter().apply {
setObjectMapper(objectMapper)
setTypeIdPropertyName("_typeId")
setTypeIdMappings(
mapOf<String, Class<*>>(
"order" to Order::class.java
)
)
}
}
후처리 메시지
convertAndSend()
의 마지막 인자로MessagePostProcessor
를 전달하면Message
객체가 생성된 후 이 객체에 우리가 필요한 처리를 할 수 있다.MessagePostProcessor
는 함수형 인터페이스다.
jmsTemplate.convertAndSend(orderQueue, order) { message ->
message.setStringProperty("X_ORDER_SOURCE", "WEB")
message
}
JMS 메시지 수신하기
- 메시지를 수신하는 방식에는 두 가지가 있다.
- 풀 모델: 우리 코드에서 메시지를 요청하고 도착할 때까지 기다린다.
- 푸시 모델: 메시지가 수신 가능하게 되면 우리 코드로 자동 전달된다.
JmsTemplate
의 모든 수신 메서드는 풀 모델을 사용한다.- 푸시 모델을 사용하기 위해서는 메시지 리스너를 정의해야 된다.
- 일반적으로 스레드의 실행을 막지 않으므로 일반적으로 푸시 모델이 좋은 선택이다.
- 단, 푸시 모델은 많은 메시지가 너무 빨리 도착하면 리스너에 과부하가 걸릴 수 있다.
JmsTemplate을 사용해서 메시지 수신하기
JmsTemplate
에서는receive()
와receiveAndConvert()
를 제공한다.receive()
: 원시 메시지를 수신한다.receiveAndConvert()
: 메시지를 도메인 타입으로 변환하기 위해 구성된 메시지 변환기를 사용한다.
- 메시지를 수신하기 위해서는 매번 해당 메소드를 호출해야된다.
- 위 메서드들은 메시지가 있는지 지정된 시간까지만 기다린다.
@Component
class JmsOrderReceiver(
private val jmsTemplate: JmsTemplate,
private val messageConverter: MappingJackson2MessageConverter,
private val orderQueue: Destination
) : OrderReceiver {
override fun receiveOrder(): Order? {
val message = jmsTemplate.receive(orderQueue)
?: return null
return (messageConverter.fromMessage(message)) as Order
}
}
메시지 리스너 선언하기
@JmsListener
가 지정된 메서드들은 지정된 도착지에 들어오는 메시지에 반응한다.
@JmsListener(destination = "tacocloud.order.queue")
fun receiveOrder(order: Order) {
// something...
}
RabbitMQ와 AMQP 사용하기
- AMQP에서 가장 중요한 구현이라고 할 수 있는 RabbitMQ는 JMS보다 더 진보된 메시지 라우팅 전략을 제공한다.
- AMQP 메시지는 수신자가 리스닝하는 큐와 분리된 거래소(exchange) 이름과 라우팅 키를 주소로 사용한다.
- 메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소에 들어간다.
- 거래소는 하나 이상의 큐에 메시지를 전달할 책임이 있다.
- 거래소의 종류
- 기본(default): 브로커가 자동으로 생성하는 특별한 거래소. 해당 메시지의 라우팅 키와 이름이 같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본 거래소와 연결된다.
- 디렉트(direct): 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달한다.
- 토픽(topic): 바인딩 키(와일드카드 포함)가 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달한다.
- 팬아웃(fanout): 바인딩 키가 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달한다.
- 헤더(header): 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다는 것만 다르다.
- 데드 레터(dead letter): 전달 불가능한 즉, 정의된 어떤 거래소-큐 바인딩과도 일치하지 않는 모든 메시지를 보관하는 잡동사니 거래소
- 가장 간단한 형태는 기본 거래소와 팬아웃 거래소.
RabbitMQ를 스프링에 추가하기
- 의존성 추가
- 의존성을 추가하면 AMQP 연결 팩토리와
RabbitTemplate
빈을 생성하는 자동-구성이 수행된다.
- 의존성을 추가하면 AMQP 연결 팩토리와
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- rabbitmq 관련 구성 속성
- 개발 목적이면 로컬 컴퓨터에서 실행되고 5672 포트를 리스닝할 것이며, 인증 정보가 필요 없을 것이다.
RabbitTemplate을 사용해서 메시지 전송하기
RabbitTemplate
도send()
와convertAndSend()
메서드가 있다.RabbitTemplate
은JmsTemplate
와 다르게 메시지를 보낼 때, 거래소와 라우팅 키를 보낸다.- https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html
send()
를 사용할 때rabbitTemplate
에서 가지고 있는MessageConverter
를 사용할 수도 있다.
@Service
class RabbitOrderMessagingService(
private val rabbitTemplate: RabbitTemplate
): OrderMessagingService {
override fun sendOrder(order: Order) {
val converter = rabbitTemplate.messageConverter
val props = MessageProperties()
val message = converter.toMessage(order, props)
rabbitTemplate.send("tacocloud.order", message)
}
}
- 기본 거래소 이름은 빈 문자열인 ““이며, 이것은 RabbitMQ 브로커가 자동으로 생성하는 기본 거래소와 일치한다. 기본 라우팅 키도 ““이다.
send()
메서드에서 거래소 이름과, 라우팅 키를 전달하지 않으면 기본값을 사용한다.- 이런 기본값은
spring.rabbitmq.template.exchange
와spring.rabbitmq.tempalte.routing-key
속성을 통해 변경할 수 있다.
메시지 변환기 구성하기
- RabbitMQ는 아래와 같은 메시지 변환기를 제공한다.
MessageConverter
타입을 빈으로 구성하면 메시지 변환기를 변경할 수 있다.
@Bean
fun messageConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}
메시지 속성 설정하기
- 메시지의 일부 헤더를 설정해야 할 경우,
MessageProperties
인스턴스를 통해 헤더를 설정할 수 있다. convertAndSend()
를 사용할 때는MessagePostProcessor
에서 해야된다.
@Service
class RabbitOrderMessagingService(
private val rabbitTemplate: RabbitTemplate
) : OrderMessagingService {
override fun sendOrder(order: Order) {
rabbitTemplate.convertAndSend("tacocloud.order.queue", order) { message ->
val props = message.messageProperties
props.setHeader("X_ORDER_SOURCE", "WEB")
message
}
}
}
RabbitMQ로부터 메시지 수신하기
- RabbitMQ도 JMS처럼 풀 모델과 푸시 모델이 있다.
RabbitTemplate
을 사용해서 큐로부터 메시지를 가져온다.@RabbitListener
가 지정된 메서드로 메시지가 푸시된다.
RabbitTemplate을 사용해서 메시지 수신하기
RabbitTemplate
에receive()
와receiveAndConvert()
메서드가 존재한다.수신 메서드에는 거래소나 라우팅 키를 매개변수로 갖지 않고, 큐의 이름은 매개변수로 갖는다.
대부분의 수신 메서드는 메시지의 수신 타임아웃을 나타내기 위해
long
타입의 매개변수를 갖는다. (기본값은 1ms)spring.rabbitmq.template.receive-timeout
를 통해 기본 타임아웃 값을 설정할 수 있다.
타임아웃 내에 메시지가 없으면
null
값이 반환된다.receive()
사용하는 예시
@Component
class RabbitOrderReceiver(
private val rabbitTemplate: RabbitTemplate,
private val messageConverter: MessageConverter
) {
fun receiveOrder(): Order? {
val message = rabbitTemplate.receive("tacocloud.order.queue")
?: return null
return messageConverter.fromMessage(message) as Order
}
}
receiveAndConvert()
사용하는 예시ParameterizedTypeReference
를 통해서 타입-안전 캐스팅이 가능하다.
@Component
class RabbitOrderReceiver(
private val rabbitTemplate: RabbitTemplate,
private val messageConverter: MessageConverter
) {
fun receiveOrder(): Order? {
return rabbitTemplate.receiveAndConvert(
"tacocloud.order.queue",
object : ParameterizedTypeReference<Order>() {}
)
}
}
리스너를 사용해서 RabbitMQ 메시지 처리하기
@RabbitListner
애노테이션을 메서드 위에 지정하면 된다.
@Component
class OrderListener {
companion object {
private val logger = LoggerFactory.getLogger(OrderListener::class.java)
}
@RabbitListener(queues = ["tacocloud.order.queue"])
fun receiveOrder(order: Order) {
logger.info(order.deliveryName)
}
}
리스너가 사용할 큐 생성하기
- 빈으로 등록하면 된다.
@Bean
fun orderQueue(): Queue {
return Queue("tacocloud.order.queue", false)
}
카프카 사용하기
- 카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었다.
- 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여 메시지를 관리한다.
- RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용한다.
- 카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다.
- 각 토픽은 여러 개의 파티션으로 분할될 수 있다. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 된다.
카프카 사용을 위해 스프링 설정하기
- 의존성 추가
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
KafkaTemplate
은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝하는 카프카 브로커를 사용한다.spring.kafka.bootstrap-servers
속성: 카프카 클러스터로의 초기 연결에 사용되는 하나 이상의 카프카 서버들의 위치
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
KafkaTemplate을 사용해서 메시지 전송하기
KafkaTemplate
에는send()
와sendDefault()
메서드가 있다.KafkaTemplate
는 제네릭 타입을 사용하고 있고, 메시지를 전송할 때 직접 도메인 타입을 처리할 수 있기 때문에,send()
메서드에서 메시지 타입 변환이 일어난다.
send()
메서드에 있을 수 있는 매개변수- 메시지가 전송될 토픽
- 토픽 데이터를 쓰는 파티션(optional)
- 레코드 전송 키(optional)
- 타임스탬프(optional, 기본값은
System.currentTimeMillis()
) - 페이로드(메시지에 적재된 순수한 데이터)
send()
메서드에ProducerRecord
를 전송하는 것도 있따.ProducerRecord
: 위에 나와 있는 매개변수르 하나에 담은 타입
@Service
class KafkaOrderMessagingService(
private val kafkaTemplate: KafkaTemplate<String, Order>
): OrderMessagingService {
override fun sendOrder(order: Order) {
kafkaTemplate.send("tacocloud.orders.topic", order)
}
}
sendDefault()
메서드를 사용하면 기본 토픽으로 메시지를 전송한다.spring.kafka.template.default-topic
속성에 기본 토픽을 설정할 수 있다.
class KafkaOrderMessagingService(
private val kafkaTemplate: KafkaTemplate<String, Order>
): OrderMessagingService {
override fun sendOrder(order: Order) {
kafkaTemplate.sendDefault(order)
}
}
카프카 리스너 작성하기
- 카프카는
KafkaTemplate
에서 메시지 수신을 제공하지 않는다. - 리스너를 작성해서 메시지를 가져올 수 있다.
@kafkaListener
애노테이션를 메서드에 정의하면 된다.
@KafkaListener(topics = ["tacocloud.orders.topic"])
fun receiveOrder(order: Order) {
logger.info(order.deliveryName)
}
- 메시지의 추가적인 메타데이터가 필요하다면
ConsumeRecord
나Message
객체도 인자로 받을 수 있다.
@KafkaListener(topics = ["tacocloud.orders.topic"])
fun receiveOrder(order: Order, record: ConsumerRecord<String, Order>) {
logger.info("Received from partition {} with timestamp {}", record.partition(), record.timestamp())
logger.info(order.deliveryName)
}