01 Introduction to Reactive Programming
이 글은 tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro 에 대한 풀이 및 정리입니다.
Reactive Programming
정의
리액티브 프로그래밍이란,
비동기&논블라킹으로 동작하는 애플리케이션을 작성하기 위한 새로운 패러다임의 프로그래밍이다.
Reactive Streams & Reactor
Reactive Streams
- Reactive Programming에 대한 수요가 증가하면서, Reactive 코드를 작성하는 개발자/코드/라이브러리가 많아졌다.
- but, 코드마다 구현 방식이 각각 달라지면 다른사람의 코드를 라이브러리로 사용하거나 협업이 필요할 때 혼란이 생겼다.
- 이를 위해 Reactive Programming에 대한 하나의 표준이 생겼는데, 그것이 바로 Reactive Streams 스펙이다.
Reactor
- Reactive Streams 스펙을 구현체에는 RxJava(>=2), Akka Streams, Vert.x 등이 있다.
- Reactor 역시 그 중 하나로, Spring 5 Webflux core에 포함되어 있는 API이다.
- Reactive Streams Publisher 위에 구축되어 있으며, 광범위한 상황에서 활용할 수 있는 더 높은 수준의 API를 제공한다.
사용 이유
위 그래프는 동시 요청수에 따른 spring webmvc와 spring webflux의 처리 속도를 나타내는 그래프이다.
결론부터 말하자면, Reactor를 사용하는 이유는 보다 적은 리소스를 사용해 많은 스루풋을 처리하기 위해서이다.
Spring Webmvc
- Spring의 Webmvc에서는 기본적으로 요청이 오면 스레드풀에서 스레드를 할당한다.
- 만약 스레드들이 이미 모두 할당되어있으면, 요청은 응답을 마친 스레드가 pool에 들어갈 때까지 queue에서 기다리게된다.
- 톰캣의 기본 스레드풀 max-size는 200이고, 이를 초과한 요청이 한번에 밀려올 경우 당연히 응답은 지연된다.
- 스레드 풀의 스레드를 5000개까지 늘린다면...? 동시 처리 가능한 스레드가 일정 수준을 넘어가면, 불필요한 Context Switching이 많아져 되려 성능에 악영향을 미친다.
- 요청을 처리하는 워커 스레드를 따로 둔다면...? 역시 워커 스레드의 max-size가 굉장히 많아야하고 위와 똑같은 이슈를 초래한다.
Spring Webflux
- 사실 단순 계산만 하는 프로그램에서는 Webflux를 사용할 필요가 없다.
- DB 접근이나 API 콜 등 시간이 걸리는 I/O 작업이 스레드를 잡고있기(blocking) 때문에 새로운 요청을 처리할 별도의 스레드가 필요한 것이다.
- webflux에는 API 콜을 비동기적으로 처리할 수 있는 WebClient라는 API가 존재하는데, 요청 스레드는 이러한 외부 I/O 통신을 WebClient에게 맞겨놓고 다른 요청을 처리한다.
- 이후 WebClient로부터 응답이 오면, callback 방식으로 다시 해당 요청 처리를 이어간다.
- 참고로 DB에 접근하기 위한 스펙인 JDBC는 아직 비동기 처리에 대한 지원이 없어서 webflux와 함께 사용할 수 없다.
문제 및 풀이
#1. Which of these types are defined in the Reactive Streams specification?
정답
Subscriber, Publisher
풀이
Reactive Streams 스펙에 정의되어있는 타입은 Processor, Subscriber, Publisher, Subscription 네가지이다.
참고 : www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/package-summary.html
#2. In order to create a Reactive Stream Publisher or Subscriber,
정답
1. its code must comply with the spec and pass the TCK.
2. I should favor using an existing library like Reactor.
풀이
Reactive Streams를 구현하기 위해서는 제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야한다.
TCK : github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
혹은 이미 Reactive Streams를 구현한 스펙인 Reactor같은 라이브러리를 사용할 수도 있다.
#3. In the code below, what did I forgot?
Publisher<Integer> source = Flux.range(1, 10);
정답
To Subscribe
풀이
Reactive Streams를 구현한 모든 코드는 subscribe() 메서드를 호출할 때 비로소 내부 동작을 실행한다.
subscribe() 메서드를 호출하지 않으면, 메모리에 Publisher(Flux의 스펙) 객체를 만들기만 하게 되며, subscribe() 메서드를 통해 생성된 Publisher 객체에서 실제 데이터를 Push하게 된다.
#4. I expected the code below to emit "fooA" but it didn't, why?
Flux<String> flux = Flux.just("A");
flux.map(s -> "foo" + s);
flux.subscribe(System.out::println);
정답
The Result of of the `map` operator is a new Flux which was discarded.
풀이
WebFlux는 기본적으로 함수형 프로그래밍을 지향하고 있다.
즉, Flux.just("A").map(s -> "foo" + s).subscribe(System.out::println); 과 같은 형태로 사용하게 된다는 것이다.
위 코드에 대해 잘 생각해 보면, Flux.just("A")의 반환 객체(Publisher)가 가지고 있는 map() 메서드를 실행하고, map(s -> "foo" + s)의 반환 객체(Publisher)가 가지고 있는 subscribe() 메서드를 실행한다고 생각할 수 있다.
하지만, 보기의 문제의 코드에서는 Flux.just("A")의 반환 객체의 subscribe() 메서드를 호출하고 있기 때문에 기대했던 출력값인 "fooA"가 아닌 첫번째 객체에서 생성한 데이터인 "A"를 출력하게 된다.
기대했던 값을 출력하기 위해서는 다음과 같이 작성할 수 있다.
// Alt1 (recommended)
Flux.just("A").map(s -> "foo" + s).subscribe(System.out::println);
// Alt2
Flux flux1 = Flux.just("A");
Flux flux2 = flux1.map(s -> "foo" + s);
flux2.subscribe(System.out::println);
참고
tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro
www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/package-summary.html
https://sjh836.tistory.com/185