본문 바로가기

[Spring WebFlux] Timeout on Mono blocking read 해결

by 사랑꾼이야 2021. 11. 10.


현재 저희팀에서 개발 시 Spring WebFlux를 사용하고 있습니다.
Spring WebFlux의 대한 기본 지식없이 맨 땅에 헤딩하면서 진행하여서 해당 부분을 정리하려고 합니다.


사용자 아이디를 이용해서 item을 갖고오는 API가 있습니다.

public Flux<Item> collect(final String userId)


이 API의 응답값이 Flux으로 되어 있는데, 이 부분을 List으로 변환하고 싶습니다.
그래서 다음과 같은 작업을 하였습니다.


사건의 시작

Flux을 List으로 받기 위해서 아래 소스를 이용해서 응답을 받았습니다. 하지만...

private List<Item> callUserItem(Request request) {
    return apiItemController.collect(request).collectList().share().block(Duration.ofMillis(10_000));


여러건의 호출이 있을 경우 타임아웃 에러가 발생합니다.


해당 에러가 발생하는 Webflux의 Class 파일을 찾아보았습니다.

public O block(@Nullable Duration timeout) {
    try {
        if (isTerminated()) {
            return peek();


        long delay;
        if (null == timeout) {
            delay = 0L;
        else {
            delay = System.nanoTime() + timeout.toNanos();
        for (; ; ) {
            if (isTerminated()) {
                if (error != null) {
                    RuntimeException re = Exceptions.propagate(error);
                    re = Exceptions.addSuppressed(re, new Exception("Mono#block terminated with an error"));
                    throw re;
                return value;
            if (timeout != null && delay < System.nanoTime()) {
                throw new IllegalStateException("Timeout on Mono blocking read");


    catch (InterruptedException ie) {

        throw new IllegalStateException("Thread Interruption on Mono blocking read");


Timeout이 발생하는 소스를 보면, 파라미터로 넘긴 1_000(10초)를 기다리고 발생한 것으로 파악이 됩니다.
응답이 반환되지 않고 타임아웃만큼 기다렸다가 에러를 발생합니다.
좀 이상하지만, 원인을 못 찾았습니다.
그래서 share() 가 무슨 작업을 하는지 모르겠어서 제거하였습니다.

private List<Item> callUserItem(Request request) {
    return apiItemController.collect(request).collectList().block(Duration.ofMillis(10_000));


그리고는 아래와 같은 에러가 발생하였습니다.


해당 에러는 Webflux에서 아래 클래스에서 에러를 발생합니다.

public T block() {
   BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
   subscribe((Subscriber<T>) subscriber);
   return subscriber.blockingGet();
final T blockingGet() {
   if (Schedulers.isInNonBlockingThread()) {
      throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
   if (getCount() != 0) {
      try {
      catch (InterruptedException ex) {
         throw Exceptions.propagate(ex);

   Throwable e = error;
   if (e != null) {
      RuntimeException re = Exceptions.propagate(e);
      //this is ok, as re is always a new non-singleton instance
      re.addSuppressed(new Exception("#block terminated with an error"));
      throw re;
   return value;


해당 Thread가 Non Blocking인 경우 block을 하게 되면 해당 에러를 뱉게 된다고 되어 있습니다.
그래서 block을 사용하지 않고 NonBlocking 응답을 처리하여 반환하는 방식으로 수정을 하게 됩니다.

private List<Item> callUserItem(Request request) {
    List<Item> itemList = new ArrayList<>();
    Flux<Item> response = apiItemController.collect(request);
    response.subscribe(res -> {
        if( res != null ) {
    return itemList;


현재까지는 타임아웃 관련된 에러는 발생하지 않습니다.



WebFlux는 인터넷의 정말 많은 자료들이 있지만 정확하지 않은 정보 또한 많습니다.
그전에 java doc, 구현 class 파일을 통해서 정확하게 정보를 보는 것이 맞겠다고 생각하였습니다.

