- 애플리케이션 코드를 개발할 때는 명령형과 리액티브의 두 가지 형태로 코드를 작성할 수 있다.
- 명령형: 순차적으로 연속된 작업이 있으며, 각 작업은 한 번에 하나씩 그리고 이전 작업 다음에 실행된다.
- 리액티브: 데이터 처리를 위해 일련의 작업들이 정의되지만, 이 작업들은 병렬로 실행될 수 있다. 그리고 각작업은 부분 집합의 데이터를 처리할 수 있으며, 처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업할 수 있다.
리액티브 프로그래밍 이해하기#
- 명령형 프로그래밍
- 한 번에 하나씩 만나는 순서대로 실행되는 명령어들로 코드를 작성
- 프로그램에서는 하나의 작업이 완전히 끝나기를 기다렸다가 그당므 작업을 수행한다.
- 각 단계마다 처리되는 데이터는 전체를 처리할 수 있도록 사용할 수 있어야한다.
- 하지만 이 작업이 원격지 서버로부터 데이터베이스에 데이터를 쓰거나 가져오는 것이라면 이 작업이 완료될 때까지 아무 것도 할 수 없다.
- 자바를 비롯해서 대부분의 프로그래밍 언어는 동시 프로그래밍을 지원한다.
- 그러나 다중 스레드로 동시성을 관리하는 것은 복잡해져서 쉽지 않다.
- 리액티브 프로그래밍은 함수적이면서 선언적이다.
- 순차적으로 수행되는 작업 단계를 나타낸 것이 아니라 데이터가 흘러가는 파이프라인이나 스트림을 포함한다.
- 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.
리액티브 스트림 정의하기#
- 리액티브 스트림은 차단되지 않는 백 프레셔(backpressure)를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적이다.
- 자바 스트림 vs. 리액티브 스트림
- 자바 스트림은 대개 동기화되어 있고 한정된 데이터로 작업을 수행한다.
- 리액티브 스트림은 무한 데이터셋을 비롯해서 어떤 크기의 데이터셋이건 비동기 처리를 지원한다. 그리고 실시간으로 데이터를 처리하며, 백 프레셔를 사용해서 데이터 전달 폭주를 막는다.
- 리액티브 스트림은 4개의 인터페이스인
Publisher, Subscriber, Subscription, Processor로 요약할 수 있다. Publisher는 하나의 Subscription당 하나의 Subscriber에 발행하는 데이터는 생성한다.Publisher 인터페이스에는 Subscriber가 Publisher를 구독 신청할 수 있는 subscribe() 메서드 한 개가 선언되어 있다.Subscriber가 구독 신청되면 Publisher로부터 이벤트를 수신할 수 있다.Subscriber가 수신할 첫 번째 이벤트는 onSubsribe() 호출을 통해 이루어진다.Publisher가 onSubsribe()를 호출할 때 이 메서드의 인자로 Subscription 객체를 Subscriber에 전달한다.Subscriber는 Subscription 객체를 통해서 구독을 관리할 수 있다.
Subscriber는 Subscription의 request()를 호출하여 전송되는 데이터를 요청하거나, 또는 더 이상 데이터를 수신하지 않고 취소한다는 것을 나타내기 위해 cancel()을 호출할 수 있다.request()를 호출할 때 Subscriber는 받고자 하는 데이터 항목 수를 나타내는 long 타입의 값을 인자로 전달한다. 바로 이것이 백 프레셔이며, Subscriber가 처리할 수 있는 것보다 더 많은 데이터를 Publisher가 전송하는 것을 막아준다.- 요청된 수의 데이터를
Publisher가 전송한 후에 Subscriber는 다시 request()를 호출하여 더 많은 요청을 할 수 있다. 
Subscriber의 데이터 요청이 완료되면 데이터가 스트림을 통해 전달되기 시작한다. 이 때 onNext() 메서드가 호출되어 Publisher가 전송하는 데이터가 Subscriber에게 전달되며, 만일 에러가 생길 떄는 onError()가 호출된다.Publisher에서 전송할 데이터가 없고 더 이상의 데이터를 생성하지 않는다면 Publisher가 onComplete()를 호출하여 작업이 끝났다고 Subscriber에게 알려준다.Processor 인터페이스는 Subscriber 인터페이스와 Publisher 인터페이스를 결합한 것이다.Subscriber 역할로 Processor는 데이터를 수신하고 처리한다. 그다음에 역할을 바꾸어 Publisher 역할로 처리 결과를 자신의 Subscriber 들에게 발행한다.
- 리액티브 스트림 인터페이스는 스트림을 구성하는 기능이 없다. 이에 따라 프로젝트 리액터에서는 리액티브 스트림을 구성하는 API를 제공하여 리액티브 스트림 인터페이스를 구현하였다.
리액터 시작하기#
Mono.just("Craig")
.map { n -> n.uppercase() }
.map { cn -> "Hello, $cn!" }
.subscribe { message -> println(message) }
- 위 리액티브 코드는 데이터가 전달되는 파이프라인을 구성하는 것이다.
- 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다.
- 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.
- 리액터의 두 가지 핵심 타입
Flux: 0, 1 또는 다수의 데이터를 갖는 파이프라인을 나타낸다.Mono: 하나의 데이터 항목만 갖는 데이터셋에 최적화된 리액티브 타입이다.- 두 타입 모두 리액티브 스트림의
Publisher 인터페이스를 구현한 것이다. - 위의 예에서는 세 개의
Mono가 있으며, just() 오퍼레이션은 첫 번째 것을 생성한다.
리액티브 플로우의 다이어그램#

리액터 의존성 추가하기#
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>
리액티브 오퍼레이션 적용하기#
Flux와 Mono는 래익터가 제공하는 가장 핵심적인 구성 요소다.Flux와 Mono가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.Flux와 Mono에는 500개 이상의 오펴레이션이 있으며, 각 오퍼레이션에는 다음과 같이 분류될 수 있다.- 생성(creation) 오퍼레이션
- 조합(combination) 오퍼레이션
- 변환(transformation) 오퍼레이션
- 로직(logic) 오퍼레이션
리액티브 타입 생성하기#
Flux나 Mono의 just() 메서드를 사용하여 리액티브 타입을 생성할 수 있다.- 리액터의
StepVerifier를 사용해서 Flux나 Mono를 테스트할 수 있다.
@Test
fun createFlux_just() {
val fruitFlux: Flux<String> = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete()
}
컬렉션으로부터 생성하기#

fromArray()를 통해서 배열로 Flux 생성이 가능하다.
@Test
fun createFlux_fromArray() {
val fruits: Array<String> = arrayOf("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromArray(fruits)
}
fromIterable()을 통해서 List, Set, Iterable 컬렉션으로부터 Flux 생성이 가능하다.
@Test
fun createFlux_fromIterable() {
val fruits: List<String> = listOf("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromIterable(fruits)
}
fromStream()을 통해서 Stream 객체를 Flux로 생성이 가능하다.
val fruits: Stream<String> = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromStream(fruits)
Flux 데이터 생성하기#
range()를 사용해서, 1씩 증가하는 Flux 생성이 가능하다.- 첫번째 인자 값부터 시작해서, 1씩 증가하여 두번째 인자만큼 반복한다.

@Test
fun creatFlux_range() {
val rangeFlux = Flux.range(1, 5)
StepVerifier.create(rangeFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete()
}
interval() 메서드를 통해서, 0부터 시작해서 duration 시간마다 값을 1씩 증가 시키면서 방출한다.- 최대값이 지정되지 않으므로, 무한정 실행된다.
take() 오퍼레이션을 사용해서 첫 번째 5개의 항목으로 결과를 제한할 수 있다.

@Test
fun createFlux_interval() {
val intervalFlux = Flux.interval(Duration.ofSeconds(1))
.take(5)
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete()
}
리액티브 타입 조합하기#
mergeWith()로 다른 Flux와 결합할 수 있다.- 일반적으로
Flux는 가능한 빨리 데이터를 방출한다. delayElements()를 사용해서 500밀리초마다 하나씩 방출하도록 한다.delaySubscription()을 사용해서 250밀리초가 지난 후에 구독 및 데이터를 방출하도록 한다.mergeWith()는 소스 Flux들의 값이 완벽하게 번갈아 방출되게 보장할 수 없다.

@Test
fun mergeFluxes() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500))
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500))
val mergedFlux = characterFlux.mergeWith(foodFlux)
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete()
}
zip() 오퍼레이션은 각 Flux 소스로부터 한 항목씩 번갈아 가져와서 새로운 Flux를 생성한다.- 각 소스
Flux가 순서대로 방출되어 Tuple2를 만든다.

@Test
fun zipFluxes() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
val zippedFLux = Flux.zip(characterFlux, foodFlux)
StepVerifier.create(zippedFLux)
.expectNextMatches{ it.t1 == "Garfield" && it.t2 == "Lasagna" }
.expectNextMatches{ it.t1 == "Kojak" && it.t2 == "Lollipops" }
.expectNextMatches{ it.t1 == "Barbossa" && it.t2 == "Apples" }
.verifyComplete()
}
zip() 오퍼레이션 마지막 파라미터로 람다를 념겨주면, 생성할 객체를 정의할 수 있다.

@Test
fun zipFluxesToObject() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
val zippedFLux = Flux.zip(characterFlux, foodFlux) { c, f -> "$c eats $f" }
StepVerifier.create(zippedFLux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete()
}
firstWithSignal() 오퍼레이션은 두 Flux 객체 중 먼저 방출하는 Flux의 값을 선택해서 이 값을 발행한다.

@Test
fun firstFlux() {
val slowFlux = Flux.just(4, 5, 6)
.delayElements(Duration.ofMillis(500))
.delaySubscription(Duration.ofMillis(250))
val fastFlux = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(500))
val firstFlux = Flux.firstWithSignal(slowFlux, fastFlux)
StepVerifier.create(firstFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.verifyComplete()
}
리액티브 스트림의 변환과 필터링#
- 리액티브 타입으로부터 데이터 필터링하기
skip(): 앞의 n개 건너 뛰기 or 지정된 시간이 경과할 때까지 처음의 여러 항목 건너 뛰기take(): 처음 n개 항목만 발행하기 or 지정된 시간이 경과할 때 동안만 발행하기filter(): 조건식이 true인 경우만 발행하기
- 리액티브 데이터 매핑하기
map(): 변환을 수행하는 Flux를 생성한다.- 각 항목이 소스
Flux로부터 발행될 때 동기적으로 매핑이 수행된다. - 비동기적으로 매핑을 수행하고 싶다면
flatMap() 오퍼레이션을 사용해야 한다.
flatMap(): 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나 Flux들의 결과는 하나의 새로운 Flux가 된다.- 아래에
String 타입의 입력 문자열을 String 타입의 Mono로 변환하는 람다가 지정되어 있다. subscribeOn() 메서드를 통해 다수의 입력 객체들의 map() 오퍼레이션이 비동기적으로 병행 수행될 수 있다.subscribe()는 구독 요청하고 실제로 구독하는 반면, subscribeOn()은 구독이 동시적으로 처리되어야 한다는 것을 지정한다.

- 리액티브 스트림의 데이터 버퍼링하기
buffer(): 데이터 스트림을 작은 덩어리로 분할

flatMap과 함께 사용하여 하나의 Flux를 별도의 스레드에서 병행으로 처리하게 할 수 있다.
Flux가 방출하는 모든 항목을 List로 모을 필요가 있다면 인자를 전달하지 않고 buffer()를 호출하면 된다.collectList(): Flux가 발행한 모든 항목을 포함하는 List를 방출하는 새로운 Mono를 생성한다.collectMap(): Flux가 방출하는 모든 항목을 포함하는 Map을 방출하는 새로운 Mono를 생선한다.- 인자로 키를 지정하는 함수를 넘긴다.

리액티브 타입에 로직 오퍼레이션 수행하기#
Mono나 Flux가 발행한 항목이 어떤 조건과 일치하는지만 알아야 할 경우도 있다.all() 이나 any() 오퍼레이션이 그런 로직을 수행한다.
