- 애플리케이션 코드를 개발할 때는 명령형과 리액티브의 두 가지 형태로 코드를 작성할 수 있다.
- 명령형: 순차적으로 연속된 작업이 있으며, 각 작업은 한 번에 하나씩 그리고 이전 작업 다음에 실행된다.
- 리액티브: 데이터 처리를 위해 일련의 작업들이 정의되지만, 이 작업들은 병렬로 실행될 수 있다. 그리고 각작업은 부분 집합의 데이터를 처리할 수 있으며, 처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업할 수 있다.
리액티브 프로그래밍 이해하기#
- 명령형 프로그래밍
- 한 번에 하나씩 만나는 순서대로 실행되는 명령어들로 코드를 작성
- 프로그램에서는 하나의 작업이 완전히 끝나기를 기다렸다가 그당므 작업을 수행한다.
- 각 단계마다 처리되는 데이터는 전체를 처리할 수 있도록 사용할 수 있어야한다.
- 하지만 이 작업이 원격지 서버로부터 데이터베이스에 데이터를 쓰거나 가져오는 것이라면 이 작업이 완료될 때까지 아무 것도 할 수 없다.
- 자바를 비롯해서 대부분의 프로그래밍 언어는 동시 프로그래밍을 지원한다.
- 그러나 다중 스레드로 동시성을 관리하는 것은 복잡해져서 쉽지 않다.
- 리액티브 프로그래밍은 함수적이면서 선언적이다.
- 순차적으로 수행되는 작업 단계를 나타낸 것이 아니라 데이터가 흘러가는 파이프라인이나 스트림을 포함한다.
- 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.
리액티브 스트림 정의하기#
- 리액티브 스트림은 차단되지 않는 백 프레셔(backpressure)를 갖는 비동기 스트림 처리의 표준을 제공하는 것이 목적이다.
- 자바 스트림 vs. 리액티브 스트림
- 자바 스트림은 대개 동기화되어 있고 한정된 데이터로 작업을 수행한다.
- 리액티브 스트림은 무한 데이터셋을 비롯해서 어떤 크기의 데이터셋이건 비동기 처리를 지원한다. 그리고 실시간으로 데이터를 처리하며, 백 프레셔를 사용해서 데이터 전달 폭주를 막는다.
- 리액티브 스트림은 4개의 인터페이스인
Publisher
, Subscriber
, Subscription
, Processor
로 요약할 수 있다. Publisher
는 하나의 Subscription
당 하나의 Subscriber
에 발행하는 데이터는 생성한다.Publisher
인터페이스에는 Subscriber
가 Publisher
를 구독 신청할 수 있는 subscribe()
메서드 한 개가 선언되어 있다.Subscriber
가 구독 신청되면 Publisher
로부터 이벤트를 수신할 수 있다.Subscriber
가 수신할 첫 번째 이벤트는 onSubsribe()
호출을 통해 이루어진다.Publisher
가 onSubsribe()
를 호출할 때 이 메서드의 인자로 Subscription
객체를 Subscriber
에 전달한다.Subscriber
는 Subscription
객체를 통해서 구독을 관리할 수 있다.
Subscriber
는 Subscription
의 request()
를 호출하여 전송되는 데이터를 요청하거나, 또는 더 이상 데이터를 수신하지 않고 취소한다는 것을 나타내기 위해 cancel()
을 호출할 수 있다.request()
를 호출할 때 Subscriber
는 받고자 하는 데이터 항목 수를 나타내는 long
타입의 값을 인자로 전달한다. 바로 이것이 백 프레셔이며, Subscriber
가 처리할 수 있는 것보다 더 많은 데이터를 Publisher
가 전송하는 것을 막아준다.- 요청된 수의 데이터를
Publisher
가 전송한 후에 Subscriber
는 다시 request()
를 호출하여 더 많은 요청을 할 수 있다. 
Subscriber
의 데이터 요청이 완료되면 데이터가 스트림을 통해 전달되기 시작한다. 이 때 onNext()
메서드가 호출되어 Publisher
가 전송하는 데이터가 Subscriber
에게 전달되며, 만일 에러가 생길 떄는 onError()
가 호출된다.Publisher
에서 전송할 데이터가 없고 더 이상의 데이터를 생성하지 않는다면 Publisher
가 onComplete()
를 호출하여 작업이 끝났다고 Subscriber
에게 알려준다.Processor
인터페이스는 Subscriber
인터페이스와 Publisher
인터페이스를 결합한 것이다.Subscriber
역할로 Processor
는 데이터를 수신하고 처리한다. 그다음에 역할을 바꾸어 Publisher
역할로 처리 결과를 자신의 Subscriber
들에게 발행한다.
- 리액티브 스트림 인터페이스는 스트림을 구성하는 기능이 없다. 이에 따라 프로젝트 리액터에서는 리액티브 스트림을 구성하는 API를 제공하여 리액티브 스트림 인터페이스를 구현하였다.
리액터 시작하기#
Mono.just("Craig")
.map { n -> n.uppercase() }
.map { cn -> "Hello, $cn!" }
.subscribe { message -> println(message) }
- 위 리액티브 코드는 데이터가 전달되는 파이프라인을 구성하는 것이다.
- 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다.
- 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.
- 리액터의 두 가지 핵심 타입
Flux
: 0, 1 또는 다수의 데이터를 갖는 파이프라인을 나타낸다.Mono
: 하나의 데이터 항목만 갖는 데이터셋에 최적화된 리액티브 타입이다.- 두 타입 모두 리액티브 스트림의
Publisher
인터페이스를 구현한 것이다. - 위의 예에서는 세 개의
Mono
가 있으며, just()
오퍼레이션은 첫 번째 것을 생성한다.
리액티브 플로우의 다이어그램#

리액터 의존성 추가하기#
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>
리액티브 오퍼레이션 적용하기#
Flux
와 Mono
는 래익터가 제공하는 가장 핵심적인 구성 요소다.Flux
와 Mono
가 제공하는 오퍼레이션들은 두 타입을 함께 결합하여 데이터가 전달될 수 있는 파이프라인을 생성한다.Flux
와 Mono
에는 500개 이상의 오펴레이션이 있으며, 각 오퍼레이션에는 다음과 같이 분류될 수 있다.- 생성(creation) 오퍼레이션
- 조합(combination) 오퍼레이션
- 변환(transformation) 오퍼레이션
- 로직(logic) 오퍼레이션
리액티브 타입 생성하기#
Flux
나 Mono
의 just()
메서드를 사용하여 리액티브 타입을 생성할 수 있다.- 리액터의
StepVerifier
를 사용해서 Flux
나 Mono
를 테스트할 수 있다.
@Test
fun createFlux_just() {
val fruitFlux: Flux<String> = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete()
}
컬렉션으로부터 생성하기#

fromArray()
를 통해서 배열로 Flux
생성이 가능하다.
@Test
fun createFlux_fromArray() {
val fruits: Array<String> = arrayOf("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromArray(fruits)
}
fromIterable()
을 통해서 List
, Set
, Iterable
컬렉션으로부터 Flux
생성이 가능하다.
@Test
fun createFlux_fromIterable() {
val fruits: List<String> = listOf("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromIterable(fruits)
}
fromStream()
을 통해서 Stream
객체를 Flux
로 생성이 가능하다.
val fruits: Stream<String> = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry")
val fruitFlux = Flux.fromStream(fruits)
Flux 데이터 생성하기#
range()
를 사용해서, 1씩 증가하는 Flux
생성이 가능하다.- 첫번째 인자 값부터 시작해서, 1씩 증가하여 두번째 인자만큼 반복한다.

@Test
fun creatFlux_range() {
val rangeFlux = Flux.range(1, 5)
StepVerifier.create(rangeFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete()
}
interval()
메서드를 통해서, 0부터 시작해서 duration 시간마다 값을 1씩 증가 시키면서 방출한다.- 최대값이 지정되지 않으므로, 무한정 실행된다.
take()
오퍼레이션을 사용해서 첫 번째 5개의 항목으로 결과를 제한할 수 있다.

@Test
fun createFlux_interval() {
val intervalFlux = Flux.interval(Duration.ofSeconds(1))
.take(5)
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete()
}
리액티브 타입 조합하기#
mergeWith()
로 다른 Flux
와 결합할 수 있다.- 일반적으로
Flux
는 가능한 빨리 데이터를 방출한다. delayElements()
를 사용해서 500밀리초마다 하나씩 방출하도록 한다.delaySubscription()
을 사용해서 250밀리초가 지난 후에 구독 및 데이터를 방출하도록 한다.mergeWith()
는 소스 Flux
들의 값이 완벽하게 번갈아 방출되게 보장할 수 없다.

@Test
fun mergeFluxes() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500))
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500))
val mergedFlux = characterFlux.mergeWith(foodFlux)
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete()
}
zip()
오퍼레이션은 각 Flux
소스로부터 한 항목씩 번갈아 가져와서 새로운 Flux
를 생성한다.- 각 소스
Flux
가 순서대로 방출되어 Tuple2
를 만든다.

@Test
fun zipFluxes() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
val zippedFLux = Flux.zip(characterFlux, foodFlux)
StepVerifier.create(zippedFLux)
.expectNextMatches{ it.t1 == "Garfield" && it.t2 == "Lasagna" }
.expectNextMatches{ it.t1 == "Kojak" && it.t2 == "Lollipops" }
.expectNextMatches{ it.t1 == "Barbossa" && it.t2 == "Apples" }
.verifyComplete()
}
zip()
오퍼레이션 마지막 파라미터로 람다를 념겨주면, 생성할 객체를 정의할 수 있다.

@Test
fun zipFluxesToObject() {
val characterFlux = Flux.just("Garfield", "Kojak", "Barbossa")
val foodFlux = Flux.just("Lasagna", "Lollipops", "Apples")
val zippedFLux = Flux.zip(characterFlux, foodFlux) { c, f -> "$c eats $f" }
StepVerifier.create(zippedFLux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete()
}
firstWithSignal()
오퍼레이션은 두 Flux
객체 중 먼저 방출하는 Flux
의 값을 선택해서 이 값을 발행한다.

@Test
fun firstFlux() {
val slowFlux = Flux.just(4, 5, 6)
.delayElements(Duration.ofMillis(500))
.delaySubscription(Duration.ofMillis(250))
val fastFlux = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(500))
val firstFlux = Flux.firstWithSignal(slowFlux, fastFlux)
StepVerifier.create(firstFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.verifyComplete()
}
리액티브 스트림의 변환과 필터링#
- 리액티브 타입으로부터 데이터 필터링하기
skip()
: 앞의 n개 건너 뛰기 or 지정된 시간이 경과할 때까지 처음의 여러 항목 건너 뛰기take()
: 처음 n개 항목만 발행하기 or 지정된 시간이 경과할 때 동안만 발행하기filter()
: 조건식이 true인 경우만 발행하기
- 리액티브 데이터 매핑하기
map()
: 변환을 수행하는 Flux
를 생성한다.- 각 항목이 소스
Flux
로부터 발행될 때 동기적으로 매핑이 수행된다. - 비동기적으로 매핑을 수행하고 싶다면
flatMap()
오퍼레이션을 사용해야 한다.
flatMap()
: 각 객체를 새로운 Mono
나 Flux
로 매핑하며, 해당 Mono
나 Flux
들의 결과는 하나의 새로운 Flux
가 된다.- 아래에
String
타입의 입력 문자열을 String
타입의 Mono
로 변환하는 람다가 지정되어 있다. subscribeOn()
메서드를 통해 다수의 입력 객체들의 map()
오퍼레이션이 비동기적으로 병행 수행될 수 있다.subscribe()
는 구독 요청하고 실제로 구독하는 반면, subscribeOn()
은 구독이 동시적으로 처리되어야 한다는 것을 지정한다.

- 리액티브 스트림의 데이터 버퍼링하기
buffer()
: 데이터 스트림을 작은 덩어리로 분할

flatMap
과 함께 사용하여 하나의 Flux
를 별도의 스레드에서 병행으로 처리하게 할 수 있다.
Flux
가 방출하는 모든 항목을 List
로 모을 필요가 있다면 인자를 전달하지 않고 buffer()
를 호출하면 된다.collectList()
: Flux
가 발행한 모든 항목을 포함하는 List
를 방출하는 새로운 Mono
를 생성한다.collectMap()
: Flux
가 방출하는 모든 항목을 포함하는 Map
을 방출하는 새로운 Mono
를 생선한다.- 인자로 키를 지정하는 함수를 넘긴다.

리액티브 타입에 로직 오퍼레이션 수행하기#
Mono
나 Flux
가 발행한 항목이 어떤 조건과 일치하는지만 알아야 할 경우도 있다.all()
이나 any()
오퍼레이션이 그런 로직을 수행한다.
