Flux.range(1, 10).flatMap( A ).map(i -> i * 10).subscribe(System.out::println);
flatMap이 어떻게 동작하는지 알아보기 위해, 다음 코드의 동작을 잘 살펴보려 한다.
A에 함수에 따라 달라지는 동작을 그림과 함께 살펴보자.
(flatMap의 비동기성을 잘 살펴보기 위해 스레드는 1개로 제한할 것이다.)
Flux.range(1, 10).flatMap(this::sleepAndWrap).map(i -> i * 10).subscribe(System.out::println);
private Mono<Integer> sleepAndWrap(int i) {
return Mono.just(i).map(j -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return j;
});
}
노란색으로 칠해진 부분의 코드를 읽었을 때 this:sleepAndWrap은 바로 실행되는 것이 아니고,
이후에 subscribe() 가 호출된 후 1~10이 생성된 후 호출될 콜백으로서 Flux2 객체에 저장할 뿐이다.
역시, 노란색으로 칠해진 부분의 코드를 읽었을 때 i -> i * 10은 바로 실행되는 것이 아니고,
flatMap 내부의 콜백이 완료되었을 때 호출될 콜백으로서 Flux3 객체에 저장할 뿐이다.
1이 가장 먼저 생성되고, Stream이기 때문에 스레드가 노는 상황이 나오기 전까지 Operator를 차례대로 실행한다.
가장 먼저, this::sleepAndWrap 콜백을 실행한다.
flatMap은 인자로 받은 함수의 반환값을 한꺼풀 벗기는, 즉 Publisher를 하나 벗기는 Operator이기때문에 Publisher로 Wrapping된 객체를 반환하는 함수를 인자로 받는다. (ex. Mono<Integer>)
확실하진 않지만, flatMap 내부에서 onSubscribe() 메서드가 실행되어서 구하고자 하는 값을 생산하는 Publisher를 반환하도록 하는 것으로 예상된다.
이 코드가 실행되었을때 역시 스레드는 잠들지 않는다.
flatMap으로 한꺼풀 벗겨져 Mono.just(1)로 데이터가 생성된 후 실행될 콜백(1초 자고 값 그대로 반환 함수)를 Mono 2에 저장할 뿐이다.
여기서 현재 실행중인 1개뿐인 스레드가 1초동안 잠을 잔다.
스레드는 잠시 잠을 잤을 뿐 정해진 순서를 계속 따른다.
Flux.range(1, 10).flatMap(this::apiCall).map(i -> i * 10).subscribe(System.out::println);
private Mono<Integer> apiCall(int i) {
return WebClient.create()
.get()
.uri("http://localhost:8000?num={num}", i) // 1초 걸리는 API, num 그대로 반환
.retrieve()
.bodyToMono(Integer.class);
}
#1 - 1) 과 같다.
#1 - 1) 과 같다.
#1 - 1) 과 같다.
#1 - 1) 과 같다.
WebClient 역시 non-blocking이기 때문에, 응답 이벤트가 오기 전까지 스레드를 blocking하고 있지 않는다.
(netty 기반의 non-blocking)
1~10까지 반복해도 1초가 넘지 않을 것이기 때문에, 모두 Netty의 응답 이벤트를 기다리고 있다.
외부 API의 응답 순서는 정해져있지 않다.
콜백을 실행하는동안 외부 API 응답이 더 처리될 수 있고, 처리 순서대로 메인 스레드 Queue에 쌓이게된다.
이후에 메인 스레드는 Queue에서 하나씩 꺼내어 콜백을 적용할 것이다.
나머지 Queue 의 작업들도 모두 진행한 후에 작업이 마무리된다.
댓글 영역