Java & Spring

🙌 쓰레드풀을 이용해 두 비동기 작업 처리 완료 후 후속 처리하기

리차드 2022. 9. 29. 15:40

 

전체 코드는 여기에서 확인하실 수 있습니다 :)

 

 

두 곳에 Ajax 요청을 보낸 뒤, 결과를 취합해 후속 작업을 해야 한다면?


언제 처리가 완료될지 정확히 알 수 없는 두 가지 작업이 있고,
그 작업들이 모두 완료된 이후에 취합하여 후속작업을 처리해야 한다고 가정해보겠습니다.

가령 서비스 레이어에서 두 곳의 엔드포인트에 요청을 보내고,
응답 값을 취합해서 결과에 따른 분기처리가 필요한 상황이라고 생각해볼 수 있을 것 같아요.

 

요청한 클라이언트에게 즉시 결과를 응답해야 한다면

  • 두 요청을 쓰레드 풀 작업 큐에 넣고, 두 Future 객체의 get() 메서드를 이용해 기다린 후 응답한다

이 방법의 장점은 하나의 요청이 응답될 때까지 기다렸다가 다음 요청이 나가는 것이 아니라,
동시에 두 가지 요청이 호출된다는 점 정도입니다.

 

요청한 클라이언트에게 접수됐다고 응답하고 추후  응답해도 된다면

  • 두 요청을 쓰레드 풀 작업 큐에 넣고, 클라이언트에겐 접수됐다고 즉시 응답처리
  • 작업 큐에 넣은 두 작업의 Future 객체를 가지고 후속처리 하는 작업을 작업큐에 넣음
  • 후속처리 작업큐는 두 Future를 get()으로 가져온 뒤, 클라이언트에게 처리 결과 응답
    • 응답하는 과정도 EventListener 기반으로 구현할 수도

 

위와 같이 시나리오를 상상해봤습니다.
예제 코드에서는 후자를 선택하되, 이벤트 리스너 기반까지 가지는 않았습니다.
컨셉에 대한 소개를 마쳤으니 이제 코드로 넘어가보겠습니다.

 

 

 

public <T> Future<T> submit(Runnable task, T result)


ExecutorService 인터페이스에 정의된 submit 메서드를 사용합니다.

그런데 리턴값이 없는 Runnable 인데도 submit 메서드의 두번째 매개변수로 result T 가 전달되고 있습니다.
이는 Future.get()을 했을 때 T가 응답되도록 정해진 메서드입니다.

그러면 저 T result 를 입맛에 맞게 직접 구현하고, 
생성해서 참조값을 Runnable 을 구현한 객체에도 전달하고,
submit 메서드를 호출할때도 전달한다면,
run 메서드 실행 시 처리 결과를 T result에 담도록 구현할 수 있을 것 같습니다.

 

 

 

처리 결과가 담길 ResultDto


import java.util.HashMap;
import java.util.Map;

public class ResultDto {
    private Map<String, Object> data;

    public ResultDto() {
        this.data = new HashMap<>();
    }

    public void addResult(final String key, final Object value) {
        data.put(key, value);
    }

    public Map<String, Object> getData() {
        return data;
    }
}

 

먼저 처리 결과가 담길 Dto를 선언해보겠습니다.
특별한 내용은 없는, Map을 필드로 갖는 일급 컬렉션 객체입니다.
이제 처리 결과를 여기에 담을, Runnable 을 구현한 클래스를 선언해보겠습니다.

 

 

 

비동기 작업을 수행할 클래스 선언


public class Job implements Runnable {
    private final ResultDto resultDto;
    private final int seconds;
    private final String key;
    private final Object value;

    public Job(final ResultDto resultDto, final int seconds, final String key, final Object value) {
        this.resultDto = resultDto;
        this.seconds = seconds;
        this.key = key;
        this.value = value;
    }

    @Override
    public void run() {
        System.out.println("sleep for :" + 1000 * seconds);

        try {
            Thread.sleep(1000L * seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println();
        System.out.println("I'm awake after sleep for " + 1000 * seconds);
        System.out.println("putting data " + "key : " + key + " value : " + value);
        resultDto.addResult(this.key, this.value);
        System.out.println("job done!");
        System.out.println("current data status");
        System.out.println(resultDto.getData());
    }
}

 

생성자 매개변수를 많이 받고 있습니다. 쉽고 빠르게 예제를 만들다 보니 그렇게 됐습니다.
첫번째 매개변수는 가장 중요한, 처리 결과를 담을 ResultDto입니다.
그 이후로는 예제 진행을 위한 매개변수들입니다.
지연을 발생시킬 시간을 의미하는 seconds와 처리 결과를 담을 key, value입니다.

이 클래스를 이용해서 서로 다른 지연 시간을 가진,
서로 다른 처리 결과를 담을 인스턴스를 두 개 만들어서 처리를 진행할 예정입니다.

 

 

 

두 비동기 작업을 취합해서 추가 처리를 할 클래스 선언


import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class ResultCallback implements Runnable {
    private final Future<ResultDto> job1;
    private final Future<ResultDto> job2;
    private final ThreadPoolExecutor threadPoolExecutor;

    public ResultCallback(final Future<ResultDto> job1, final Future<ResultDto> job2,
                          final ThreadPoolExecutor threadPoolExecutor) {
        this.job1 = job1;
        this.job2 = job2;
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
    public void run() {
        try {
            final ResultDto resultDto3 = job1.get();
            final ResultDto resultDto4 = job2.get();

            System.out.println();
            System.out.println("Future의 get메서드로 취합된 데이터");
            System.out.println(resultDto4.getData());

            System.out.println();
            System.out.println("I'm shutting down..");
            threadPoolExecutor.shutdown();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

 

앞서 선언했던 Job 클래스 인스턴스가 작업 큐에 담긴 결과로 반환되는 두 개의 Future.
그것들을 받아서 처리 완료를 기다린 후 취합해서 추가 처리를 진행할 ResultCallback입니다.

두 개의 Future에 대해 get()을 호출하고 있는데요,
각각의 Future는 서로 다른 Runnable이 작업 큐에 담긴 결과이기 때문에 
서로 비동기적으로 수행되고 있습니다.
따라서 job1을 먼저 get할지, job2를 먼저 get할지는 현재 코드 상에선 큰 의미 없습니다.

다만 get이 blocking 메서드이기 때문에, 두 작업이 모두 수행 완료되길 기다린다는 점이 중요합니다.

모두 기다린 뒤엔 데이터를 출력하고, 쓰레드 풀을 종료하는 작업입니다.

 

 

 

메인 애플리케이션


import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolApplication {

    public static void main(String[] args) {
        // 쓰레드 풀 생성
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                4,
                8,
                120L,
                TimeUnit.SECONDS,
                new SynchronousQueue<>()
        );

        // 두 비동기 작업 결과가 저장될 객체 선언
        final ResultDto resultDto = new ResultDto();

        // 두 비동기 작업 생성, 작업 결과를 저장할 객체도 함께 전달
        final Job job = new Job(resultDto, 3, "hello", "world");
        final Job job2 = new Job(resultDto, 5, "nickname", "richard");

        // 두 비동기 작업을 스레드풀 작업 큐에 전달.
        // 두번째 매개변수로 객체를 주면 Future.get()의 결과로 그 객체를 돌려줌
        // 두 비동기 작업의 run 메서드 내부에서 해당 객체에 값을 넣는 로직을 담았기에 의미가 있음
        final Future<ResultDto> future = threadPoolExecutor.submit(job, resultDto);
        final Future<ResultDto> future2 = threadPoolExecutor.submit(job2, resultDto);

        // 두 비동기 작업을 block으로 기다린 뒤에 결과를 출력하는 작업 생성
        final ResultCallback resultCallback = new ResultCallback(future, future2, threadPoolExecutor);

        // 두 비동기 작업을 block으로 기다린 뒤 결과를 출력하는 작업을 쓰레드풀 작업큐에 전달
        threadPoolExecutor.submit(resultCallback);

        // block으로 기다리는 작업도 쓰레드풀에서 다른 스레드로 실행하기에 메인스레드는 바로 이어서 다음 작업을 수행할 수 있음
        System.out.println("명령 전달 완료!");
    }
}
명령 전달 완료! // 다른 스레드들에서 작업이 수행되기에 block되지 않고 즉시 출력됨
sleep for :3000 // 3초가 소요되는 작업 수행 시작
sleep for :5000 // 5초가 소요되는 작업 수행 시작

// 3초 소요 작업 완료 후 결과 출력. 1쌍의 key-value만 저장된 모습
I'm awake after sleep for 3000
putting data key : hello value : world
job done!
current data status
{hello=world} 

// 5초 소요 작업 완료 후 결과 출력. 2쌍의 key-value가 저장된 모습
I'm awake after sleep for 5000
putting data key : nickname value : richard
job done!
current data status
{nickname=richard, hello=world}

// 두 비동기 작업을 취합하는 객체의 출력. get()으로 두 비동기를 block으로 대기했음
Future의 get메서드로 취합된 데이터
{nickname=richard, hello=world}

// 모든 작업 수행 완료 후 쓰레드 풀을 종료함으로써 애플리케이션이 종료되도록 함
I'm shutting down..

Process finished with exit code 0

 

 

 

학습 출처