🙌 쓰레드풀을 이용해 두 비동기 작업 처리 완료 후 후속 처리하기
전체 코드는 여기에서 확인하실 수 있습니다 :)
두 곳에 Ajax 요청을 보낸 뒤, 결과를 취합해 후속 작업을 해야 한다면?
언제 처리가 완료될지 정확히 알 수 없는 두 가지 작업이 있고,
그 작업들이 모두 완료된 이후에 취합하여 후속작업을 처리해야 한다고 가정해보겠습니다.
가령 서비스 레이어에서 두 곳의 엔드포인트에 요청을 보내고,
응답 값을 취합해서 결과에 따른 분기처리가 필요한 상황이라고 생각해볼 수 있을 것 같아요.
요청한 클라이언트에게 즉시 결과를 응답해야 한다면
- 두 요청을 쓰레드 풀 작업 큐에 넣고, 두 Future 객체의 get() 메서드를 이용해 기다린 후 응답한다
이 방법의 장점은 하나의 요청이 응답될 때까지 기다렸다가 다음 요청이 나가는 것이 아니라,
동시에 두 가지 요청이 호출된다는 점 정도입니다.
요청한 클라이언트에게 접수됐다고 응답하고 추후 응답해도 된다면
- 두 요청을 쓰레드 풀 작업 큐에 넣고, 클라이언트에겐 접수됐다고 즉시 응답처리
- 작업 큐에 넣은 두 작업의 Future 객체를 가지고 후속처리 하는 작업을 작업큐에 넣음
- 후속처리 작업큐는 두 Future를 get()으로 가져온 뒤, 클라이언트에게 처리 결과 응답
- 응답하는 과정도 EventListener 기반으로 구현할 수도
위와 같이 시나리오를 상상해봤습니다.
예제 코드에서는 후자를 선택하되, 이벤트 리스너 기반까지 가지는 않았습니다.
컨셉에 대한 소개를 마쳤으니 이제 코드로 넘어가보겠습니다.
public <T> Future<T> submit(Runnable task, T result)
ExecutorService 인터페이스에 정의된 submit 메서드를 사용합니다.
그런데 리턴값이 없는 Runnable 인데도 submit 메서드의 두번째 매개변수로 result T 가 전달되고 있습니다.
이는 Future.get()을 했을 때 T가 응답되도록 정해진 메서드입니다.
그러면 저 T result 를 입맛에 맞게 직접 구현하고,
생성해서 참조값을 Runnable 을 구현한 객체에도 전달하고,
submit 메서드를 호출할때도 전달한다면,
run 메서드 실행 시 처리 결과를 T result에 담도록 구현할 수 있을 것 같습니다.
처리 결과가 담길 ResultDto
import java.util.HashMap;
import java.util.Map;
public class ResultDto {
private Map<String, Object> data;
public ResultDto() {
this.data = new HashMap<>();
}
public void addResult(final String key, final Object value) {
data.put(key, value);
}
public Map<String, Object> getData() {
return data;
}
}
먼저 처리 결과가 담길 Dto를 선언해보겠습니다.
특별한 내용은 없는, Map을 필드로 갖는 일급 컬렉션 객체입니다.
이제 처리 결과를 여기에 담을, Runnable 을 구현한 클래스를 선언해보겠습니다.
비동기 작업을 수행할 클래스 선언
public class Job implements Runnable {
private final ResultDto resultDto;
private final int seconds;
private final String key;
private final Object value;
public Job(final ResultDto resultDto, final int seconds, final String key, final Object value) {
this.resultDto = resultDto;
this.seconds = seconds;
this.key = key;
this.value = value;
}
@Override
public void run() {
System.out.println("sleep for :" + 1000 * seconds);
try {
Thread.sleep(1000L * seconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println();
System.out.println("I'm awake after sleep for " + 1000 * seconds);
System.out.println("putting data " + "key : " + key + " value : " + value);
resultDto.addResult(this.key, this.value);
System.out.println("job done!");
System.out.println("current data status");
System.out.println(resultDto.getData());
}
}
생성자 매개변수를 많이 받고 있습니다. 쉽고 빠르게 예제를 만들다 보니 그렇게 됐습니다.
첫번째 매개변수는 가장 중요한, 처리 결과를 담을 ResultDto입니다.
그 이후로는 예제 진행을 위한 매개변수들입니다.
지연을 발생시킬 시간을 의미하는 seconds와 처리 결과를 담을 key, value입니다.
이 클래스를 이용해서 서로 다른 지연 시간을 가진,
서로 다른 처리 결과를 담을 인스턴스를 두 개 만들어서 처리를 진행할 예정입니다.
두 비동기 작업을 취합해서 추가 처리를 할 클래스 선언
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class ResultCallback implements Runnable {
private final Future<ResultDto> job1;
private final Future<ResultDto> job2;
private final ThreadPoolExecutor threadPoolExecutor;
public ResultCallback(final Future<ResultDto> job1, final Future<ResultDto> job2,
final ThreadPoolExecutor threadPoolExecutor) {
this.job1 = job1;
this.job2 = job2;
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public void run() {
try {
final ResultDto resultDto3 = job1.get();
final ResultDto resultDto4 = job2.get();
System.out.println();
System.out.println("Future의 get메서드로 취합된 데이터");
System.out.println(resultDto4.getData());
System.out.println();
System.out.println("I'm shutting down..");
threadPoolExecutor.shutdown();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
앞서 선언했던 Job 클래스 인스턴스가 작업 큐에 담긴 결과로 반환되는 두 개의 Future.
그것들을 받아서 처리 완료를 기다린 후 취합해서 추가 처리를 진행할 ResultCallback입니다.
두 개의 Future에 대해 get()을 호출하고 있는데요,
각각의 Future는 서로 다른 Runnable이 작업 큐에 담긴 결과이기 때문에
서로 비동기적으로 수행되고 있습니다.
따라서 job1을 먼저 get할지, job2를 먼저 get할지는 현재 코드 상에선 큰 의미 없습니다.
다만 get이 blocking 메서드이기 때문에, 두 작업이 모두 수행 완료되길 기다린다는 점이 중요합니다.
모두 기다린 뒤엔 데이터를 출력하고, 쓰레드 풀을 종료하는 작업입니다.
메인 애플리케이션
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolApplication {
public static void main(String[] args) {
// 쓰레드 풀 생성
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4,
8,
120L,
TimeUnit.SECONDS,
new SynchronousQueue<>()
);
// 두 비동기 작업 결과가 저장될 객체 선언
final ResultDto resultDto = new ResultDto();
// 두 비동기 작업 생성, 작업 결과를 저장할 객체도 함께 전달
final Job job = new Job(resultDto, 3, "hello", "world");
final Job job2 = new Job(resultDto, 5, "nickname", "richard");
// 두 비동기 작업을 스레드풀 작업 큐에 전달.
// 두번째 매개변수로 객체를 주면 Future.get()의 결과로 그 객체를 돌려줌
// 두 비동기 작업의 run 메서드 내부에서 해당 객체에 값을 넣는 로직을 담았기에 의미가 있음
final Future<ResultDto> future = threadPoolExecutor.submit(job, resultDto);
final Future<ResultDto> future2 = threadPoolExecutor.submit(job2, resultDto);
// 두 비동기 작업을 block으로 기다린 뒤에 결과를 출력하는 작업 생성
final ResultCallback resultCallback = new ResultCallback(future, future2, threadPoolExecutor);
// 두 비동기 작업을 block으로 기다린 뒤 결과를 출력하는 작업을 쓰레드풀 작업큐에 전달
threadPoolExecutor.submit(resultCallback);
// block으로 기다리는 작업도 쓰레드풀에서 다른 스레드로 실행하기에 메인스레드는 바로 이어서 다음 작업을 수행할 수 있음
System.out.println("명령 전달 완료!");
}
}
명령 전달 완료! // 다른 스레드들에서 작업이 수행되기에 block되지 않고 즉시 출력됨
sleep for :3000 // 3초가 소요되는 작업 수행 시작
sleep for :5000 // 5초가 소요되는 작업 수행 시작
// 3초 소요 작업 완료 후 결과 출력. 1쌍의 key-value만 저장된 모습
I'm awake after sleep for 3000
putting data key : hello value : world
job done!
current data status
{hello=world}
// 5초 소요 작업 완료 후 결과 출력. 2쌍의 key-value가 저장된 모습
I'm awake after sleep for 5000
putting data key : nickname value : richard
job done!
current data status
{nickname=richard, hello=world}
// 두 비동기 작업을 취합하는 객체의 출력. get()으로 두 비동기를 block으로 대기했음
Future의 get메서드로 취합된 데이터
{nickname=richard, hello=world}
// 모든 작업 수행 완료 후 쓰레드 풀을 종료함으로써 애플리케이션이 종료되도록 함
I'm shutting down..
Process finished with exit code 0
학습 출처