• 애플리케이션 코드를 개발할 때는 명령형과 리액티브의 두 가지 형태로 코드를 작성할 수 있다.
    • 명령형: 순차적으로 연속된 작업이 있으며, 각 작업은 한 번에 하나씩 그리고 이전 작업 다음에 실행된다.
    • 리액티브: 데이터 처리를 위해 일련의 작업들이 정의되지만, 이 작업들은 병렬로 실행될 수 있다. 그리고 각작업은 부분 집합의 데이터를 처리할 수 있으며, 처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업할 수 있다.

리액티브 프로그래밍 이해하기

  • 명령형 프로그래밍
    • 한 번에 하나씩 만나는 순서대로 실행되는 명령어들로 코드를 작성
    • 프로그램에서는 하나의 작업이 완전히 끝나기를 기다렸다가 그당므 작업을 수행한다.
    • 각 단계마다 처리되는 데이터는 전체를 처리할 수 있도록 사용할 수 있어야한다.
    • 하지만 이 작업이 원격지 서버로부터 데이터베이스에 데이터를 쓰거나 가져오는 것이라면 이 작업이 완료될 때까지 아무 것도 할 수 없다.
  • 자바를 비롯해서 대부분의 프로그래밍 언어는 동시 프로그래밍을 지원한다.
    • 그러나 다중 스레드로 동시성을 관리하는 것은 복잡해져서 쉽지 않다.
  • 리액티브 프로그래밍은 함수적이면서 선언적이다.
    • 순차적으로 수행되는 작업 단계를 나타낸 것이 아니라 데이터가 흘러가는 파이프라인이나 스트림을 포함한다.
    • 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.

리액티브 스트림 정의하기

  • 리액티브 스트림은 차단되지 않는 백 프레셔(backpressure)를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적이다.
  • 자바 스트림 vs. 리액티브 스트림
    • 자바 스트림은 대개 동기화되어 있고 한정된 데이터로 작업을 수행한다.
    • 리액티브 스트림은 무한 데이터셋을 비롯해서 어떤 크기의 데이터셋이건 비동기 처리를 지원한다. 그리고 실시간으로 데이터를 처리하며, 백 프레셔를 사용해서 데이터 전달 폭주를 막는다.
  • 리액티브 스트림은 4개의 인터페이스인 Publisher, Subscriber, Subscription, Processor로 요약할 수 있다.
  • Publisher는 하나의 Subscription당 하나의 Subscriber에 발행하는 데이터는 생성한다.
  • Publisher 인터페이스에는 SubscriberPublisher를 구독 신청할 수 있는 subscribe() 메서드 한 개가 선언되어 있다.
  • Subscriber가 구독 신청되면 Publisher로부터 이벤트를 수신할 수 있다.
    • Subscriber가 수신할 첫 번째 이벤트는 onSubsribe() 호출을 통해 이루어진다.
    • PublisheronSubsribe()를 호출할 때 이 메서드의 인자로 Subscription 객체를 Subscriber에 전달한다.
    • SubscriberSubscription 객체를 통해서 구독을 관리할 수 있다.
  • SubscriberSubscriptionrequest()를 호출하여 전송되는 데이터를 요청하거나, 또는 더 이상 데이터를 수신하지 않고 취소한다는 것을 나타내기 위해 cancel()을 호출할 수 있다.
    • request()를 호출할 때 Subscriber는 받고자 하는 데이터 항목 수를 나타내는 long 타입의 값을 인자로 전달한다. 바로 이것이 백 프레셔이며, Subscriber가 처리할 수 있는 것보다 더 많은 데이터를 Publisher가 전송하는 것을 막아준다.
    • 요청된 수의 데이터를 Publisher가 전송한 후에 Subscriber는 다시 request()를 호출하여 더 많은 요청을 할 수 있다.
  • Subscriber의 데이터 요청이 완료되면 데이터가 스트림을 통해 전달되기 시작한다. 이 때 onNext() 메서드가 호출되어 Publisher가 전송하는 데이터가 Subscriber에게 전달되며, 만일 에러가 생길 떄는 onError()가 호출된다.
  • Publisher에서 전송할 데이터가 없고 더 이상의 데이터를 생성하지 않는다면 PublisheronComplete()를 호출하여 작업이 끝났다고 Subscriber에게 알려준다.
  • Processor 인터페이스는 Subscriber 인터페이스와 Publisher 인터페이스를 결합한 것이다.
    • Subscriber 역할로 Processor는 데이터를 수신하고 처리한다. 그다음에 역할을 바꾸어 Publisher 역할로 처리 결과를 자신의 Subscriber 들에게 발행한다.
  • 리액티브 스트림 인터페이스는 스트림을 구성하는 기능이 없다. 이에 따라 프로젝트 리액터에서는 리액티브 스트림을 구성하는 API를 제공하여 리액티브 스트림 인터페이스를 구현하였다.

리액터 시작하기

Mono.just("Craig")  
   .map { n -> n.uppercase() }  
   .map { cn -> "Hello, $cn!" }  
   .subscribe { message -> println(message) }
  • 위 리액티브 코드는 데이터가 전달되는 파이프라인을 구성하는 것이다.
    • 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다.
    • 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.
  • 리액터의 두 가지 핵심 타입
    • Flux: 0, 1 또는 다수의 데이터를 갖는 파이프라인을 나타낸다.
    • Mono: 하나의 데이터 항목만 갖는 데이터셋에 최적화된 리액티브 타입이다.
    • 두 타입 모두 리액티브 스트림의 Publisher 인터페이스를 구현한 것이다.
    • 위의 예에서는 세 개의 Mono가 있으며, just() 오퍼레이션은 첫 번째 것을 생성한다.

리액티브 플로우의 다이어그램

리액터 의존성 추가하기

<dependency>  
   <groupId>io.projectreactor</groupId>  
   <artifactId>reactor-core</artifactId>  
</dependency>  
  
<dependency>  
   <groupId>io.projectreactor</groupId>  
   <artifactId>reactor-test</artifactId>  
</dependency>

리액티브 오퍼레이션 적용하기

  • FluxMono는 래익터가 제공하는 가장 핵심적인 구성 요소다.
  • FluxMono가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.
  • FluxMono에는 500개 이상의 오펴레이션이 있으며, 각 오퍼레이션에는 다음과 같이 분류될 수 있다.
    • 생성(creation) 오퍼레이션
    • 조합(combination) 오퍼레이션
    • 변환(transformation) 오퍼레이션
    • 로직(logic) 오퍼레이션

리액티브 타입 생성하기

  • FluxMonojust() 메서드를 사용하여 리액티브 타입을 생성할 수 있다.
    • 리액터의 StepVerifier를 사용해서 FluxMono를 테스트할 수 있다.
@Test  
fun createFlux_just() {  
    val fruitFlux: Flux<String> = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")  
  
    StepVerifier.create(fruitFlux)  
       .expectNext("Apple")  
       .expectNext("Orange")  
       .expectNext("Grape")  
       .expectNext("Banana")  
       .expectNext("Strawberry")  
       .verifyComplete()  
}

컬렉션으로부터 생성하기

  • fromArray()를 통해서 배열로 Flux 생성이 가능하다.
@Test  
fun createFlux_fromArray() {  
    val fruits: Array<String> = arrayOf("Apple", "Orange", "Grape", "Banana", "Strawberry")  
    val fruitFlux = Flux.fromArray(fruits)  
}
  • fromIterable()을 통해서 List, Set, Iterable 컬렉션으로부터 Flux 생성이 가능하다.
@Test  
fun createFlux_fromIterable() {  
    val fruits: List<String> = listOf("Apple", "Orange", "Grape", "Banana", "Strawberry")  
    val fruitFlux = Flux.fromIterable(fruits)  
}
  • fromStream()을 통해서 Stream 객체를 Flux로 생성이 가능하다.
val fruits: Stream<String> = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry")  
val fruitFlux = Flux.fromStream(fruits)

Flux 데이터 생성하기

  • range()를 사용해서, 1씩 증가하는 Flux 생성이 가능하다.
  • 첫번째 인자 값부터 시작해서, 1씩 증가하여 두번째 인자만큼 반복한다.

@Test  
fun creatFlux_range() {  
    val rangeFlux = Flux.range(1, 5)  
  
    StepVerifier.create(rangeFlux)  
       .expectNext(1)  
       .expectNext(2)  
       .expectNext(3)  
       .expectNext(4)  
       .expectNext(5)  
       .verifyComplete()  
}
  • interval() 메서드를 통해서, 0부터 시작해서 duration 시간마다 값을 1씩 증가 시키면서 방출한다.
    • 최대값이 지정되지 않으므로, 무한정 실행된다.
    • take() 오퍼레이션을 사용해서 첫 번째 5개의 항목으로 결과를 제한할 수 있다.

@Test  
fun createFlux_interval() {  
    val intervalFlux = Flux.interval(Duration.ofSeconds(1))  
       .take(5)  
  
    StepVerifier.create(intervalFlux)  
       .expectNext(0L)  
       .expectNext(1L)  
       .expectNext(2L)  
       .expectNext(3L)  
       .expectNext(4L)  
       .verifyComplete()  
}

리액티브 타입 조합하기

  • mergeWith()로 다른 Flux와 결합할 수 있다.
    • 일반적으로 Flux는 가능한 빨리 데이터를 방출한다.
    • delayElements()를 사용해서 500밀리초마다 하나씩 방출하도록 한다.
    • delaySubscription()을 사용해서 250밀리초가 지난 후에 구독 및 데이터를 방출하도록 한다.
    • mergeWith()는 소스 Flux들의 값이 완벽하게 번갈아 방출되게 보장할 수 없다.

@Test  
fun mergeFluxes() {  
    val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")  
       .delayElements(Duration.ofMillis(500))  
    val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")  
       .delaySubscription(Duration.ofMillis(250))  
       .delayElements(Duration.ofMillis(500))  
  
    val mergedFlux = characterFlux.mergeWith(foodFlux)  
  
    StepVerifier.create(mergedFlux)  
       .expectNext("Garfield")  
       .expectNext("Lasagna")  
       .expectNext("Kojak")  
       .expectNext("Lollipops")  
       .expectNext("Barbossa")  
       .expectNext("Apples")  
       .verifyComplete()  
}
  • zip() 오퍼레이션은 각 Flux 소스로부터 한 항목씩 번갈아 가져와서 새로운 Flux를 생성한다.
    • 각 소스 Flux가 순서대로 방출되어 Tuple2를 만든다.

@Test  
fun zipFluxes() {  
    val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")  
    val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")  
  
    val zippedFLux = Flux.zip(characterFlux, foodFlux)  
    StepVerifier.create(zippedFLux)  
       .expectNextMatches{ it.t1 == "Garfield" && it.t2 == "Lasagna" }  
       .expectNextMatches{ it.t1 == "Kojak" && it.t2 == "Lollipops" }  
       .expectNextMatches{ it.t1 == "Barbossa" && it.t2 == "Apples" }  
       .verifyComplete()  
}
  • zip() 오퍼레이션 마지막 파라미터로 람다를 념겨주면, 생성할 객체를 정의할 수 있다.

@Test  
fun zipFluxesToObject() {  
    val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")  
    val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")  
  
    val zippedFLux = Flux.zip(characterFlux, foodFlux) { c, f -> "$c eats $f" }  
  
    StepVerifier.create(zippedFLux)  
       .expectNext("Garfield eats Lasagna")  
       .expectNext("Kojak eats Lollipops")  
       .expectNext("Barbossa eats Apples")  
       .verifyComplete()  
}
  • firstWithSignal() 오퍼레이션은 두 Flux 객체 중 먼저 방출하는 Flux의 값을 선택해서 이 값을 발행한다.

@Test  
fun firstFlux() {  
    val slowFlux = Flux.just(4, 5, 6)  
       .delayElements(Duration.ofMillis(500))  
       .delaySubscription(Duration.ofMillis(250))  
    val fastFlux = Flux.just(1, 2, 3)  
       .delayElements(Duration.ofMillis(500))  
  
    val firstFlux = Flux.firstWithSignal(slowFlux, fastFlux)  
  
    StepVerifier.create(firstFlux)  
       .expectNext(1)  
       .expectNext(2)  
       .expectNext(3)  
       .verifyComplete()  
}

리액티브 스트림의 변환과 필터링

  • 리액티브 타입으로부터 데이터 필터링하기
    • skip(): 앞의 n개 건너 뛰기 or 지정된 시간이 경과할 때까지 처음의 여러 항목 건너 뛰기
    • take(): 처음 n개 항목만 발행하기 or 지정된 시간이 경과할 때 동안만 발행하기
    • filter(): 조건식이 true인 경우만 발행하기
  • 리액티브 데이터 매핑하기
    • map(): 변환을 수행하는 Flux를 생성한다.
      • 각 항목이 소스 Flux로부터 발행될 때 동기적으로 매핑이 수행된다.
      • 비동기적으로 매핑을 수행하고 싶다면 flatMap() 오퍼레이션을 사용해야 한다.
    • flatMap(): 각 객체를 새로운 MonoFlux로 매핑하며, 해당 MonoFlux들의 결과는 하나의 새로운 Flux가 된다.
      • 아래에 String 타입의 입력 문자열을 String 타입의 Mono로 변환하는 람다가 지정되어 있다.
      • subscribeOn() 메서드를 통해 다수의 입력 객체들의 map() 오퍼레이션이 비동기적으로 병행 수행될 수 있다.
      • subscribe()는 구독 요청하고 실제로 구독하는 반면, subscribeOn()은 구독이 동시적으로 처리되어야 한다는 것을 지정한다.
  • 리액티브 스트림의 데이터 버퍼링하기
    • buffer(): 데이터 스트림을 작은 덩어리로 분할
      • flatMap과 함께 사용하여 하나의 Flux를 별도의 스레드에서 병행으로 처리하게 할 수 있다.
    • Flux가 방출하는 모든 항목을 List로 모을 필요가 있다면 인자를 전달하지 않고 buffer()를 호출하면 된다.
    • collectList(): Flux가 발행한 모든 항목을 포함하는 List를 방출하는 새로운 Mono를 생성한다.
    • collectMap(): Flux가 방출하는 모든 항목을 포함하는 Map을 방출하는 새로운 Mono를 생선한다.
      • 인자로 키를 지정하는 함수를 넘긴다.

리액티브 타입에 로직 오퍼레이션 수행하기

  • MonoFlux가 발행한 항목이 어떤 조건과 일치하는지만 알아야 할 경우도 있다.
  • all() 이나 any() 오퍼레이션이 그런 로직을 수행한다.