관리 메뉴

HAMA 블로그

자바 Concurrent 라이브러리 정리 본문

Java

자바 Concurrent 라이브러리 정리

[하마] 이승현 (wowlsh93@gmail.com) 2015. 8. 31. 10:45


Executors 유틸리티 클래스를 이용하여 각종  쓰레드 풀 생성하기 


ExecutorService =  Executors.newFixedThreadPool(int nThreads)

최대 지정한 개수 만큼의 쓰레드를 가질 수 있는 쓰레드 풀을 생성한다. 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.

항상 일정한 스레드 개수를 유지한다. 스레드가 유휴상태이더라도 제거하지 않고 유지한다.

다만 작업도중 비정상적으로 스레드가 종료하는 경우에는 스레드를 추가로 생성하며, nThreads 개수보다 1개가 더 생길 수

도 있다.


ScheduledExecutorService  =  Executors.newScheduledThreadPool(int corePoolSize)

지정한 개수만큼 쓰레드가 유지되는 스케줄 가능한 쓰레드 풀을 생성한다. 실제 생성되는 객체는

ScheduledThreadPoolExecutor 객체이다.



ExecutorService = Executors.newSingleThreadExecutor()

하나의 쓰레드만 사용하는 ExecutorService를 생성한다.

항상 1개의 스레드만 동작한다. 따라서 스레드가 동작중일 경우 나머지 작업은 모두 큐에서 대기하며, 순서대로 하나씩 실

행된다. 만약 비정상적으로 스레드가 종료되는 경우, 새로 스레드를 생성하고 남은 작업을 계속 한다.


ScheduledExecutorService = Executors. newSingleThreadScheduledExecutor()

하나의 쓰레드만 사용하는 ScheduledExecutorService를 생성한다.일정 시간 이후에 실행되거나 주기적으로 작업을 실행할 

수 있으며, 스레드의 수가 고정되어 있는 형태의 Executor.Timer  클래스의 기능과 유사하다 


ExecutorService = Executors.newCachedThreadPool()

필요할 때 마다 쓰레드를 생성하는 쓰레드 풀을 생성한다. 이미 생성된 쓰레드의 경우 재사용된다.

실제 생성되는 객체는 ThreadPoolExecutor 객체이다. 
스레드 개수에 제한이 없이 필요한 경우 계속 스레드 수가 증가한다.

다만 일정 시간(60초)동안 사용하지 않는(idle) 스레드는 종료된다.

필요없는 스레드를 제거하므로 서버 리소스(memory)는 적게 사용하지만, 스레드 생성과 삭제를 반복하므로 작업 부하가 

불규칙적인 경우 비효율적이다. 



(http://javacan.tistory.com/124  상세설명) 





CopyOnWriteArrayList 

CopyOnWrite 가 말해주는것처럼 read (select) 시는 아무런 동기화 문제가 없기때문에 놔두고 

변경이 일어날경우 객체를 clone 해서 다루자는 전략입니다. 따라서 읽기행위가 많이 일어나는 

곳에서 사용하기 좋습니다. 


BlockingQueue 

보통 생산자 - 소비자 패턴에서 활용되는 큐로 많이 사용된다. 사실 이야기는 이 큐는 멀티쓰레드환경에서 

대표할만한 컬렉션이라는 것이다. 전에 Actor / Akka 문서에 말한 큐같은것들이 대부분 이것으로 이루어져있다. 

소비자가 꺼내어 사용할동안 생산자는 멈춰있고, 생산자가 넣을동안 소비자는 멈춰있어야한다.

서로 쟁탈하면 선반은 망가질것이다.


ConcurrentHashMap

ConcurrentHashMap은 Map의 일부에만 Lock을 걸기때문에 HashTable과 synchronized Map 보다 

효율적인게 특징이다.




Runnable  :  결과값을 리턴하지 않는다 - void run()


Callable     :  결과 값을 리턴한다.  - V call()

 

Future        :  Callable의 리턴값은 실행시키지마자 얻을수있는게 아니라 미래에 얻게된다.
                      그 값을 받을수있는 인터페이스이다.



ThreadLocal  

함수안의 로컬변수는 쓰레드 마다  고유하게 가질수있는것은 알것이다. 그럼 쓰레드마다 고유의 변수를 해당 함수의 안 뿐만아니라, 클래스의 정적멤버등으로 생성하여  각각의  쓰레드가 다른곳에서 사용하고싶을땐?  그때 이것을 사용할수있다. 

ThreadLocal은 한 쓰레드에서 실행되는 코드가 동일한 객체를 사용할 수 있도록 해 주기 때문에 쓰레드와 관련된 코드에서 파라미터를 사용하지 않고 객체를 전파하기 위한 용도로 주로 사용되며, 주요 용도는 다음과 같다.

-사용자 인증정보 전파 - Spring Security에서는 ThreadLocal을 이용해서 사용자 인증 정보를 전파한다.
-트랜잭션 컨텍스트 전파 - 트랜잭션 매니저는 트랜잭션 컨텍스트를 전파하는 데 ThreadLocal을 사용한다.
-쓰레드에 안전해야 하는 데이터 보관

ThreadLocal 사용시 주의 사항

쓰레드 풀 환경에서 ThreadLocal을 사용하는 경우 ThreadLocal 변수에 보관된 데이터의 사용이 끝나면 반드시 해당 데이터를 삭제해 주어야 한다. 그렇지 않을 경우 재사용되는 쓰레드가 올바르지 않은 데이터를 참조할 수 있다.


CountDownLatch 

모든쓰레드(테스크)가 종료되면 , 호출될 필요가 있는곳에 사용된다.  관련  쓰레드들의  이벤트들을 감지 하기위해 

사용한다.


CyclicBarrier

CountDownLatch 와는 반대(?) 로 모든쓰레드가  종료가 아니라 블럭되면 , 호출될 필요가 있는곳에 사용된다.    

다른 쓰레드를 기다리기위해 사용한다고 볼수있다.

하마(HAMA) 라는 분산머신러닝에 사용되는 오픈소스가 있는데, BSP 알고리즘을 사용하는데 BSP 란 

각각의 컴퓨터가 일을 하고, 자신의 일이 끝나면 멈춰있게된다, 모든 컴퓨터들이 멈춰있게되면 ( 각각의 일을 끝마치면) 

서로 커뮤니케이션하는 구조인데,  전체 컴퓨터들의 분산락을 걸어주는것과 비슷한것이다.

분산락은 Zookeeper 라는 오픈소스를 사용한다.


Exchanger 

서로 다른 쓰레드에서 각각의 데이터 (컬렉션을 통채로도) 를 주고 받을수있게한다. 별로 쓸일 없어보인다.

public class ExchangerTest {  
 
    private static final int FULL = 5;
    private static final int COUNT = FULL * 2;
    private static final Random random = new Random();
   
    private static volatile int sum = 0;
    
    private static Exchanger<List<Integer>> exchanger =
        new Exchanger<List<Integer>>();
   
    private static CountDownLatch stopLatch =
        new CountDownLatch(2);
 
    private static List<Integer> initiallyEmptyBuffer;
    private static List<Integer> initiallyFillBuffer;
   
    private static class FillingLoop implements Runnable {
        public void run() {
            List<Integer> currentBuffer = initiallyFillBuffer;
            try {
                for (int i = 0; i < COUNT; i++) {
                    if (currentBuffer == null)
                        break;
                   
                    Integer item = random.nextInt(100);
                    System.out.println("Item Added: " + item);
                    currentBuffer.add(item);
                   
                    if (currentBuffer.size() == FULL) {
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("Bad exchange on filling side");
            }
            stopLatch.countDown();
        }
    }  
 
    private static class EmptyingLoop implements Runnable {
        public void run() {
            List<Integer> currentBuffer = initiallyEmptyBuffer;
            try {
                for (int i = 0; i < COUNT; i++) {
                    if (currentBuffer == null)
                        break;
 
                    if (currentBuffer.isEmpty()) {
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                    
                    Integer item = currentBuffer.remove(0);
                    System.out.println("Item Got: " + item);
                    sum += item.intValue();
                   
                }
            } catch (InterruptedException ex) {
                System.out.println("Bad exchange on emptying side");
            }
            stopLatch.countDown();
        }
    }  
 
    public static void main(String args[]) {
        initiallyEmptyBuffer = new ArrayList<Integer>();
        initiallyFillBuffer = new ArrayList<Integer>();
   
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
   
        try {
            stopLatch.await();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("Sum of all items is.... " + sum);
    }
}


FutureTask

모든쓰레드(테스크)가 종료되면 , 호출될 필요가 있는곳에 사용된다. 

public class Preloader {

private final FutureTask<List<String>> task1 
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task2
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task3
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task4
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task5
= new FutureTask<List<String>>(new MyCallable());
ExecutorService es = Executors.newFixedThreadPool(5);
public void testGO() throws Exception{
es.submit (task1);
Thread.sleep(300);
es.submit (task2);
Thread.sleep(300);
es.submit (task3);
Thread.sleep(300);
es.submit (task4);
Thread.sleep(300);
es.submit (task5);
PrintTest(task2);
PrintTest(task5);
PrintTest(task4);
PrintTest(task3);
PrintTest(task1);
Thread.sleep(20000);
es.shutdown();
} //End Of testGo
private void PrintTest(FutureTask<List<String>> tempTask){
List<String> k = null;
try {
k = tempTask.get();
} catch (InterruptedException e) {
System.out.println("Exception : "+e.getMessage());
} catch (ExecutionException e) {
System.out.println("Exception : "+e.getMessage());
} //End Of try
for(String l : k){
System.out.println(l);
}
} //End Of printTest
    public static void main(String[] args) {

     Preloader preloader = new Preloader();
    
     try {
preloader.testGO();
} catch (Exception e) {
System.out.println(e.getMessage());
} //End Of try
    
    } //End Of main
    
} //End Of Class



public class MyCallable implements Callable<List<String>>{

@Override
public List<String> call() throws Exception {
List<String> ret = new Vector<String>();
for(int i=0; i<10; i++){
String temp = "안녕하세요 "+Thread.currentThread()+" 번 입니다.";
Thread.sleep(300);
ret.add(temp);
}
return ret;
}
}


Semaphore

import java.util.Random;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

      private static final Random rd = new Random(10000);

      static class Log {
             public static void debug(String strMessage) {
             System.out.println(Thread.currentThread().getName()  + " : " + strMessage);
            }
        }

      class SemaphoreResource extends Semaphore {

            private static final long serialVersionUID = 1L;

            public SemaphoreResource(final int permits) {
                   super(permits);
            }

           public void use() throws InterruptedException {

                  acquire(); // 세마포어 리소스 확보

                 try {
                        doUse();
                 } finally {
                       release(); // 세마포어 리소스 해제
                       Log.debug("Thread 종료 후 남은  permits: " +   this.availablePermits());
                }
             }

           protected void doUse() throws InterruptedException {

                // 임의의 프로그램을 실행하는데 거리는 가상의 시간
               int sleepTime = rd.nextInt(500);
               Thread.sleep(sleepTime); // 런타임 시간 설정
               Log.debug(" Thread 실행..................." + sleepTime);

                              /** something logic **/

              }

        }

     class MyThread extends Thread {

            private final SemaphoreResource resource;

            public MyThread(String threadName, SemaphoreResource resource) {
                   this.resource = resource;
                   this.setName(threadName);
            }

            @Override
             public void run() {
                  try {
                      resource.use();
                  } catch (InterruptedException e) {
                  } finally { }
             }

        }

     public static void main(String... s) {

          System.out.println("Test Start...");
          SemaphoreResource resource =  new SemaphoreTest().new SemaphoreResource(4);

         for (int i = 0; i < 20; i++) {
                new SemaphoreTest().new MyThread("Thread-" + i, resource) .start();
          }

      }

}




Comments