일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- 스칼라 강좌
- Actor
- Play2 로 웹 개발
- Golang
- hyperledger fabric
- 그라파나
- 스칼라
- 이더리움
- Hyperledger fabric gossip protocol
- play2 강좌
- 엔터프라이즈 블록체인
- Adapter 패턴
- Akka
- 하이브리드앱
- 파이썬 머신러닝
- play 강좌
- 하이퍼레저 패브릭
- 주키퍼
- 파이썬 데이터분석
- akka 강좌
- CORDA
- Play2
- 파이썬 동시성
- 파이썬
- 안드로이드 웹뷰
- 파이썬 강좌
- 스위프트
- 블록체인
- 스칼라 동시성
- 플레이프레임워크
- Today
- Total
HAMA 블로그
JAVA 쓰레드풀 분석 - newFixedThreadPool 는 어떻게 동작하는가? 본문
소프트웨어 엔지니어링에서 풀의 종류는 다양한데요.
쓰레드풀,메모리풀,캐쉬풀,커넥션풀,객체풀 (자바에서 객체풀은 사용을 지양합니다. 메모리를 할당하는 작업이 C/C++보다 빠름) 등등이 있습니다. "풀"어서 말하면 미리 만들어두고 돌려막기로 사용하자 라고 볼 수 있는데요. 미리 만들어 두는 방식 / 쓰레드가 태스크를 처리하는 방식에 따라서 다양한 풀의 구현체들이 있을 수 있습니다. 이 글에서는 openJDK8 기준의 자바에서 구현된 newFixedThreadPool 를 해부해보도록 하겠습니다.
쓰레드풀은 동일하고 서로 독립적인 다수의 작업을 실행 할 때 가장 효과적이다.실행 시간이 오래 걸리는 작업과 금방 끝나는 작업을 섞어서 실행하도록 하면 풀의 크기가 굉장히 크지 않은 한 작업 실행을 방해하는 것과 비슷한 상황이 발생한다. 또한 크기게 제한되어 있는 쓰레드 풀에 다른 작업의 내용에 의존성을 갖고 있는 작업을 등록하면 데드락이 발생할 가능성이 높다. 다행스컯게도 일반적인 네트웍 기반의 서버 어플리케이션 (웹서버,메일서버,파일서버등)은 작업이 서로 동일하면서 독립적이어야 한다는 조건을 대부분 만족한다. - Java concurrency in practice 책 발췌
전설의 개발자들이 참여한 저 엄청난 책은 자신들이 참여한 java.util.concurrent 패키지를 기준으로 한다. 근데 이 책이 2006년에 JDK1.5 기준으로 쓰여졌기 때문에 "동일하고 서로 독립적인 다수의 작업" 이라고 한정지었는데, 후에 JDK1.7에서 FORK-JOIN 풀이 나오면서 기존 풀들에서는 할 수 없었던 분할 수행작업을 효율적으로 할 수 있는 보완재가 되었다.
1.newFixedThreadPool 사용법 (자바8기준)
execute 메소드
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class SimpleThreadPool { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); IntStream.range(0, 10).forEach( n -> executor.execute( () ->
{
try {
TimeUnit.MILLISECONDS.sleep(300);
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
); } }
- 쓰레드풀에 쓰레드를 10개를 뛰어놀게 한다.
- 10번을 반복해서 쓰레드풀에 일을 시킨다. (쓰레드풀안의 쓰레드 하나가 선택되어 일처리를 할것이다)
- execute 메소드를 사용했다. 이 메소드는 void 를 리턴한다. 즉 일처리를 시키기만 하지 결과를 보고받지 않을것이다.
- 구현내용은 300밀리초를 기다렸다가 Hello 쓰레드이름을 출력하는 것이다.
- 자바8부터는 for(int i = 0; i < 10; i++) 보다는 저렇게 사용하는게 좋다.
submit 메소드
import java.util.Arrays; import java.util.List; import java.util.concurrent.*; public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
final List<Integer> integers = Arrays.asList(1,2,3,4,5);
Future<Integer> future = executor.submit(() -> {
TimeUnit.MILLISECONDS.sleep(5000);
int result = integers.stream().mapToInt(i -> i.intValue()).sum();
return result;
});
try {
Integer result = future.get();
System.out.print("result: " + result);
executor.shutdownNow();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} }
- 쓰레드풀에 쓰레드를 10개를 뛰어놀게 한다.
- 쓰레드풀에 하나의 일을 시킨다. (쓰레드풀안의 쓰레드 하나가 선택되어 일처리를 할것이다)
- submit 메소드를 사용했다. 이 메소드는 future 를 리턴한다. 즉 일처리를 시키기고, 그에 따른 결과를 보고받을 것이다.
- 구현내용은 5000밀리초를 기다렸다가 리스트안의 숫자들의 합을 리턴한다.
- 리턴 받은 future 로 부터 값을 얻는다. 여기서 get()메소드는 블럭된다. (타임아웃을 매개변수로 넣을 수도 있다) 아마 일처리를 하는 쪽의 쓰레드에서 일을 다 끝내고 set() 같은 것을 해 줄 때가지 블럭될거 같다.
이제 사용법은 알았으니 과연 쓰레드풀을 자바에서 어떻게 구현하고 있는지 확인 해보자.
@어떻게 10개의 쓰레드를 가진 풀을 만드는지? (newFixedThreadPool(10))
@어떻게 풀에서 쓰레드 하나를 할당해 주는지? (execute)
@쓰레드풀에서 할당 될 워커(쓰레드)가 없을 때 태스크가 어떻게 되는지?
@어떻게 future 를 리턴해주는지. (submit)
2.newFixedThreadPool 소스 분석(자바8기준)
* 참고로 지면이 한정된 관계로 예외처리 포함해서 여러 로직들이 생략되었다.
(발췌: http://codepumpkin.com/threadpool-using-executor-framework/)
2-1. 어떻게 10개의 쓰레드를 가진 풀을 만드는지? (newFixedThreadPool(10))
public class Executors {
java/util/concurrent/Executors.java 라는 파일에는 Executors 라는 클래스가 있으며, 정적메소드로써 newFiexedThreadPool 이라는 것을 제공한다. 매개변수는 쓰레드풀에 들어갈 쓰레드들의 개수를 넣어준다.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
퍼사드 역할 (Gof의 패턴중 하나로 내부 시스템의 복잡함을 감추고 사용자가 간편히 사용하도록 하는 의도)
을 하고있다. 그저 팩토리로써 내부의 ThreadPoolExecutor 객체를 대신 만들어주고 있는데, 매개변수로는 순서대로 살펴보자.
- corePoolSize : 풀 안에 유지되는 쓰레드 개수 (시작시)
- maximumPoolSize : 풀에 유지되는 최대 쓰레드 개수
- keepAliveTime : corePoolSize 보다 쓰레드 개수가 많아 질 때, 새로운 테스크를 기다리기 위한 시간. 시간이 지나면 쓰레드를 없애서 corePoolSize 를 유지한다.
- unit : keepAliveTime 시간단위
- workQueue : 실행 되기전에 홀드시켜 두는 태스크를 유지하는 큐. 쓰레드가 남지 않을 경우 여기 태스크를 넣는다.
시작개수가 1이고 최대개수가 1이면 newSingleThreadExecutor 이 되는것이고,
시작개수가 0이고 최대개수가 MAX_VALUE 가 되면, newCachedThreadPool 풀이라 한다.
우리는 최소,최대가 10개로 고정된 쓰레드풀을 만든 것이다.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
java/util/concurrent/ThreadPoolExecutor.java 라는 파일에는 ThreadPoolExecutor라는 클래스가 있으며, 들어온 매개변수에 추가로 Executors.defaultThreadFactory()와 defaultHandler 를 추가 매개변수로 받아서 새 객체를 만든다.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Executors.defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
return t;
}
}
- 비데몬 쓰레드로 만든다. (메인쓰레드가 종료되도 종료되지 않고 살아있는)
public void execute(Runnable command) {
int c = ctl.get(); // 1 단계
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
// 2 단계
.... 생략 .... }
private boolean addWorker(Runnable firstTask, boolean core) {
.. 상태체크 부분 생략 ..
Worker w = null;
w = new Worker(firstTask); // (1)
f
inal Thread t = w.thread;
if (t != null) {
workers.add(w); // (1)
t.start();
workerStarted = true;
}
return workerStarted; }
w = new Worker(firstTask); // (1)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
여기서 눈여겨 봐야 할 것이, 워커객체는 쓰레드를 참조하고 있으며, 역으로 쓰레드에도 this (워커객체)가 매개변수로 들어가는 것인데, 쓰레드는 워커객체 this 를 통해서 할당된 태스크에 대한 처리를 하리라는 것을 알 수 있다. 즉 쓰레드에 태스크가 직접적으로 할당되는 것이 아니라, 워커객체라는 대리자를 통해서 관리되는 것.
workers.add(w);
t.start();
public void execute(Runnable command) {
// 1단계
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2단계
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2단계에서는 이제 더 이상 처리할 워커(스레드)가 없을 경우에 대해서 처리하는데, workerQueue.offer(command) 즉 워커큐에 태스크를 넣어주고 리턴한다.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
while (task != null || (task = getTask()) != null) {
... 생략 ...
task.run();
... 생략 ...
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
.. 생략 ..
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
public <T> Future<T> submit(Callable<T> task) {
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public FutureTask(Callable<V> callable) {
this.callable = callable;
this.state = NEW; // ensure visibility of callable }
참고로 RunnableFuture 는 Runnable 인터페이스를 상속받았기 때문에 문제없다.
public void execute(Runnable command) {
int c = ctl.get(); //1 단계
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2 단계
.... 생략 .... }
public void run() {
runWorker(this); }
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
try {
task.run();
} finally {
processWorkerExit(w, completedAbruptly);
} }
task.run();
public void run() {
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} .... }
if (ran)
set(result);
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // awaitDone 에서 블럭되고 있음을 알수 있다. 하염없이 기다림
return report(s);
}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // RUNNING 상태와 쓰레드 개수1로 초기화
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; } // 2가지 상태를 하나의 Int 로 합쳐주는 함수
첫째. workerCount - 현재 워커(쓰레드) 의 개수
workerCount 는 0 이고 runState 는 RUNNING 이라는 것이다. 앞쪽비트는 runState 로 사용하고, 뒤쪽 비트는 workerCount 로 사용 한다는 것인데, 맞는지 확인 해보자.
(1<< 29) - 1 = 2^29 -1 = 536870911 = 0x1FFFFFFF = 00011111 11111111 11111111 11111111
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111
private static int workerCountOf(int c) { return c & CAPACITY; } // 현재 워커 갯수는?
int c = ctl.get();
int ws = workerCountOf(c); // ws 는 0
참고:
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/classes/java/util/concurrent/Executors.java
'Java' 카테고리의 다른 글
초보의 좌충우돌 한글 인코딩 (euc-kr 에서 UTF-8) 삽질기 (0) | 2020.10.11 |
---|---|
JAVA 쓰레드풀의 상태관리 (0) | 2018.03.08 |
Java 에서 리소스 관리하기 ARM&EAM 패턴 (feat.Scala) (0) | 2018.03.05 |
[Java 8] 인터페이스 vs 추상 클래스 (0) | 2016.09.30 |
[옛날 Java] 인터페이스 vs 추상클래스 (0) | 2016.09.14 |