개발이 취미인 사람

[Java] Callable과 Runnable 및 ExecutorService 그리고 Executors, Executor 본문

언어(Programming Language)/Java

[Java] Callable과 Runnable 및 ExecutorService 그리고 Executors, Executor

RyanSin 2023. 12. 6. 21:54
반응형

개요

안녕하세요. 이번 시간에는 Callable과 Runnable에 대해 알아보겠습니다. 그리고 더 나아가 ExecutorSerivce, Executors, Executor까지 알아보는 시간을 갖겠습니다.

 

기본적으로 Runnable에 대해 잘 모르시는 분들은 아래 링크를 통해 학습하고 오시는 걸 추천드리겠습니다.

[Java] Thread 클래스와 Runnable 인터페이스 개념 및 사용법

 

[Java] Thread 클래스와 Runnable 인터페이스 개념 및 사용법

package Access; //사람 스래드 public class Person extends Thread { @Override public void run() { for (int i=0; i< 10; i++){ System.out.println("Sub Thread 일 시작: "+ i); } } } - 지난 시간 안녕하세요. 지난 시간에는 자바 인터페

any-ting.tistory.com

 

개념

Callable과 Runnable은 둘 다 Tread와 관련이 있습니다. 둘의 차이는 결과를 반환하고 안 하고에 차이가 있습니다.

 

간단하게 코드를 살펴봐도 다음과 같이 알 수 있습니다.

Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("runnable 실행" );
    }
};

runnable.run();


Callable<String> callable = new Callable() {
    @Override
    public String call() throws Exception {
        System.out.println("callable 실행");
        return "callable 실행";
    }
};

try {
    callable.call();
} catch (Exception e) {
    throw new RuntimeException(e);
}

 

run() 메서드와 call() 메서드를 호출하면 콘솔 로그가 출력되는 걸 확인할 수 있습니다.

하지만 위 코드처럼 사용하지 않고 실제 사용할 때는 Thread를 만들어서 run()과 call() 메서드 로직을 수행합니다.

 

Tread를 생성해서 사용할 때 이전 포스팅에서 다음과 같이 생성해서 사용했습니다.

Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("runnable 실행");
    }
};

Thread thread1 = new Thread(runnable);
thread1.start();

 

Callable을 사용할 때는 Thread 클래스를 사용할 수 없습니다. Thread 클래스는 사실 Runnable 인터페이스를 구현받아 사용하고 있기 때문입니다.

Callable과 Thread를 함께 사용하기 위해서는 ExecutorService를 활용해야 합니다.

Callable<String> callable = new Callable() {
    @Override
    public String call() throws Exception {
        System.out.println("callable 실행");
        return "callable 실행";
    }
};

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(callable);

// ExecutorService 종료
executorService.shutdown();

 

실제 submit 메서드를 호출할 때 Callable이 아닌 Runnable을 사용할 수 있습니다.

ExecutorService란?

ExecutorService는  실제 Excutor 인터페이스를 확장한 인터페이스입니다. Thread Pool을 관리하는 데 사용합니다.

 

ExecutorService는 다음과 같은 메서드를 제공합니다.

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Callable<T> task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

 

  • shutdown() : 스레드 풀을 정상적으로 종료합니다.
  • shutdownNow() : 스레드 풀을 즉시 종료하려고 시도하며 실행 중인 작업을 중단시킵니다.
  • isShutdown() : 현재 스레드 풀이 종료 여부를 반환합니다. → 종료된 경우 true를 반환합니다.
  • isTerminated() : shutdown 후에 모든 작업이 완료된 경우 true를 반환합니다. shutdown 또는 shutdownNow 가 먼저 호출되지 않는 한 isTerminated는 절대 true 가 아니다.
  • submit(Runnable task) & submit(Callable<T> task) : 작업을 제출하고 해당 작업의 결과를 나타내는 Future 객체를 반환합니다.
  • invokeAll, invokeAny : 여러 작업을 동시에 실행하고 그 결과를 다루는 메서드입니다.

Excutor 인터페이스는 다음과 같이 메서드를 제공합니다.

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

 

Executor 인터페이스는 작업을 비동기적으로 실행하기 위한 최상위 인터페이스이며, 하나의 단일 작업을 실행하는 간단한 인터페이스입니다.

 

Thread Pool은 무엇일까요?...

Thread Pool이란 Thread를 new 키워드를 통해 그때그때 마다 생성하는 방식이 아닌 Thread를 미리 생성하고 관리하고 필요에 따라 Thread를 호출해서 사용하고 다 사용하고 나면 다시 Thread를 Pool의 관리하여 사용하는 방식을 말합니다.

 

자바에서 new 키워드를 통해 객체를 생성하는 비용이 많이 들기 때문에 필요에 따라 Thread Pool을 사용하는 게 좋을 수 있습니다.

(위 코드 Executors.newSingleThreadExecutor(); 는 여러 Thread를 생성하는 방식이 아닌 싱글 Thread를 생성합니다.)

 

ExcutorService는 ThreadPool를 관리한다고 했습니다. 그럼 ThreadPool을 생성은 누가 할까요? Java에 익숙하신 분들은 눈치를 채셨을 거라고 생각합니다. 바로 Executors 클래스를 사용하고 있습니다.

 

Executors 클래스를 확인하면 대표적으로 다음과 같이 메서드를 제공합니다.

  • newFixedThreadPool(int) : 인자 개수만큼 고정 된 스레드풀을 생성합니다.
  • newCachedThreadPool(): 필요할 때, 필요한 만큼 쓰레드풀을 생성한다. 이미 생성된 스레드를 재활용할 수 있기 때문에 성능상의 이점이 있을 수 있다.
  • newScheduledThreadPool(int): 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 스레드풀을 인자 개수만큼 생성한다.
  • newSingleThreadExecutor(): 단일 쓰레드인 풀을 생성한다. 단일 스레드에서 동작해야 하는 작업을 처리할 때 사용한다.
  • newWorkStealingPool(int parallelism): 시스템에 가용 가능한 만큼 스레드를 활용하는 ExecutorService를 생성한다.

newWorkStealing는 ThreadPool이 아닌 ForkJoinPool을 사용해서 생성합니다. ForkJoinPool은 다른 포스팅에 설명하도록 하겠습니다.

 

위 메서드 말고도 Executors 클래스는 다양한 메서드를 제공합니다! 너무 많은 내용일 다룰 수 없어서 공식 홈페이지 내용을 참고 부탁드리겠습니다.

 

예시 코드 구현

백문일이 불여일타 위 내용을 기반으로 예시 코드를 작성하도록 하겠습니다.

 

예시 상황 - 대량의 AppPush 메시지 발송 성공여부 확인

 

간단한 코드를 작성하기 때문에 많이 부족한 부분 참고 부탁드리겠습니다.

 

사용한 클래스

  • AppPushResult - 메시지 발송 결과를 관리하는 클래스
  • AppPushMessageTask - 메시지 발송처리를 관리하는 클래스
  • Main - 실행되는 메인 클래스

 

기본적으로 실제 메시지를 발송하기 위한 클래스를 작성하겠습니다.

package org.example;

import java.util.concurrent.Callable;

public class AppPushMessageTask implements Callable<AppPushResult> {

    private Integer id;
    private String message;

    public AppPushMessageTask(Integer id, String message) {
        this.id = id;
        this.message = message;
    }

    /**
     * @author Ryan
     * @description 메시지 발송
     *
     * @return 발송 결과
     */
    @Override
    public AppPushResult call() {

        System.out.println("Thread: " + Thread.currentThread().getName() + ", 발송 메시지 아이디: " + this.id + " 발송 메시지 내용:  " + this.message);
        int status = this.id == 9 ? 200 : 500;
        String message = this.id == 9 ? "발송 성공" : "발송 실패";

        return new AppPushResult(status, message);
    }
}

 

 

메시지 발송 후 결과 값을 담는 클래스를 작성하겠습니다.

package org.example;

public class AppPushResult {
    private int status;
    private String message;

    public AppPushResult(int status, String message) {
        this.status = status;
        this.message = message;
    }

    @Override
    public String toString() {
        return "AppPushResult{" +
                "status=" + status +
                ", message='" + message + '\'' +
                '}';
    }
}

 

우리는 메시지 발송 후 외부 API 서버로부터 결과 응답 받는다고 가정합니다.

  • AppPushResult - 결과 정보 저장
  • AppPushMessageTask - 하나의 메시지 발송 작업

우리는 메시지 발송 완료 후 결과 값을 핸드링 해야 하기 때문에 Callable 인터페이스를 사용했습니다.

 

그럼 실제 발송하기 위해 main()메서드에서 작성하도록 하겠습니다.

 

package org.example;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 해당 메서드 내부에 Runtime.getRuntime().availableProcessors()를 사용한다.(OS CPU 프로세스 개수 만큼 스레드를 생성한다.)
        ExecutorService executorService = Executors.newWorkStealingPool();

        //발송 메시지를 생성합니다. (실제는 발송 대기열의 등록된 메시지를 조회 후 발송한다고 생각합니다.)
        List<AppPushMessageTask> appPushMessageTaskList = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            appPushMessageTaskList.add(new AppPushMessageTask(i, "메시지 내용"));
        }

        System.out.println("-------작업 실행------ ");
        List<Future<AppPushResult>> futures = executorService.invokeAll(appPushMessageTaskList);
        System.out.println("-------작업 종료------ \n");

        System.out.println("-------작업 결과 ----- ");
        for (Future<AppPushResult> future : futures) {
            System.out.println("future = " + future.get());
        }

        //작업 완료 후 서비스 종료
        executorService.shutdown();
    }
}

 

위 코드를 살펴보면 다음과 같은 프로세스로 진행됩니다.

  1. PC OS CPU 개수만큼 Thread Pool에 Thread를 생성 - ExecutorService executorService
  2. 실제 존재하는 메시지가 아닌 임시 메시지 발송 데이터를 생성 - List<AppPushMessageTask> appPushMessageTaskList
  3. 메시지 발송 작업 시작 - executorService.invokeAll(appPushMessageTaskList)
  4. 결과 확인 - future.get();

 

그럼 실행 결과를 확인해 볼까요?

실행 결과

 

참고로 저는 M1 Pro 10코어 스펙 PC를 사용하고 있습니다.

코어수 확인하기
- 명령어
sysctl hw.physicalcpu hw.logicalcpu

- 결과
hw.physicalcpu: 10
hw.logicalcpu: 10

 

 

10개의 Thread가 생성되었으며 또한 10개의 메시지를 발송했습니다. 그리고 마지막 500 Error 메시지를 확인할 수 있습니다.

 

또한 해당 콘솔 정보를 확인하면 비동기적으로 작업을 실행하고 있습니다.

 

이번 시간에는 좀 길었지만 Callable과 Runnable 그리고 ExecutorService, Executors, Executor, ThreadPool에 대해 알아봤습니다.

백문이 불여일타 꼭!! 실습을 통해 학습 하시는 걸 추천드리겠습니다.