REACTOR 7

07 Blocking to Reactive

이 글은 tech.io/playgrounds/929/reactive-programming-with-reactor-3/BlockingToReactive 에 대한 문제풀이이다. Reactive 코드 속에서 레거시하고, Reactive하지 않은 코드는 어떻게 처리해야할까? 예를들어, JDBC connection과 같은 Blocking 코드가 있고 이를 Reactive pipeline에 통합해한다면 어떻게해야할까? 가장 좋은 방법은, blocking이 진행되는 부분을 Schedulers를 통해 별도의 스레드에서 실행되도록 고립시키는 것이다. JDBC의 예에서는 fromIterable 팰터리 메서드를 사용할 수 있다. subscribeOn 이 메서드는 제공된 스케줄러에서 시작부터 시퀀스를 분리해서 실행할수 있도록..

06 Reactive To Blocking

이 글은 tech.io/playgrounds/929/reactive-programming-with-reactor-3/ReactiveToBlocking 에 대한 문제풀이이다. Reactive에 기존 코드의 일부를 사용하고 싶을 때에는 어떻게 해야할까? 해당 코드가 Blocking코드라면, 해당 로직을 고효율 Reactive 코드의 흐름 속에 어떻게 추가할 수 있을까? 이런 코드는 절대 Reactive 코드 실행 중간에 실행되도록 하면 안된다. Reactive pipeline의 중간에 block이 되어버릴 수 있기 때문이다. 예를 들어, 중간에 Block이 되는 코드로서 다음과 같은 예시를 들 수 있다. User monoToValue(Mono mono) { return mono.block(); } Itera..

05 Others operations

이 글은 tech.io/playgrounds/929/reactive-programming-with-reactor-3/OthersOperations 에 대한 문제풀이이다. 이번 장은 Reactor 3의 다양하고 유용한 Operator들에 대해 살펴본다. 먼저, zip Operator는 복수개의, 각각의 레이턴시를 두고도착하는 Publisher들을 합쳐서 연산할 수 있는 Operator이다. Flux userFluxFromStringFlux(Flux usernameFlux, Flux firstnameFlux, Flux lastnameFlux) { return Flux.zip( (args) -> { return new User((String) args[0], (String) args[1], (String) a..

04 Adapt

이 글은 tech.io/playgrounds/929/reactive-programming-with-reactor-3/Adapt의 문제풀이이다. RxJava3나 Reactor3 모두 Reactive Streams의 구현체이기 때문에, 외부 라이브러리 없이 서로와 호환된다. 예를들어, Flux(Reactor3의 Publisher 구현체)를 Flowable(RxJava3의 Publisher 구현체)로 (혹은 반대로) 변경할 수 있다. Flowable fromFluxToFlowable(Flux flux) { return Flowable.fromPublisher(flux); } Flux fromFlowableToFlux(Flowable flowable) { return Flux.from(flowable); } O..

03 Request

PubSub 구조는 대표적인 Push방식의 디자인 패턴이다. Push방식의 대표적인 특징으로, 데이터를 받는 입장에서는 주는 데이터를 여건에 상관없이 받아서 처리해야 한다. 만약 데이터를 받아서 복잡한 로직을 처리해야 하는데 계속 데이터가 밀려올 경우 부하가 발생할 수도 있다. 초기 Push방식의 디자인패턴인 Observer패턴에서 이러한 단점이 나타나자, 이를 보완한 PubSub 디자인 패턴이 등장하게 되었다. PubSub 디자인패턴에서는 request() 메서드를 통해서 Subscriber의 상황에 따라 Publisher에게 얼만큼의 데이터를 받을 수 있는지 전달 할 수 있다. 1. thenRequest(Long.MAX_VALUE) Reactor의 Subscriber에서 Publisher로 처리가능한..

02 Merge

복수개의 Publisher를 합칠 수 있다. 1. merge(Flux flux1, Flux flux2) merge() 메서드는 먼저 도착하는 데이터가 먼저 처리되는 Flux를 반환한다. 즉, flux1과 flux2가 생상하는 데이터들의 순서가 보장되지 않는다. Flux mergeFluxWithInterleave(Flux flux1, Flux flux2) { return Flux.merge(flux1, flux2); } 2. concat(Flux flux1, Flux flux2) concat() 메서드는 인자로 받는 Flux 객체들의 순서를 보장한다. Flux mergeFluxWithNoInterleave(Flux flux1, Flux flux2) { return Flux.concat(flux1, flux..

flatMap 알아보기

Flux.range(1, 10).flatMap( A ).map(i -> i * 10).subscribe(System.out::println);flatMap이 어떻게 동작하는지 알아보기 위해, 다음 코드의 동작을 잘 살펴보려 한다.A에 함수에 따라 달라지는 동작을 그림과 함께 살펴보자.(flatMap의 비동기성을 잘 살펴보기 위해 스레드는 1개로 제한할 것이다.)    # 1. 1초 후에 인자를 단순히 Mono로 감싸 반환하는 함수를 사용Flux.range(1, 10).flatMap(this::sleepAndWrap).map(i -> i * 10).subscribe(System.out::println);private Mono sleepAndWrap(int i) { return Mono.just(i)...