01 Introduction to Reactive Programming

2021. 3. 2.

이 글은 에 대한 풀이 및 정리입니다.


Reactive Programming


리액티브 프로그래밍이란,

비동기&논블라킹으로 동작하는 애플리케이션을 작성하기 위한 새로운 패러다임의 프로그래밍이다.


Reactive Streams & Reactor

Reactive Streams

  • Reactive Programming에 대한 수요가 증가하면서, Reactive 코드를 작성하는 개발자/코드/라이브러리가 많아졌다.
  • but, 코드마다 구현 방식이 각각 달라지면 다른사람의 코드를 라이브러리로 사용하거나 협업이 필요할 때 혼란이 생겼다.
  • 이를 위해 Reactive Programming에 대한 하나의 표준이 생겼는데, 그것이 바로 Reactive Streams 스펙이다.


  • Reactive Streams 스펙을 구현체에는 RxJava(>=2), Akka Streams, Vert.x 등이 있다.
  • Reactor 역시 그 중 하나로, Spring 5 Webflux core에 포함되어 있는 API이다.
  • Reactive Streams Publisher 위에 구축되어 있으며, 광범위한 상황에서 활용할 수 있는 더 높은 수준의 API를 제공한다.


사용 이유

위 그래프는 동시 요청수에 따른 spring webmvc와 spring webflux의 처리 속도를 나타내는 그래프이다.

결론부터 말하자면, Reactor를 사용하는 이유는 보다 적은 리소스를 사용해 많은 스루풋을 처리하기 위해서이다.


Spring Webmvc

  • Spring의 Webmvc에서는 기본적으로 요청이 오면 스레드풀에서 스레드를 할당한다.
  • 만약 스레드들이 이미 모두 할당되어있으면, 요청은 응답을 마친 스레드가 pool에 들어갈 때까지 queue에서 기다리게된다.
  • 톰캣의 기본 스레드풀 max-size는 200이고, 이를 초과한 요청이 한번에 밀려올 경우 당연히 응답은 지연된다.
  • 스레드 풀의 스레드를 5000개까지 늘린다면...? 동시 처리 가능한 스레드가 일정 수준을 넘어가면, 불필요한 Context Switching이 많아져 되려 성능에 악영향을 미친다.
  • 요청을 처리하는 워커 스레드를 따로 둔다면...? 역시 워커 스레드의 max-size가 굉장히 많아야하고 위와 똑같은 이슈를 초래한다.


Spring Webflux

  • 사실 단순 계산만 하는 프로그램에서는 Webflux를 사용할 필요가 없다.
  • DB 접근이나 API 콜 등 시간이 걸리는 I/O 작업이 스레드를 잡고있기(blocking) 때문에 새로운 요청을 처리할 별도의 스레드가 필요한 것이다.
  • webflux에는 API 콜을 비동기적으로 처리할 수 있는 WebClient라는 API가 존재하는데, 요청 스레드는 이러한 외부 I/O 통신을 WebClient에게 맞겨놓고 다른 요청을 처리한다.
  • 이후 WebClient로부터 응답이 오면, callback 방식으로 다시 해당 요청 처리를 이어간다.
  • 참고로 DB에 접근하기 위한 스펙인 JDBC는 아직 비동기 처리에 대한 지원이 없어서 webflux와 함께 사용할 수 없다.






문제 및 풀이

#1. Which of these types are defined in the Reactive Streams specification?


Subscriber, Publisher


Reactive Streams 스펙에 정의되어있는 타입은 Processor, Subscriber, Publisher, Subscription 네가지이다.

참고 :


#2. In order to create a Reactive Stream Publisher or Subscriber,


1. its code must comply with the spec and pass the TCK.

2. I should favor using an existing library like Reactor.



Reactive Streams를 구현하기 위해서는 제시하는 표준 스펙을 모두 준수해야하고, TCK(Technology Compatibility Kit) 테스트코드를 통과해야한다.


혹은 이미 Reactive Streams를 구현한 스펙인 Reactor같은 라이브러리를 사용할 수도 있다.



#3. In the code below, what did I forgot?

Publisher<Integer> source = Flux.range(1, 10);


To Subscribe



Reactive Streams를 구현한 모든 코드는 subscribe() 메서드를 호출할 때 비로소 내부 동작을 실행한다.

subscribe() 메서드를 호출하지 않으면, 메모리에 Publisher(Flux의 스펙) 객체를 만들기만 하게 되며, subscribe() 메서드를 통해 생성된 Publisher 객체에서 실제 데이터를 Push하게 된다.



#4. I expected the code below to emit "fooA" but it didn't, why?

Flux<String> flux = Flux.just("A"); -> "foo" + s);


The Result of of the `map` operator is a new Flux which was discarded.



WebFlux는 기본적으로 함수형 프로그래밍을 지향하고 있다.

즉, Flux.just("A").map(s -> "foo" + s).subscribe(System.out::println); 과 같은 형태로 사용하게 된다는 것이다.


위 코드에 대해 잘 생각해 보면, Flux.just("A")의 반환 객체(Publisher)가 가지고 있는 map() 메서드를 실행하고, map(s -> "foo" + s)의 반환 객체(Publisher)가 가지고 있는 subscribe() 메서드를 실행한다고 생각할 수 있다.


하지만, 보기의 문제의 코드에서는 Flux.just("A")의 반환 객체의 subscribe() 메서드를 호출하고 있기 때문에 기대했던 출력값인 "fooA"가 아닌 첫번째 객체에서 생성한 데이터인 "A"를 출력하게 된다.


기대했던 값을 출력하기 위해서는 다음과 같이 작성할 수 있다.

// Alt1 (recommended)
Flux.just("A").map(s -> "foo" + s).subscribe(System.out::println);

// Alt2
Flux flux1 = Flux.just("A");
Flux flux2 = -> "foo" + s);







