자바 Concurrent 소개

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어
  • Ex) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다.

 

자바에서 지원하는 Concurrent 프로그래밍

  • 멀티프로세싱 (ProcessBuilder)
  • 멀티쓰레드

 

자바 멀티쓰레드 프로그래밍

  • Thread / Runnable

 

Thread 상속

    public static void main(String[] args) {
        HelloThread helloThread = new HelloThread();
        helloThread.start();
        System.out.println("Main : " + Thread.currentThread().getName());
    }

    static class HelloThread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread : " + Thread.currentThread().getName());
        }
    }

Main이 먼저 출력 될 수도 있고, Thread가 먼저 출력될 수도있다. 

 

Runnable 구현 또는 람다

Thread thread = new Thread(() -> System.out.println("Thread : " + Thread.currentThread().getName()));
thread.start();
System.out.println("Main : " + Thread.currentThread().getName());

Runnable 또한 Main이 먼저 출력 될 수도 있고, Thread가 먼저 출력될 수도있다. 

 

Thread 주요 기능

  • 현재 쓰레드 멈춰두기(sleep)
    • 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 lock을 놔주진 않는다. (==>> 잘못하면 Deadlock (교착상태)에 빠짐)
  • 다른 쓰레드 깨우기 (Interrupt)
    • 다른 쓰레드를 꺠워서 InterruptedException을 발생시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료시킬 수도 있고, 계속 하던일을 할 수도 있다.
  • 다른 쓰레드 기다리기 (join)
    • 다른 쓰레드가 끝날 때까지 기다린다.
public static void main(String[] args) throws InterruptedException {
    // sleep, interrupt
    Thread thread = new Thread(() -> {
        while(true) {
            System.out.println("Thread : " + Thread.currentThread().getName());

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // 자는 동안 누군가가 깨우면 error
                System.out.println("exit!!");
                return;
                //e.printStackTrace();
            }
        }
    });
    thread.start();

    System.out.println("Main : " + Thread.currentThread().getName());
    Thread.sleep(3000L);
    thread.interrupt();
    
    
    // join
    Thread thread = new Thread(() -> {
        System.out.println("Thread : " + Thread.currentThread().getName());

        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            // 자는 동안 누군가가 깨우면 error
            System.out.println("exit!!");
            return;
            //e.printStackTrace();
        }
    });
    thread.start();

    System.out.println("Main : " + Thread.currentThread().getName());
    thread.join();      // thread가 끝날 때까지 기다린 다음 아래 출력
    System.out.println(thread + " is finished");

}

Thread가 두개만 되도 복잡한데 Thread가 여러개면 어떻게 될까?? 상상만해도 너무 복잡할 것 같다..

 

 

 

Executors

고수준 Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리
  • 위의 기능을 Executors에게 위임

 

Executors가 하는 일

  • 쓰레드 만들기
    • 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.
  • 쓰레드 생명 주기 관리
  • 작업 처리 및 실행
    • 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공

ExecutorService를 사용한 코드
실행한 결과

실행한 결과를 보면, main쓰레드가 아닌 미리 만들어놓은 쓰레드 풀에서 가져온 것을 확인할 수 있다. 특이한 점은 shutdown을 해주기전까지 쓰레드는 종료되지 않는다(당연히 프로세스도 죽지 않음)는 점이다.

 

주요 인터페이스

  • Executor
    • execute(Runnable)
  • ExecutorService
    • Executor 상송 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공
  • ScheduleExecutorService
    • ExecutorService를 상속받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

 

ExecutorService로 작업 실행하기

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
    System.out.println("Hello :" + Thread.currentThread().getName());
});

 

ExecutorService로 멈추기

executorService.shutdown(); // 처리중인 작업 기다렸다가 종료
executorService.shutdownNow(); // 당장 종료

 

ScheduledExecutorService로 5초 뒤에 작업 실행하기

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(()-> System.out.println(Thread.currentThread().getName()),
        5, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();

 

Fork/Join 프레임워크

  • ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와준다.

 

지금까지 사용한 Runnable은 기본적으로 return이 void로 반환 값이 없다. 만약, 반환을 받고싶으면 Callable을 사용해야 한다.

 

 

Callable과 Future

Callable

  • Runnable과 유사하지만 작업의 결과를 받을 수 있다. 

Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

 

결과를 가져오기 get()

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
    Thread.sleep(2000L);
    return "Callable";
});
System.out.println("Hello");
String result = helloFuture.get();	//결과 값을 가져올 때까지 기다림  ==>> 블록킹
System.out.println(result);
executorService.shutdown();
  • 블록킹 콜이다.
  • 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.

 

작업 상태 확인하기

  • isDone()
    • 완료했으면? true:false를 return

 

작업 취소하기

  • cancel()
    • 취소했으면? true:false를 return
    • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝난 뒤 취소한다.

 

여러 작업 동시에 실행하기

  • invokeAll()
    • 동시에 실행한 작업 중에 제일 오래 걸리는 작업만큼 시간이 걸린다.

 

여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 

  • invokeAny()
    • 동시에 실행한 작업 중에 가장 짧게 걸리는 작업만큼 시간이 걸린다.
    • 블록킹 콜이다.

 

 

CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능하게하는 인터페이스

  • Future를 사용해서 어느정도 가능했지만 하기 힘들 일들이 많다고 한다.

 

Future로 하기 어렵던 작업

  • Future를 외부에서 완료시킬 수 없다. 취소하거나 get()에 타임아웃을 설정할 수는 있따.
  • 블로킹 코드인 get()을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
  • 여러 Future를 조합할 수 없다.
    • Ex) Event 정보 가져온 다음 Event에 참여하는 회원 목록 가져오기
  • 예외 처리용 API를 제공하지 않는다.

 

CompletableFuture

 

비동기로 작업 실행하기

  • 리턴 값이 없는 경우
    • runAsync()
  • 리턴 값이 있는 경우
    • supplyAsync()
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다.
    • 기본은 ForkJoinPool.commonPool()
// 리턴 X
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Not Return : " + Thread.currentThread().getName());
});
future.get();       // Not Return : ForkJoinPool.commonPool-worker-3

// 리턴 O
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Return : " + Thread.currentThread().getName();
});
System.out.println(future2.get());      // Return : ForkJoinPool.commonPool-worker-3

 

콜백 제공

  • thenApply(Function)
    • 리턴 값을 받아서 다른 값으로 바꾸는 콜백
  • thenAccept(Consumer)
    • 리턴 값을 또 다른 작업을 처리하는 콜백 (리턴 X)
  • thenRun(Runnable)
    • 리턴 값 받고 다른 작업을 처리하는 콜백
  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
// 리턴 값을 받아서 다른 값으로 바꾸는 콜백
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Return : " + Thread.currentThread().getName();
}).thenApply((s) -> {   // 이것이 future와 다른 점
    System.out.println(s);  // Return : ForkJoinPool.commonPool-worker-3
    return s.toUpperCase();
});
System.out.println(future.get());   // RETURN : FORKJOINPOOL.COMMONPOOL-WORKER-3


// 리턴 값을 또 다른 작업을 처리하는 콜백 (리턴 X)
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
    return "Return : " + Thread.currentThread().getName();
}).thenAccept((s) -> {
    System.out.println(s.toUpperCase());  // Return : ForkJoinPool.commonPool-worker-3
});

// 리턴 값 받고 다른 작업을 처리하는 콜백
CompletableFuture<Void> future3 = CompletableFuture.supplyAsync(() -> {
    return "Return : " + Thread.currentThread().getName();
}).thenRun(() -> {
    System.out.println("thenRun");
});

 

조합하기

  • thenCompose()
    • 두 작업이 서로 이어서 실행하도록 조합
  • thenCombine()
    • 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
  • allOf()
    • 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
  • anyOf()
    • 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

 

예외처리

  • Exceptionally(Function)
  • handle(BiFunction)
// exceptionally
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).exceptionally(ex -> {
    return "Error!";
});

System.out.println(hello.get());

// handle
boolean throwError2 = true;
CompletableFuture<String> hello2 = CompletableFuture.supplyAsync(() -> {
    if (throwError2) {
        throw new IllegalArgumentException();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).handle((result,ex) -> {
    if(ex != null) {
        return "Error!";
    }
    return result;
});

System.out.println(hello2.get());

 

 

 

'JAVA' 카테고리의 다른 글

[Java 8] Metaspace  (0) 2022.10.18
[JAVA 8] ParallelSort  (0) 2022.10.15
[Java 8] Date와 Time  (0) 2022.10.12
[Java 8] Optional  (0) 2022.10.07
[JAVA 8] Stream  (0) 2022.10.06

+ Recent posts