현재 저희팀에서 개발 시 Spring WebFlux를 사용하고 있습니다.
Spring WebFlux의 대한 기본 지식없이 맨 땅에 헤딩하면서 진행하여서 해당 부분을 정리하려고 합니다.
개요
사용자 아이디를 이용해서 item을 갖고오는 API가 있습니다.
public Flux<Item> collect(final String userId)
이 API의 응답값이 Flux으로 되어 있는데, 이 부분을 List으로 변환하고 싶습니다.
그래서 다음과 같은 작업을 하였습니다.
사건의 시작
Flux을 List으로 받기 위해서 아래 소스를 이용해서 응답을 받았습니다. 하지만...
private List<Item> callUserItem(Request request) {
return apiItemController.collect(request).collectList().share().block(Duration.ofMillis(10_000));
}
여러건의 호출이 있을 경우 타임아웃 에러가 발생합니다.
해당 에러가 발생하는 Webflux의 Class 파일을 찾아보았습니다.
@Override
@Nullable
public O block(@Nullable Duration timeout) {
try {
if (isTerminated()) {
return peek();
}
connect();
long delay;
if (null == timeout) {
delay = 0L;
}
else {
delay = System.nanoTime() + timeout.toNanos();
}
for (; ; ) {
if (isTerminated()) {
if (error != null) {
RuntimeException re = Exceptions.propagate(error);
re = Exceptions.addSuppressed(re, new Exception("Mono#block terminated with an error"));
throw re;
}
return value;
}
if (timeout != null && delay < System.nanoTime()) {
cancel();
throw new IllegalStateException("Timeout on Mono blocking read");
}
Thread.sleep(1);
}
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread Interruption on Mono blocking read");
}
}
Timeout이 발생하는 소스를 보면, 파라미터로 넘긴 1_000
(10초)를 기다리고 발생한 것으로 파악이 됩니다.
응답이 반환되지 않고 타임아웃만큼 기다렸다가 에러를 발생합니다.
좀 이상하지만, 원인을 못 찾았습니다.
그래서 share() 가 무슨 작업을 하는지 모르겠어서 제거하였습니다.
private List<Item> callUserItem(Request request) {
return apiItemController.collect(request).collectList().block(Duration.ofMillis(10_000));
}
그리고는 아래와 같은 에러가 발생하였습니다.
해당 에러는 Webflux에서 아래 클래스에서 에러를 발생합니다.
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
subscribe((Subscriber<T>) subscriber);
return subscriber.blockingGet();
}
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
try {
await();
}
catch (InterruptedException ex) {
dispose();
throw Exceptions.propagate(ex);
}
}
Throwable e = error;
if (e != null) {
RuntimeException re = Exceptions.propagate(e);
//this is ok, as re is always a new non-singleton instance
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
}
return value;
}
해당 Thread가 Non Blocking
인 경우 block
을 하게 되면 해당 에러를 뱉게 된다고 되어 있습니다.
그래서 block을 사용하지 않고 NonBlocking 응답을 처리하여 반환하는 방식으로 수정을 하게 됩니다.
private List<Item> callUserItem(Request request) {
List<Item> itemList = new ArrayList<>();
Flux<Item> response = apiItemController.collect(request);
response.subscribe(res -> {
if( res != null ) {
itemList.add(res);
}
});
return itemList;
}
현재까지는 타임아웃 관련된 에러는 발생하지 않습니다.
정리
WebFlux는 인터넷의 정말 많은 자료들이 있지만 정확하지 않은 정보 또한 많습니다.
그전에 java doc, 구현 class 파일을 통해서 정확하게 정보를 보는 것이 맞겠다고 생각하였습니다.
'프로그래밍이야기 > Spring' 카테고리의 다른 글
[Chaos Engineering] CM4SB 적용 내용 정리 (0) | 2022.02.13 |
---|---|
[Chaos Engineering] 개념 정리 (0) | 2022.02.13 |
Spring Boot Cache 적용 (0) | 2022.01.30 |
[Spring Boot] 지원 버전 확인하기 (0) | 2022.01.23 |
[Spring ehcache] 로컬 캐시 라이브러리를 알아보자 (0) | 2021.11.10 |
댓글