언어(Programming Language)/Java

[Java] fork/join framework 란?

RyanSin 2023. 12. 9. 16:44
반응형

개요

안녕하세요. 이번 시간에는 fork/join framework에 대해 알아보겠습니다.

 

framework라고 해서 우리가 생각하는 spring framework와 같은 개념은 아닙니다. 

 

기존에 ThreadPool 방식을 사용하는 게 아니라 ForkJoinPool이라는 방식을 사용해서 멀티스레드 작업을 처리합니다.

 

그래서 fork/join이 무엇인지 설명드리겠습니다.

 

개념

Oracle 공식 문서 튜토리얼 내용을 확인하면 다음과 같은 내용이 있습니다.

 

Fork/Join

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

 

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

 

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.

 

해석

 

Fork/Join framework는 여러 프로세스를 활용할 수 있도록 도와주는 ExecutorService 인터페이스의 구현입니다.

 

실제 ForkJoinPool 클래스는 구조는 아래와 같습니다.

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {}

public abstract class AbstractExecutorService implements ExecutorService {}

 

음 우리가 ThreadPool을 사용할 때 선언한 ExcutorService 기능은 상속받고 있습니다.

 

그리고 재귀적으로 작업들을 조각조각 나눌 수 있게 설계되었으며, 목표는 사용 가능한 모든 처리 능력을 사용한다라고 설명하고 있습니다.

 

여기서 중요한 내용은 다음과 같습니다.

The fork/join framework is distinct because it uses a work-stealing algorithm
Worker threads that run out of things to do can steal tasks from other threads that are still busy.

 

여러 Thread는 공정하게 작업을 하기 위해서 할 일이 부족한 스레드는 아직 사용 중인 다른 스레드 작업을 훔칠 수 있다는 내용입니다.

 

 

기본적으로 사용하는 방법에 대해서도 설명이 나와있습니다.

Basic Use

The first step for using the fork/join framework is to write code that performs a segment of the work. Your code should look similar to the following pseudocode:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction.

After your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.

 

 

위에서 중요한 내용으로는 fork/join framework를 사용할 때 우리가 이전 시간에 Callable과 Runnable처럼 스레드 작업 결과 정보를 알고 싶거나 알지 않아도 되는 상황에 따라 코드를 구현이 필요했습니다.

 

위 내용에서 fork/join framework에서도 스레드 결과 정보를 알고 싶을 때와 알고 싶지 않을 때 사용하는 추상 클래스가 있습니다.

그건 바로 RecursiveTask<V>RecursiveAction입니다.

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {}

public abstract class RecursiveAction extends ForkJoinTask<Void> {}

 

 

기본적인 로직을 의사코드 예시로 설명하고 있습니다.

 

if(나의 작업량이 작다면)

   직접 작업 수행

else 그렇지 않다면

  나의 작업을 두 조각으로 나눈다.

  두 조각을 호출하고 결과를 기다립니다.

 

 

무슨 말인지 의사 코드를 통해 알 수 없습니다. 백문일이 불여일타 지난 시간에 대량 메시지 발송 예시를 기준으로 fork/join framework를 사용해서 메시지 발송 예시를 구현해 보도록 하겠습니다.

 

예시 코드 구현

예시 상황 - 대량의 AppPush 메시지 발송 성공여부 확인

 

간단한 코드를 작성하기 때문에 많이 부족한 부분 참고 부탁드리겠습니다.

 

사용한 클래스

  • AppPushResult - 메시지 발송 결과를 관리하는 클래스
  • AppPushMessageTask - 메시지 발송처리를 관리하는 클래스
  • Main - 실행되는 메인 클래스

실제 메시지 발송처리를 관리하는 클래스를 작성하겠습니다.

package org.example;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AppPushMessageTask extends RecursiveTask<List<AppPushResult>> {

    private int criticalPoint = 10; //처리 임의 값 설정
    private List<AppPushMessage> appPushMessages;

    public AppPushMessageTask(List<AppPushMessage> appPushMessages) {
        this.appPushMessages = appPushMessages;
    }

    @Override
    protected List<AppPushResult> compute() {
        int size = this.appPushMessages.size();

        if (size <= this.criticalPoint) {
            List<AppPushResult> appPushResults = new ArrayList<>();
            for (AppPushMessage appPushMessage : appPushMessages) {
                appPushResults.add(appPushMessage.sendAppPushMessage("AppPush 메시지 발송"));
            }
            return appPushResults;
        }
        return splitAppPushTasks();
    }

    private List<AppPushResult> splitAppPushTasks() {
        int size = this.appPushMessages.size();
        int middle = size / 2;
        
        List<AppPushMessage> subList1 = appPushMessages.subList(0, middle);
        List<AppPushMessage> subList2 = appPushMessages.subList(middle, size);

        AppPushMessageTask appPushMessageTask1 = new AppPushMessageTask(subList1);
        appPushMessageTask1.fork();

        AppPushMessageTask appPushMessageTask2 = new AppPushMessageTask(subList2);
        List<AppPushResult> resultList1 = appPushMessageTask2.compute();
        List<AppPushResult> resultList2 = appPushMessageTask1.join();

        return Stream
                .concat(resultList1.stream(), resultList2.stream())
                .collect(Collectors.toList());
    }
}

 

 

RecursiveTask 클래스를 상속을 받으면 compute() 메서드를 기본적으로 오버라이드 해야 합니다.

compute() 메서드는 실제 작업을 처리하기 메서드라고 생각하시면 됩니다.

 

코드를 살펴보면 다음과 같습니다. 임계점을 10으로 설정한 this.criticalPoint 값이 10이거나 10보다 작다면 메시지를 발송하고, 그렇지 않다면 다시 작업을 세분화합니다.

 

splitAppPushTasks() 메서드를 확인하면 다음과 같습니다.

  1. 현재 작업 크기를 조회합니다. - size
  2. 현재 작업 리스트에서 발송해야 하는 작업 태스크를 나눕니다.  -  subList1, subList2 (Main Thread가 열심히 작업을 나눕니다.)
  3. 작업 테스크를 나눴어도 임계점 크기보다 크다면 다시 작업을 나눕니다.

 

메시지 발송 후 결과 값을 담는 클래스를 작성하겠습니다.

package org.example;

public class AppPushResult {
    private int status;
    private String message;

    public AppPushResult(int status, String message) {
        this.status = status;
        this.message = message;
    }

    @Override
    public String toString() {
        return "AppPushResult{" +
                "status=" + status +
                ", message='" + message + '\'' +
                '}';
    }
}

 

마지막으로 실제 발송하기 위해 main()메서드에서 작성하도록 하겠습니다.

package org.example;


import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Main {

    public static void main(String[] args) {
        List<AppPushMessage> appPushMessages = createAppPushMessage();
        AppPushMessageTask appPushMessageTask = new AppPushMessageTask(appPushMessages);

        List<AppPushResult> invoke = new ForkJoinPool().commonPool().invoke(appPushMessageTask);
        System.out.println("메시지 발송 결과 = " + invoke);
    }

    /**
     * @description 메시지 발송 예시 데이터 생성
     * @return List<AppPushMessage>
     */
    private static List<AppPushMessage> createAppPushMessage() {
        List<AppPushMessage> appPushMessages = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            appPushMessages.add(new AppPushMessage(i));
        }
        return appPushMessages;
    }
}

 

메시지 발송 예시 데이터를 100개 생성으로 설정했는데 100개를 20개로 줄여서 결과를 확인해 보겠습니다.

 

실행 결과

결과 정보는 사실 그때그때 다르게 출력됩니다. (Main Thread 작업이 별로 없으면 실제 Main Thread도 메시지를 발송합니다.

만약 그렇지 않고 처리해야 하는 작업 양이 많다면 Sub Thread가 메시지를 발송합니다.)

 

하지만 마지막 메시지 발송 결과는 한 번에 취합해서 출력이 되는 걸 확인할 수 있습니다.

 

이번 시간에는 Fork/Join Framework에 대해 알아봤습니다. 꼭 실습을 통해 학습하신 걸 추천드리겠습니다!!