• 이전 장에서는 REST를 사용한 동기화 통신을 알아봤다.
  • 비동기 메시징은 애플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법이다.

JMS로 메시지 전송하기

  • JMS(Java Message Service): 두 개 이상의 클라이언트 간에 메시지 통신을 위한 공통 API를 정의하는 자바 표준
  • JMS가 나오기 전에는 메시지 브로커들이 각자 나름의 API를 갖고 있어서, 애플리케이션의 메시징 코드가 브로커 간에 호화되지 않았다.
    • 메시지 브로커: 클라이언트 간에 메시지 통신을 중개하는 역할
    • JMS를 사용하면 이것을 준수하는 모든 궇녀 코드가 공통 인터페이스를 통해 함께 작동할 수 있다.
  • 스프링은 JmsTemplate이라는 템플릿 기반의 클래스를 통해 JMS를 지원한다.
    • JmsTemplate을 사용하면 프로듀서가 큐와 토픽에 메시지를 전송하고, 컨슈머는 그 메시지들을 받을 수 잇다.
    • POJO도 지원한다.
  • JMS는 자바 명세이므로 자바 애플리케이션에서만 사용할 수 있다. JVM 외의 다른 플랫폼에서 사용하려면 RabbitMQ 등을 사용할 수 있다.

JMS 설정하기

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
spring:
	activemq:
		broker-url: tcp://activemq.tacocloud.com
		user: tacoweb
		password: l3tm31n
<dependecy>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
spring:
	artemis:
		host: artemis.tacocloud.com
		port: 61617
		user: tacoweb
		password: l3tm31n

JmsTemplate을 사용해서 메시지 전송하기

  • JmsTemplate가 메시지 브로커와의 연결 및 세션하는 코드, 메시지를 전송하는 도중 발생하는 예외를 처리하는 코드를 대신 처리하고 있다.
  • 메시지를 전송하기 위해서 send()convertAndSend() 두 개의 메서드가 있으며, 각 메서드는 서로 다른 매개변수를 지원하기 위해 오버로딩되어 있다.
  • 3개의 부류로 나눌 수 있다.
    • send() 메서드 3개는 Message 객체를 생성하기 위해 MessgaeCreator를 필요로 한다.
    • convertAndSend() 메서드 3개는 Object 타입 객체를 인자로 받아 내부적으로 Message 타입으로 변환한다.
    • 다른 convertAndSend() 메서드 3개는 Object 타입 객체를 Message 타입으로 변환한다. 그러나 메시지가 전송되기 전에 Message의 커스터마이징을 할 수 있도록 MEssagePostProcessor도 인자로 받는다.
  • 3개의 메서드 부류는 각각 3개의 오버로딩된 메서드로 구성되며, 이 메서드들은 JMS 메시지의 도착지, 즉 메시지를 쓰는 곳 (큐 또는 토픽)을 지정하는 방법이 다르다.
    • 첫 번째 메서드는 도착지 매개변수가 없으며, 해당 메시지를 기본 도착지로 전송한다.
    • 두 번째 메서드는 해당 메시지의 도착지를 나타내는 Destination 객체를 인자로 받는다.
    • 세 번째 메서드는 해당 메시지의 도착지를 나타내는 문자열을 인자로 받는다.
  • 기본 도착지 설정 방법: application.yml 파일에서 지정할 수 있다.
spring:
	jms:
		template:
			default-destination: tacocloud.order.queue
  • Destination 객체는 빈으로 선언하고 메시지 전송을 수행하는 빈에 주입한다.
@Bean  
fun orderQueue(): Destination {  
   return ActiveMQQueue("tacocloud.order.queue")  
}

메시지 변환하고 전송하기

  • convertAndSend() 메서드는 인자로 전달받은 객체를 Message 객체로 변환되어 전송된다.
  • Message 객체로 변환하는 일은 MessageConverter를 구현하여 처리할 수 있다.

메시지 변환기 구현하기

  • MessageConverter는 스프링에 정의된 인터페이스며, 두 개의 메서드만 정의되어 있다.
public interface MessageConverter {  
    Message toMessage(Object object, Session session) throws JMSException, MessageConversionException; 
    Object fromMessage(Message message) throws JMSException, MessageConversionException;  
}
  • 스프링에서 기본적으로 4가지의 MessageConverter 구현체를 제공하고 있다.
  • 기본적으로 SimpleMessageConverter를 사용하며, 이 경우 전송될 객체가 Serializable 인터페이스를 구현해야된다.
  • Serializable 인터페이스를 구현해야 한다는 제약을 피하기 위해 MappingJackson2MessageConverter와 같은 다른 메시지 변환기를 사용할 수도 있다.
  • 다른 메시지 변환기를 사용하기 위해서는 빈으로 선언하면 된다.
    • setTypeIdPropertyName(): 수신된 메시지의 변환 타입을 메시지 수신자가 알기위해서 타입 정보를 전달할 프로퍼티 이름
    • setTypeIdMapping(): 기본적으로 패키지 전체 경로가 포함된 클래스 이름이 전달되는데, 리팩토링할 때마다 수신자에서도 수정이 필요하므로 유연성을 늘리기위해서 크래스명 대신 ‘order’을 전달하도록 한다.
@Bean  
fun messageConverter(objectMapper: ObjectMapper): MappingJackson2MessageConverter {  
   return MappingJackson2MessageConverter().apply {  
      setObjectMapper(objectMapper)  
      setTypeIdPropertyName("_typeId")  
      setTypeIdMappings(  
         mapOf<String, Class<*>>(  
            "order" to Order::class.java  
         )  
      )  
   }  
}

후처리 메시지

  • convertAndSend()의 마지막 인자로 MessagePostProcessor를 전달하면 Message 객체가 생성된 후 이 객체에 우리가 필요한 처리를 할 수 있다.
  • MessagePostProcessor는 함수형 인터페이스다.
jmsTemplate.convertAndSend(orderQueue, order) { message ->  
   message.setStringProperty("X_ORDER_SOURCE", "WEB")  
   message  
}

JMS 메시지 수신하기

  • 메시지를 수신하는 방식에는 두 가지가 있다.
    • 풀 모델: 우리 코드에서 메시지를 요청하고 도착할 때까지 기다린다.
    • 푸시 모델: 메시지가 수신 가능하게 되면 우리 코드로 자동 전달된다.
  • JmsTemplate의 모든 수신 메서드는 풀 모델을 사용한다.
  • 푸시 모델을 사용하기 위해서는 메시지 리스너를 정의해야 된다.
  • 일반적으로 스레드의 실행을 막지 않으므로 일반적으로 푸시 모델이 좋은 선택이다.
    • 단, 푸시 모델은 많은 메시지가 너무 빨리 도착하면 리스너에 과부하가 걸릴 수 있다.

JmsTemplate을 사용해서 메시지 수신하기

  • JmsTemplate 에서는 receive()receiveAndConvert() 를 제공한다.
    • receive(): 원시 메시지를 수신한다.
    • receiveAndConvert(): 메시지를 도메인 타입으로 변환하기 위해 구성된 메시지 변환기를 사용한다.
  • 메시지를 수신하기 위해서는 매번 해당 메소드를 호출해야된다.
  • 위 메서드들은 메시지가 있는지 지정된 시간까지만 기다린다.
@Component  
class JmsOrderReceiver(  
   private val jmsTemplate: JmsTemplate,  
   private val messageConverter: MappingJackson2MessageConverter,  
   private val orderQueue: Destination  
) : OrderReceiver {  
   override fun receiveOrder(): Order? {  
      val message = jmsTemplate.receive(orderQueue)  
         ?: return null  
      return (messageConverter.fromMessage(message)) as Order  
   }  
}

메시지 리스너 선언하기

  • @JmsListener가 지정된 메서드들은 지정된 도착지에 들어오는 메시지에 반응한다.
@JmsListener(destination = "tacocloud.order.queue")  
fun receiveOrder(order: Order) {  
   // something...  
}

RabbitMQ와 AMQP 사용하기

  • AMQP에서 가장 중요한 구현이라고 할 수 있는 RabbitMQ는 JMS보다 더 진보된 메시지 라우팅 전략을 제공한다.
  • AMQP 메시지는 수신자가 리스닝하는 큐와 분리된 거래소(exchange) 이름과 라우팅 키를 주소로 사용한다.
    • 메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소에 들어간다.
    • 거래소는 하나 이상의 큐에 메시지를 전달할 책임이 있다.
  • 거래소의 종류
    • 기본(default): 브로커가 자동으로 생성하는 특별한 거래소. 해당 메시지의 라우팅 키와 이름이 같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본 거래소와 연결된다.
    • 디렉트(direct): 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달한다.
    • 토픽(topic): 바인딩 키(와일드카드 포함)가 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달한다.
    • 팬아웃(fanout): 바인딩 키가 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달한다.
    • 헤더(header): 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다는 것만 다르다.
    • 데드 레터(dead letter): 전달 불가능한 즉, 정의된 어떤 거래소-큐 바인딩과도 일치하지 않는 모든 메시지를 보관하는 잡동사니 거래소
  • 가장 간단한 형태는 기본 거래소와 팬아웃 거래소.

RabbitMQ를 스프링에 추가하기

  • 의존성 추가
    • 의존성을 추가하면 AMQP 연결 팩토리와 RabbitTemplate 빈을 생성하는 자동-구성이 수행된다.
<dependency>  
   <groupId>org.springframework.boot</groupId>  
   <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>
  • rabbitmq 관련 구성 속성
    • 개발 목적이면 로컬 컴퓨터에서 실행되고 5672 포트를 리스닝할 것이며, 인증 정보가 필요 없을 것이다.

RabbitTemplate을 사용해서 메시지 전송하기

@Service  
class RabbitOrderMessagingService(  
   private val rabbitTemplate: RabbitTemplate  
): OrderMessagingService {  
   override fun sendOrder(order: Order) {  
      val converter = rabbitTemplate.messageConverter  
      val props = MessageProperties()  
      val message = converter.toMessage(order, props)  
  
      rabbitTemplate.send("tacocloud.order", message)  
   }  
}
  • 기본 거래소 이름은 빈 문자열인 ““이며, 이것은 RabbitMQ 브로커가 자동으로 생성하는 기본 거래소와 일치한다. 기본 라우팅 키도 ““이다.
    • send() 메서드에서 거래소 이름과, 라우팅 키를 전달하지 않으면 기본값을 사용한다.
    • 이런 기본값은 spring.rabbitmq.template.exchangespring.rabbitmq.tempalte.routing-key 속성을 통해 변경할 수 있다.

메시지 변환기 구성하기

  • RabbitMQ는 아래와 같은 메시지 변환기를 제공한다.
  • MessageConverter 타입을 빈으로 구성하면 메시지 변환기를 변경할 수 있다.
@Bean  
fun messageConverter(): MessageConverter {  
   return Jackson2JsonMessageConverter()  
}

메시지 속성 설정하기

  • 메시지의 일부 헤더를 설정해야 할 경우, MessageProperties 인스턴스를 통해 헤더를 설정할 수 있다.
  • convertAndSend()를 사용할 때는 MessagePostProcessor에서 해야된다.
@Service  
class RabbitOrderMessagingService(  
   private val rabbitTemplate: RabbitTemplate  
) : OrderMessagingService {  
   override fun sendOrder(order: Order) {  
      rabbitTemplate.convertAndSend("tacocloud.order.queue", order) { message ->  
         val props = message.messageProperties  
         props.setHeader("X_ORDER_SOURCE", "WEB")  
         message  
      }  
   }  
}

RabbitMQ로부터 메시지 수신하기

  • RabbitMQ도 JMS처럼 풀 모델과 푸시 모델이 있다.
    • RabbitTemplate을 사용해서 큐로부터 메시지를 가져온다.
    • @RabbitListener가 지정된 메서드로 메시지가 푸시된다.

RabbitTemplate을 사용해서 메시지 수신하기

  • RabbitTemplatereceive()receiveAndConvert() 메서드가 존재한다.

  • 수신 메서드에는 거래소나 라우팅 키를 매개변수로 갖지 않고, 큐의 이름은 매개변수로 갖는다.

  • 대부분의 수신 메서드는 메시지의 수신 타임아웃을 나타내기 위해 long 타입의 매개변수를 갖는다. (기본값은 1ms)

    • spring.rabbitmq.template.receive-timeout 를 통해 기본 타임아웃 값을 설정할 수 있다.
  • 타임아웃 내에 메시지가 없으면 null 값이 반환된다.

  • receive() 사용하는 예시

@Component  
class RabbitOrderReceiver(  
   private val rabbitTemplate: RabbitTemplate,  
   private val messageConverter: MessageConverter  
) {  
   fun receiveOrder(): Order? {  
      val message = rabbitTemplate.receive("tacocloud.order.queue")  
         ?: return null  
      return messageConverter.fromMessage(message) as Order  
   } 
}
  • receiveAndConvert() 사용하는 예시
    • ParameterizedTypeReference를 통해서 타입-안전 캐스팅이 가능하다.
@Component  
class RabbitOrderReceiver(  
   private val rabbitTemplate: RabbitTemplate,  
   private val messageConverter: MessageConverter  
) {  
   fun receiveOrder(): Order? {  
      return rabbitTemplate.receiveAndConvert(  
         "tacocloud.order.queue",  
         object : ParameterizedTypeReference<Order>() {}  
      )  
   }  
}

리스너를 사용해서 RabbitMQ 메시지 처리하기

  • @RabbitListner 애노테이션을 메서드 위에 지정하면 된다.
@Component  
class OrderListener {  
  
   companion object {  
      private val logger = LoggerFactory.getLogger(OrderListener::class.java)  
   }  
  
   @RabbitListener(queues = ["tacocloud.order.queue"])  
   fun receiveOrder(order: Order) {  
      logger.info(order.deliveryName)  
   }  
}

리스너가 사용할 큐 생성하기

  • 빈으로 등록하면 된다.
@Bean  
fun orderQueue(): Queue {  
   return Queue("tacocloud.order.queue", false)  
}

카프카 사용하기

  • 카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었다.
  • 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여 메시지를 관리한다.
  • RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용한다.
  • 카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다.
  • 각 토픽은 여러 개의 파티션으로 분할될 수 있다. 이 경우 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 된다.

카프카 사용을 위해 스프링 설정하기

  • 의존성 추가
<dependency>  
   <groupId>org.springframework.kafka</groupId>  
   <artifactId>spring-kafka</artifactId>  
</dependency>
  • KafkaTemplate은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝하는 카프카 브로커를 사용한다.
  • spring.kafka.bootstrap-servers 속성: 카프카 클러스터로의 초기 연결에 사용되는 하나 이상의 카프카 서버들의 위치
spring:
	kafka:  
	  bootstrap-servers:  
	    - kafka.tacocloud.com:9092  
	    - kafka.tacocloud.com:9093  
	    - kafka.tacocloud.com:9094

KafkaTemplate을 사용해서 메시지 전송하기

  • KafkaTemplate에는 send()sendDefault() 메서드가 있다.
    • KafkaTemplate는 제네릭 타입을 사용하고 있고, 메시지를 전송할 때 직접 도메인 타입을 처리할 수 있기 때문에, send() 메서드에서 메시지 타입 변환이 일어난다.
  • send() 메서드에 있을 수 있는 매개변수
    • 메시지가 전송될 토픽
    • 토픽 데이터를 쓰는 파티션(optional)
    • 레코드 전송 키(optional)
    • 타임스탬프(optional, 기본값은 System.currentTimeMillis())
    • 페이로드(메시지에 적재된 순수한 데이터)
  • send() 메서드에 ProducerRecord를 전송하는 것도 있따.
    • ProducerRecord: 위에 나와 있는 매개변수르 하나에 담은 타입
@Service  
class KafkaOrderMessagingService(  
   private val kafkaTemplate: KafkaTemplate<String, Order>  
): OrderMessagingService {  
   override fun sendOrder(order: Order) {  
      kafkaTemplate.send("tacocloud.orders.topic", order)  
   }  
}
  • sendDefault() 메서드를 사용하면 기본 토픽으로 메시지를 전송한다.
    • spring.kafka.template.default-topic 속성에 기본 토픽을 설정할 수 있다.
class KafkaOrderMessagingService(  
   private val kafkaTemplate: KafkaTemplate<String, Order>  
): OrderMessagingService {  
  
   override fun sendOrder(order: Order) {  
      kafkaTemplate.sendDefault(order)  
   }  
}

카프카 리스너 작성하기

  • 카프카는 KafkaTemplate 에서 메시지 수신을 제공하지 않는다.
  • 리스너를 작성해서 메시지를 가져올 수 있다.
  • @kafkaListener 애노테이션를 메서드에 정의하면 된다.
@KafkaListener(topics = ["tacocloud.orders.topic"])  
fun receiveOrder(order: Order) {  
   logger.info(order.deliveryName)  
}
  • 메시지의 추가적인 메타데이터가 필요하다면 ConsumeRecordMessage 객체도 인자로 받을 수 있다.
@KafkaListener(topics = ["tacocloud.orders.topic"])  
fun receiveOrder(order: Order, record: ConsumerRecord<String, Order>) {  
   logger.info("Received from partition {} with timestamp {}", record.partition(), record.timestamp())  
   logger.info(order.deliveryName)  
}