Reactive Programmingaaa/쓸데없이 자세한 Reactor

flatMap 알아보기

peter.j 2021. 3. 9. 04:28
Flux.range(1, 10).flatMap( A ).map(i -> i * 10).subscribe(System.out::println);

flatMap이 어떻게 동작하는지 알아보기 위해, 다음 코드의 동작을 잘 살펴보려 한다.

A에 함수에 따라 달라지는 동작을 그림과 함께 살펴보자.

(flatMap의 비동기성을 잘 살펴보기 위해 스레드는 1개로 제한할 것이다.)

 

 

 

 

# 1. 1초 후에 인자를 단순히 Mono로 감싸 반환하는 함수를 사용

Flux.range(1, 10).flatMap(this::sleepAndWrap).map(i -> i * 10).subscribe(System.out::println);



private Mono<Integer> sleepAndWrap(int i) {
    return Mono.just(i).map(j -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return j;
    });
}

1) Flux.range(1, 10) 으로 단순 Flux 객체를 메모리에 만든다.

2) .flatMap(this::sleepAndWrap)으로 또 다른 Flux 객체를 메모리에 만든다.

노란색으로 칠해진 부분의 코드를 읽었을 때 this:sleepAndWrap은 바로 실행되는 것이 아니고,

이후에 subscribe() 가 호출된 후 1~10이 생성된 후 호출될 콜백으로서 Flux2 객체에 저장할 뿐이다.

3) .map(i -> i * 10)으로 또 다른 Flux 객체를 메모리에 만든다.

역시, 노란색으로 칠해진 부분의 코드를 읽었을 때 i -> i * 10은 바로 실행되는 것이 아니고,

flatMap 내부의 콜백이 완료되었을 때 호출될 콜백으로서 Flux3 객체에 저장할 뿐이다.

4) .subscribe(System.out::println)을 읽자마자 1부터 생산을 시작한다.

1이 가장 먼저 생성되고, Stream이기 때문에 스레드가 노는 상황이 나오기 전까지 Operator를 차례대로 실행한다.

가장 먼저, this::sleepAndWrap 콜백을 실행한다.

5) Mono.just(i)로 Mono 객체를 메모리에 만든다.

flatMap은 인자로 받은 함수의 반환값을 한꺼풀 벗기는, 즉 Publisher를 하나 벗기는 Operator이기때문에 Publisher로 Wrapping된 객체를 반환하는 함수를 인자로 받는다. (ex. Mono<Integer>)

확실하진 않지만, flatMap 내부에서 onSubscribe() 메서드가 실행되어서 구하고자 하는 값을 생산하는 Publisher를 반환하도록 하는 것으로 예상된다.

6) .map(j -> { Thread.sleep(1000); return j; });로 또 다른 Mono 객체를 메모리에 만든다.

이 코드가 실행되었을때 역시 스레드는 잠들지 않는다.

flatMap으로 한꺼풀 벗겨져 Mono.just(1)로 데이터가 생성된 후 실행될 콜백(1초 자고 값 그대로 반환 함수)를 Mono 2에 저장할 뿐이다.

7) onSubscribe()가 발동되어 바로 데이터를 생산한 후에 Mono2의 콜백을 실행한다.

여기서 현재 실행중인 1개뿐인 스레드가 1초동안 잠을 잔다.

8) 스레드가 잠에서 깨고 값을 반환하면 바로 Flux3의 콜백을 실행한다.

스레드는 잠시 잠을 잤을 뿐 정해진 순서를 계속 따른다.

9) Operator를 모두 거친 값을 subscriber가 출력한다.

10) 참조가 없는 Mono들은 GC에 의해 지워지고, 이와 같은 Stream을 나머지 데이터에 적용한다.

 

 

 

 

 

 

# 2. 1초 후에 값을 그대로 반환하는 api를 call 하는 함수를 사용

Flux.range(1, 10).flatMap(this::apiCall).map(i -> i * 10).subscribe(System.out::println);



private Mono<Integer> apiCall(int i) {
    return WebClient.create()
    	.get()
        .uri("http://localhost:8000?num={num}", i) // 1초 걸리는 API, num 그대로 반환
        .retrieve()
        .bodyToMono(Integer.class);
}

1) Flux.range(1, 10) 으로 단순 Flux 객체를 메모리에 만든다.

#1 - 1) 과 같다.

 

2) .flatMap(this::apiCall)으로 또 다른 Flux 객체를 메모리에 만든다.

#1 - 1) 과 같다.

3) .map(i -> i * 10)으로 또 다른 Flux 객체를 메모리에 만든다.

#1 - 1) 과 같다.

4) .subscribe(System.out::println)을 읽자마자 1부터 생산을 시작한다.

#1 - 1) 과 같다.

5) 메인 스레드는 WebClient(별도의 스레드)에게 일을 넘기고, WebClient는 Http 요청을 보낸다.

WebClient 역시 non-blocking이기 때문에, 응답 이벤트가 오기 전까지 스레드를 blocking하고 있지 않는다.

(netty 기반의 non-blocking)

6) 메인 스레드는 이어지는 작업이 없으므로 Flux2에서 2를 생성한 후, apiCall 콜백을 적용한다.

 

7) 이 작업을 1~10까지 반복한다.

1~10까지 반복해도 1초가 넘지 않을 것이기 때문에, 모두 Netty의 응답 이벤트를 기다리고 있다.

8) 가장 먼저 오는 응답을 메인 스레드의 Queue에 넣고 처리한다.

외부 API의 응답 순서는 정해져있지 않다.

 

9) 메인 스레드의 Queue에 작업이 들어오면 바로 그 이후에 저장되어 있는 콜백을 적용한다.

콜백을 실행하는동안 외부 API 응답이 더 처리될 수 있고, 처리 순서대로 메인 스레드 Queue에 쌓이게된다.

이후에 메인 스레드는 Queue에서 하나씩 꺼내어 콜백을 적용할 것이다.

 

10) 콜백이 적용되고 최종 결괏값을 subscriber를 통해 출력한다.

나머지 Queue 의 작업들도 모두 진행한 후에 작업이 마무리된다.

 

 

 

 

사업자 정보 표시
1 | g | asdf | 사업자 등록번호 : 123-12-12345 | TEL : 010-111-1111 | Mail : asdf@gmail.com | 통신판매신고번호 : 호 | 사이버몰의 이용약관 바로가기