멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있습니다. 그럼 Gof 의 디자인패턴의 꽃은 무엇인가? 라고 묻는다면   DI (Dependency Injection) 와 밀접한 "전략패턴" 이라고 대답하고 싶습니다. 물론 둘다 제 개인적인 생각이죠 :-) 

멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉  "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있습니다. 

자 그럼 생산자-소비자 패턴이 무엇인지 살펴보도록 하죠. 이해하기 쉽게 코드를 쉽게 쉽게 간략히 만들었습니다. (예외등 빠짐)

* 제 글이 그렇듯이 폴리글랏으로 보여주며, 추가 기술들도 섞여있습니다. 


1.  싱글쓰레드상에서 생산자 - 소비자 

먼저 멀티쓰레드 말고 그냥 간단하게 코드로 살펴보자. 개발자들은 말보다는 코드가 더 이해하기 쉬우니~

class Producer {

    int id = 0

int produce(){

return nextid()

}

int nextId(){

return  id = id + 1

}

}

class Consumer{

void consume(int id){

        print ("ID : " + id)

}

}

void  test(){

     Producer  p = new Producer();

     Consumer c = new Consumer(); 

result = p.produce()

c.consume(result)

}

 1 씩 증가시키는 생산을 하는  Producer 클래스가 있고, 그것을 전달 받아 사용하는 Consumer 가 있습니다.
 즉 간단하게 생산하는 녀석 따로, 소비하는 녀석 따로 있으면 생산자-소비자 패턴입니다. 

 싱글쓰레드에서의 특징은 호출하면 바로 반응한다 입니다. produce 를 호출하면 다른짓을 하지 않고 바로 새로운 id 를 리턴해주죠.실시간 반응 (실시간 리턴은 아님) 입니다.  다만 produce 에서 먼가 오래 작업을 한다면 Consumer 객체는 오래 기다려야겠지요.

이것을 멀티쓰레드로는 어떻게 만들까요? 


2.  멀티쓰레드상에서 생산자 - 소비자 (by JAVA core) 


이 이미지는 좀 복잡한데..



     사실 매우 단순하다. Thread 1이 생산자 Thread 2가 소비자가 된다.



멀티쓰레드에서도 생산자-소비자 따로입니다만은 싱글쓰레드와 다른 이 소스의 특징은 

1.  Producer 와 Consumer 가 각각 쓰레드를 가지고있다. (내부에 루프를 가지고 있다) 
2.  전달 매개체 (보통 큐로 구현) 가 생겼다. (아래에서 Table 클래스)

입니다.  소스를 보시죠. 

public class main {

    public static void main(String[] args) {

        

    Table table = new Table(100);     

        new producerThread(table).start();

        new consumerThread(table).start();

    }

}


public class producerThread extends Thread {

    private static int id = 0; 

    Table table;

   

    public producerThread(Table table) {

        this.table = table;

    }

    public void run() {

            while (true) {

            Thread.sleep(1000);

                String packet = "No : " + nextId();

                table.put(packet);  // 큐에 추가 

            }

    }

    private static synchronized int nextId() {

        return id++;

    }

}



public class consumerThread extends Thread {

    private final Table table;

    public consumerThread(Table table) {

        this.table = table;

    }

    public void run() {

            while (true) {

            String packet = table.take();   // 큐에서 가져옴                      

                System.out.println("consumer : " + packet);

            }

    }

}


public class Table {

    private final String[] buffer;

    private int tail;  

    private int head;  

    private int count; 

    public Table(int count) {

        this.buffer = new String[count];

        this.head = 0;

        this.tail = 0;

        this.count = 0;

    }

    public synchronized void put(String packet)  {

        while (count >= buffer.length) {   // 버퍼가 가측 차면 대기!

            wait();

        }

        buffer[tail] = packet;   // 후입하라!

        tail = (tail + 1) % buffer.length;  // Circular 큐라서 tail 의 위치가 바뀜!

        count++;

        notifyAll();  // 버퍼에 먼가가 들어 갔으니 take 해도 된다고 이벤트 날림!!

    }

    public synchronized String take()  {

        while (count <= 0) {   // 버퍼에 아무것도 없으면 대기!

            wait();

        }

        String packet = buffer[head];  // 선출하라!

        head = (head + 1) % buffer.length;  // Circular 큐라서 header 의 위치가 바뀜!

        count--;

        notifyAll();  // 버퍼에서 하나를 가져갔으니 put 해도 된다고 이벤트 날림!!!

        return packet;

    }

}



코드의 중요 부분을 설명해보면 (주석과 함께 살펴보세요)

Producer 클래스는  1초 에 한번씩 table 에 id 를 집어 넣습니다. 
Consumer 클래스는 table 에서 id 를 가져와서 출력합니다.

Table 클래스는 좀 눈여겨 봐야합니다.
String 배열을 가지고 큐를 구현한 코드이며. 더불어 생산자와 소비자 클래스에서 그 배열에 동시접근을 막기위한 장치들이 있습니다. synchronized 와 wait() 인데요. 이 두가지는 매우 코어적인  방식이며 바른 이해가 필요합니다. 이것을 직접 조작하면서 멀티쓰레드 프로그래밍을 하는것에 대한 위험성이 공감대를 얻으면서, 좀 더 하이레벨에서 조작하는 방식들이 생겨납니다. 
그것을 위해 등장한것이 STM , Actor 등이 있으며, 아래에서는 2번의 큐를 훌륭한 개발자들이 미리 구현한 java.util.concurrent 를 사용합니다.


3.  멀티쓰레드상에서 생산자 - 소비자 (by JAVA Concurrent 패키지) 

위의 Table 클래스를  java.util.concurrent 패키지를 이용하여 구현해보면 


public class Table {

private final BlockingQueue<String> buffer; 

public Table(int count) {

 this.buffer = new ArrayBlockingQueue<String>(10);

}

public  void put(String packet) {

Thread.sleep(1000);

buffer.put(packet);

}

public  String take()  {

String packet = buffer.take();

return packet;

}

}

이렇게 BlockingQueue 를 사용함으로써 매우 간단해졌으며 실수의 여지를 줄였습니다.

하지만!! 이것도 동기화 객체를 사용하며, 상태변경에 대한 책임소재가 명확하지 않습니다. 결국 한층 더 동기화 문제를 줄일수있는 Actor 패턴이라는것으로 진화됩니다.   (조금있다가 설명합니다) 


4.  멀티쓰레드상에서 생산자 - 소비자 (by Python) 

* java 코드와 기능이 동일합니다. 설명은 주석으로 대신합니다.

#-*- coding: utf-8 -*-

from threading import Thread, Condition

import time

import random

 

queue = []  # 리스트입니다만 동기화가 기본적으로 됩니다. 

condition = Condition()  #  java 의 wait() 및 synchronized  혼종 

 

class ProducerThread(Thread):

    i = 0

    def run(self):

        global queue

        while True:

            condition.acquire() 

            if len(queue) == 100: // 버퍼가 가측 차면 대기!

                condition.wait()

            num = self.nextId()  // 생산 

            queue.append(num)  // 큐에 후입하라!

            condition.notify()      // 버퍼에 먼가가 들어 갔으니 take 해도 된다고 이벤트 날림!!

            condition.release()

            time.sleep(1)

 

    def nextId(self):

        self.i = self.i + 1

        return self.i

 

class ConsumerThread(Thread):

    def run(self):

        global queue

        while True:

            condition.acquire()

            if not queue:   // 버퍼가 비어있으면  대기!

                condition.wait()

            num = queue.pop(0)  // 선출하라!

            print "Consumed", num  // 소비~~~~~~

            condition.notify()  // 버퍼가 비었으니 put 해도 된다고 이벤트 날림!!

            condition.release()

            time.sleep(1)

 

 

ConsumerThread().start()  

ProducerThread().start()

* 참고로 하나의 쓰레드가 wait() 되는 순간에 condition.acquire() 는 해제됩니다. 

자. 이렇게 생산자-소비자 패턴에 대해서 알아봤는데요.
위에 3번에서 말해다시피 java.util.concurrent 를 사용하는것도 쉽지 않기 때문에 (물론 복잡도에 따라서 코어를 사용하는것이 문제점을 더 잘 파악하는 길이기도 합니다)  결국 한층 더 동기화 문제를 줄일수있는 Actor 패턴이라는것으로 진화됩니다.  


5.  Actor  

Actor 란 말 그대로 '행동자' 입니다. 능동적으로 비동기 메세지를 처리하는 녀석인데요.
능동적이라는 말은 쉽게 말해 쓰레드 하나가 할당되어 있다는 얘기입니다. 살아 숨쉬고 있죠. 
또한 비동기 적으로 메세지를 처리할수있습니다. 이건 메세지를 담아둘수있는 자신만의 큐를 가지고 있다는 얘기입니다.

즉 Actor = 객체+루프를 가진 쓰레드+큐 로 이루어진 것 이라고 보면됩니다.

이왕 Actor 가 나온김에 이것으로부터 나온 패턴들을 간단히 살펴보면 Reactor 패턴이 있습니다.

6.  Reactor 

 Node.js 의 기반패턴이라고 알려져있으며 , 쉽게 말해 Actor + 이벤트 핸들러 라고 보시면됩니다.
 우리말로 해석해보면 "반응로" 정도 될거 같은데요.  즉 Actor 안에다가 어떤 swich 문을 두고서 어떤 이벤트가 날라오면 
 그것과 연결된 어떤 핸들러를 디스패칭하라입니다.  보통 단일쓰레드로 이루어져있기때문에 하나의 핸들러가 시간을 많이 잡  아먹게 되면 전체적으로 성능이 급격히 저하됩니다.

 성능면이라든지 몇몇가지 단점이 있습니다. 이 얘기는 Node.js 도 그런 단점이 있다는 얘기겠지요. 
 물론 페라리보다 더 빠른 것들이 많이 있다고해서 페라리가 후진차가 되는것은 아닙니다. 

 다시 돌아와서 , Reactor 패턴 보다 더 나은 방식은 무엇이냐 라고 한다면 Proactor 패턴이라고 있습니다.

 7.  Proactor 

  Proactor 패턴 을 우리말로 바꾸면 음 잘 떠오르질 않네요. Reactor 랑 비교해서 생각해보면 Reactor 는 어떤 이벤트가 날라오   면 그 이벤트에 해당하는 행동을 하는것이라고 말씀드렸습니다.
  Proactor 는 먼저 행동을 디스패치하고 , 그 행동에 따른 결과가 날라오게 됩니다. 
  예를들면, Reactor 는  "너 지금 버퍼에 메세지를 담을 수 있어"   라고 메세지가 날라오면 그때서야 메세지 담는 행동을 하고
  Proactor 는 먼저 메세지에 담는 행동을 요청하면  "버퍼에 메세지가 모두 담겼어"  라는 결과 메세지가 날라오게 됩니다.
  사실 이해하기가 쉽지 않긴할텐데요. 윈도우 네트워크 프로그래밍을 해보셨다면 Select 가 react , IOCP 가 proact 라고 생각하   시면 얼추 들어 맞습니다. 



8.  Actor 패턴으로 해보는 생산자 - 소비자 (by scala) 

* 참고로 스칼라 액터는 deprecated 되었습니다. 밑에 설명할 akka 를 대신 사용해야합니다. 

* 액터를 사용한다면 , 어떤한 객체 및 값의 상태관리는 단 하나의 액터에서 전담시키는 방향으로 해야합니다. 내가 무엇인가를 수정하고 싶으면 그 해당 액터에 메세지를 전달하라는 뜻이지요. 

* 액터는 위치투명성을 갖기때문에 , 네트워크 넘어에 있는 액터와도 동일한 방식으로 사용 할 수 있게됩니다.

* 여기서는 actor 안에 Thread.sleep 으로 강제로 멈춤을 시도했지만,  액터안에서 저런식으로 흐름을 막는것은 지양해야합니다.

굳이 흐름을 막고 싶으면, 다른 전용 액터를 만들어서 그쪽 메세지를 보내고 받는 타이밍을 이용해야합니다.

import scala.actors.Reactor


object Test {

  case class Stop()

  case class Get(from: Reactor[Any])

  case class Put(x: Int)


  class UnboundedBuffer extends Reactor[Any] {

    def act() {

      react {

        case Get(from) =>

          val consumer = from

          react {

            case Put(x) =>

              consumer ! x

              act()

          }

      }

    }

  }


  class Producer(buf: UnboundedBuffer) extends Reactor[Any] {

    

    def act() {

      var i = 0

      while (i < 10) {

        i += 1

        Thread.sleep(1000)

        buf ! Put(i)

      }

    }

  }


  class Consumer(buf: UnboundedBuffer) extends Reactor[Any] {

    

    def act() {

      Thread.sleep(1000)

      buf ! Get(this)

      react {

        case res =>

            println(res)

          act()

      }

    

    }

  }


  def main(args: Array[String]) {

    val parent = new Reactor[Any] {

      def act() {

        val buffer = new UnboundedBuffer

        buffer.start()

        val producer = new Producer(buffer)

        producer.start()

        val consumer = new Consumer(buffer)

        consumer.start()

       

      }

    }

    parent.start()

  }

}


9.  Akka 로 해보는 생산자 - 소비자 (by scala) 

object HelloWorld extends App {

    val system = ActorSystem("ProConSystem")

    val con = system.actorOf(Props[ConsumerActor])

    val pro =  system.actorOf(Props(new ProducerActor(con)))

    pro ! "start"


}


class ProducerActor (con: ActorRef) extends Actor {

  

  def receive = {

    case "start" => 

      var i = 0

      

      while (i < 10){

           con ! i  

           i+=1

           Thread.sleep(1000)

      }

  }

}


class ConsumerActor extends Actor {

  def receive = {

    case id : Integer =>

        println("ID : " + id)

  

  }

}


10.  Akka 로 해보는 생산자 - 소비자 (by java) 

  public static void main(String... args) throws Exception {


        final ActorSystem actorSystem = ActorSystem.create("procon-System");

        final ActorRef actorRef = actorSystem.actorOf(Props.create(ProducerActor.class), "ProducerActor");


        actorRef.tell(new Command("START"), null);

        Thread.sleep(10000);

        System.out.println("Actor System Shutdown Starting...");

        actorSystem.shutdown();

    }


public class ProducerActor extends UntypedActor {


    private final ActorRef conActor;

    public ProducerActor() {

        conActor = getContext().actorOf(Props.create(ConsumerActor.class), "ConsumerActor");

    }


    @Override

    public void onReceive(Object msg) throws Exception {


        if (msg instanceof Command) {

            final String data = ((Command) msg).getData();

            if (data.equals("START")){

            int i = 0;

           

            while (i < 10){

            conActor.tell(i, null);

            System.out.println("send: "  + i );

            i++;

            Thread.sleep(1000);

            }

           

            }


        } 

    }

}


public class ConsumerActor extends UntypedActor {


    @Override

    public void onReceive(Object msg) {

     System.out.println("Received Event: " + msg);

    }



}


12.  Vertx 로 해보는 생산자 - 소비자 (by java) 

public class VertxApp {


    public static void main(String[] args) {

        Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(new ConsumerVerticle());

        vertx.deployVerticle(new ProducerVertcle());

    }


}


public class ProducerVertcle extends AbstractVerticle {

    

private static int id = 0; 

    

@Override

    public void start() {

        System.out.println("ProducerVertcle started!");

        while (true) {

            String packet = "No : " + nextId();

            vertx.eventBus().send("anAddress", packet);

        }

       

    }


    private static synchronized int nextId() {

        return id++;

    }


}


public class ConsumerVerticle extends AbstractVerticle {


    public void start() {

       

    vertx.eventBus().consumer("anAddress", message -> {

            System.out.println(" received message: " + message.body());

        });

    }

    

}


13. Akka 와 Vert.x 의 선택 이슈 

왜 우리는 클라우드 시스템에  Akka 를 사용하게 되었나?   참고 


http://brad2014.tistory.com/215  이전 포스팅에서 언급된 Actor 모델 과 ActiveObject  패턴의 차이에
대한 논문이 있다. 
http://members.unine.ch/anita.sobe/res/RR-I-AS-2014.06.1.pdf


간단 요약 


역사


-  멀티코어 프로그래밍의 시대가 왔고 장점에 대한 소개 
-  멀티쓰레드 프로그래밍의 어려움 소개 ( 데드락, 레이스 컨디션, 일관성깨짐등) 
-  그런 환경에서 좀더 안전하고 빠르게 개발하기위하여 Actor 모델같은 동시성 패턴이 생겨나다.
-  1973년 에 Actor 모델이 소개되었고, 인공지능의 멀티 에이전트 디자인에 의해 발전되어졌다.
-  Actor 모델은 그후  Actove Object 패턴같은 다양한 패턴들에 영향을 주었다.


차이

-   구조적으로 두 패턴모두 스케쥴러와 큐를 가지고있다.
-   구조적으로 ActiveObject 는 프록시를 활용하며 , Future 객체를 사용한다.
-   구조적으로 Actor 패턴은  ActiveObject 에 비해서 자유성이 높다. 
-   메세징/통신측면에서 ActiveObject 패턴은  미리 정해진 함수호출을 통해서 잡을 전달한다.  
-   메세징/통신측면에서 Actor 패턴은 문자열을 보내며, 그 문자열 포맷은 매우 자유롭다.
    따라서 Actor 패턴에서 각각의 Actor 의 OnRecvier 구현은 매우 유연하다.  

-   Actor 모델은 더욱 확장하기 쉬운데 어떤 액터나 프록시가 될수있고 다른 액터와 분산해서
    작업부하를 가질수있다.

-  대부분의  AOM 구현들은 정해진 서번트와 함께 쓰레드풀에 의존적이다.  
-  양쪽 모델모두 가능한 이미 존재하는 Actor 나 ActiveObject 를 재사용한다는걸 알아둬라. 
-  보통 Actor 모델은 큐가 제한이 없으며, ActiveObject 는 큐의 크기에 제한이 있다. 



다양한 패턴들  


object :   객체로 호출하면 바로 반응한다. 

actor  :   능동적인 객체 ( 즉 자신의 쓰레드가 있고 큐를 가지고있다 )  호출하면 바로 응답하지 않는다. 

reactor : actor 에 추가적으로 해당 이벤트에 대한 핸들러가 매핑되어서 디스패치하는 구조. (selector 느낌)

proactor :  행위를 actor 에 넘겨서 그 행위에 대한 결과를 받은 구조.  ( IOCP 느낌)

activeobject : 행위할수 있는 actor 에게 자신의 요청 사항을 넘겨준후에 future 객체를 이용하여 결과 파악을 함. 



Actor 모델의 기본을 짚어보고  Akka 에 예제를 짧막하게 살펴본다. (굉장히 두서없는 글이 될것이다) 
Actor 모델이 굉장히 유명한데, 개인적으로 ActiveObject 패턴으로 알고있었다.
정확히 둘 간에 어떤  차이점이 있는지는  모르겠다.  패턴과 모델 ??  

내가 읽은 어느 책에서는 ActiveObject  패턴이 Actor 과 같다고 나오고, POSA2 편을 보면 (Pattern-Oriented Software Architecture 2 : http://www.cs.wustl.edu/~schmidt/POSA/POSA2에서는 ActiveObject 패턴과 Reactor / Proactor 패턴이 나온다.  분명히 다르긴 하다. 


정리


object :   객체로 호출하면 바로 반응한다. 

actor  :   능동적인 객체 ( 즉 자신의 쓰레드가 있고 큐를 가지고있다 )  호출하면 바로 응답하지 않는다. 

reactor : actor 에 추가적으로 해당 이벤트에 대한 핸들러가 매핑되어서 디스패치하는 구조. (selector 느낌)

proactor :  행위를 actor 에 넘겨서 그 행위에 대한 결과를 받은 구조.  ( IOCP 느낌)

activeobject : 행위할수 있는 actor 에게 자신의 요청 사항을 넘겨준후에 future 객체를 이용하여 결과 파악을 함. 


이번글을 쓰는 계기로 차이점에 관련된 읽을거리를 검색해봤다 ;;

http://www.carlgibbs.co.uk/blog/?p=237

http://members.unine.ch/anita.sobe/res/RR-I-AS-2014.06.1.pdf

Active Object pattern: http://www.dre.vanderbilt.edu/~schmidt/PDF/Active-Objects.pdf

Actor Model: http://en.wikipedia.org/wiki/Actor_model

솔직히 나는 저 둘의 관계에 대해  깊숙히 이해하고 싶은 생각이 없다. (지금 당장은 말이다) 
따라서 깊은 이해는 없다라는걸 미리 말한다. 대략 요약하면 


차이

-   구조적으로 두 패턴모두 스케쥴러와 큐를 가지고있다.
-   구조적으로 ActiveObject 는 프록시를 활용하며 , Future 객체를 사용한다.
-   구조적으로 Actor 패턴은  ActiveObject 에 비해서 자유성이 높다. 
-   메세징/통신측면에서 ActiveObject 패턴은  미리 정해진 함수호출을 통해서 잡을 전달한다.  
-   메세징/통신측면에서 Actor 패턴은 문자열을 보내며, 그 문자열 포맷은 매우 자유롭다.
    따라서 Actor 패턴에서 각각의 Actor 의 OnRecvier 구현은 매우 유연하다.  

-   Actor 모델은 더욱 확장하기 쉬운데 어떤 액터나 프록시가 될수있고 다른 액터와 분산해서 작업부하를 가질       수있다.

-  대부분의  AOM 구현들은 정해진 서번트와 함께 쓰레드풀에 의존적이다.  
-  양쪽 모델모두 가능한 이미 존재하는 Actor 나 ActiveObject 를 재사용한다는걸 알아둬라. 
-  보통 Actor 모델은 큐가 제한이 없으며, ActiveObject 는 큐의 크기에 제한이 있다. 


자 Active Object 패턴은 한마디로 말하면

"비동기 메세지를 처리할수있는 능동적 객체" 이다.

주목해야할 단어가 2개있다. 


첫째, "비동기" 

둘째, "능동적" 


비동기란 무엇인가??  

내가 어떤일을 해야하는데 내가 하기는 싫고 , 다른 녀석한테 시키고 싶을때 저놈은 게다가 그 일에는 전문가다. 내가 일을  

- 다른녀석한테 슬쩍 넘기고 , 나는 내일을 하는게 비동기다. (논블럭이라고도한다. 미묘한 차이가 있다)

- 다른녀석한테 넘기고 , 그 녀석이 일을 마칠때까지 기다리고 있는건 동기이다.(블럭킹 되었다고한다)


비동기메세지란 결국 메세지를 다른녀석한테 던저주고 나서 , 바로 나는 내일을 하는것을 말한다. 

* 윈도우즈의 IOCP에서 말하는 비동기 입출력(Overlapped IO) 은 조금 또 다른 느낌이다..(병렬의 느낌) 


그럼 능동적은 무엇인가?? 

기술적으로 간단하게 말하면, 나도 쓰레드를 가지고있고, 다른녀석도 자신의 쓰레드 혹은 프로세스를 가지고있는걸 말한다. 둘이 같은 쓰레드를 공유하는게 아니라는 얘기이다.


결국 위에서 말한 "나" 와  "다른 녀석" 은 각각 자신만의 "쓰레드" 를 가지고있고, 자신만의 "논블록킹" 큐를 가지고있게된다. 

따라서 어떤 일을 상대방한테 넘길때 메세지를 상대방의 큐에 넣고 , 돌아오면 "비동기" 가 되는것이고 

상대방은 스스로 쓰레드에서  while 을 돌면서  자신의 큐를 확인해서 일거리를 가지고 옮으로 "능동적"이다.

전체 맥락이 이해됬는가??

이 능동형 객체가 Akka에서 Actor 이며 , Actor 들은 자신만의 큐와 스케쥴러를  가지고있을것이다.

근데 메세지를 객체로?? 메세지가 명령인데  명령을 객체화 해서 보내는것은 알다시피 Command 패턴이다. 그리고  Future 패턴이나  큐에 잡을 던진다는것에서  producer-consumer  패턴을 포함한다.

패턴은 조금씩 서로와 연관된다.  

클라이언트측에서 어떤 “메소드 실행” 을  클라이언트와는 별개의 쓰레드 혹은 다른 컴퓨터의 큐에 던저서 그쪽에서실행되게 하자!! 값은 나중에 받자!! RPC 랑도 연관되네? Vert.x/ RabbitMQ / ActiveMQ 랑도 연관되네? 

(하둡RPC : http://brad2014.tistory.com/175)

근데 객체복사를 꽤 하니깐 작은규모에선 오버헤드가 좀 있고 이런 사소한것보단 저런 나름 복잡한 패턴을 실제 구현하는 피로감이..(그래서 akka  라이브러리가 나옴)

(Akka 는 비동기적으로 업무처리를 분담한다는것 이외에도 무상태를 지향하므로 , 데드락이나 레이스컨디션등 일반적인 객체지향언어의 멀티쓰레딩 코딩에서 나타나는 문제에 대한 보완을 한다. 물론 객체를 메세지로 보낼때 객체의 상태를 바꿀수있게 코딩한다면 의미없어 지겠지만) 



위의 ActiveObject UML 을 살펴보면  

1. 클라이언트가  ActiveObject 의 프록시로  메세지를 전달한다 ( 함수를 호출한다. 명령을 한다.)

2. ActiveOjbect 는 스케쥴러를 가지고 있다. ( 이게 "능동형" 라는 말인데, 쓰레드가  while 을 돈다는 말이다)

3. ActiveObject 는 MessageQueue 를 가지고있다. ( 이게 비동기를 위한 논블럭킹 큐이다) 

4. 클라이언트는 ActiveOjbect 를 통해 메세지를 만들고 메세지큐에 넣은다음에 퓨처 객체를 받고 자기할일함

5. 스케쥴러는 메세지큐에서 하나꺼내서 서번트를 통해서 작업을 하고, 퓨처객체에 완료 알림을 한다

6. 클라이언트는 퓨처객체에서 get 을 해서 실제 객체 (원하는 작업의 결과) 를 얻는다. 


첫번째 그림을 이해했으면 이것도 마찬가지라는걸 알수있다. Akka 의 Actor 는 다른 Actor의 메일박스에 메세지를 던지고 자기할일하고, QA 라는 Actor 는 디스패처를 통해서 일감을 가지와서 실행.

   

  여러개의 Actor 가 서로 다른 Actor 들에게 동시에 메세지(할일) 을 넘겨줄수도 있겠지..

Akka ( http://akka.io/ ) 에서 Actor   란 ?  (홈페이지 발췌) 

Actors

액터는 매우 가벼운 동시성  엔터티들이다. 이놈들은 메세지들을 event-driven receive loop  를 이용하여 비동기적으로 실행한다.  메세지에 대응되는 패턴매칭은 액터의 행동을 나타내는 굉장히 편리한 방법이며 , 추상레벨을 높혀서 Akka 를 가져다가 사용하는 개발자들이  분산/병렬  코드를 작성하기 굉장히 쉽게 해준다.

You focus on workflow—how the messages flow in the system—instead of low level primitives like threads, locks and socket IO. Learn More.

Akka with ActiveObject 패턴 

// 프록시와 서번트의 공통 인터페이스 
public interface  Printer {
  Future<String> printNonBlocking ( String msg) ;
  Option<String> print Blocking ( String msg) ;
}

// 공통인터페이스 기반으로 서번트를 만든다.
class PrinterImpl implements Printer {
  private String name ;
    public void PrinterImpl ( String name ) {
    this.name = name ;
  }
  public Futures<String>printNonBlocking ( String msg ) {
    System.out.println (msg) ;
    return Futures.successful(”Worked”);
  }
  public Option<String> print Blocking (String msg) {
    System.out.println (msg) ;
    return Option.some(”Worked”) ;
  }
} 


public class Main{
  public static void main (String [] args ) {
    ActorSystem system = ActorSystem.create (”testSystem ” ) ;

    // typedActor 가 사용되었다.
    // 첫번째 인자는 프록시에 사용될 인터페이스이고, 두번째 인자는 서번트가 들어간다 
    PrinterhelloWorld = TypedActor.get (system) .typedActorOf (
    new TypedProps<PrinterImpl>( Printer.class , PrinterImpl.class)) ;

    // 비동기호출을 하여 Futre 객체를 즉시 받는다.
    Future<String> resultNB = helloWorld.printNonBlocking ( ”Hello World!) ;
    Option<String> result  B = helloWorld.printBlocking (”Hello World !);
  }
} 


Akka with JAVA   

import akka.actor.UntypedActor;

public class PrintActor extends UntypedActor {

    @Override

    public void onReceive(Object message) throws Exception {

        Thread.sleep(500); // 테스트를 위해 0.5초 sleep

        System.out.println(message);

    }

}


ActorRef actor = Actors.actorOf(PrintActor.class);

actor.start();

// actor를 이용해서 액터에 메시지를 전달

ActorRef의 start() 메서드는 액터를 시작하며, 액터가 시작된 이후부터 액터에 메시지를 전달할 수 있게 된다.

액터에 메시지 전달하기

Actors.actorOf()를 이용해서 액터를 생성했다면, 이후 ActorRef가 제공하는 메서드를 이용해서 액터에 메시지를 전달할 수 있다. 

다음의 세 가지 방법으로 액터에 메시지를 전달할 수 있다.

1.Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.

2.Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.

3.Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.


ActorRef actor = Actors.actorOf(PrintActor.class);

actor.start();

actor.sendOneWay("받아라");  //Fire-And-Forget 예  

actor.sendOneWay("받아라2");

actor.sendOneWay("받아라3");

System.out.println("비동기로 실행");


sendRequestReply() 메서드를 이용한 Send-And-Receive-Eventually 방식 메시지 전달

ActorRef.sendRequestReply() 메서드는 액터에 메시지를 전달하고, 그 메시지에 대한 응답이 올 때 까지 대기하고 싶을 때 사용된다. 액터 구현 클래스는 getContext().replyUnsafe() 메서드를 이용해서 메시지에 대해 응답할 수 있는데, ActorRef.sendRequestReply() 메서드는 이 응답을 리턴하게 된다. 예를 들어, 다음과 같이 메시지에 대해 응답하는 액터가 있다고 하자.

public class PingActor extends UntypedActor {

    @Override

    public void onReceive(Object message) throws Exception {

        getContext().replyUnsafe("응답: "+ message); // 메시지 sender에 응답

    }

}

이 경우 다음과 같이 sendRequestReply() 메서드를 이용함으로써 액터에 전달한 메시지에 대한 응답이 도착할 때 까지 대기할 수 있다.

ActorRef actor = Actors.actorOf(PingActor.class);

actor.start();

Object res = actor.sendRequestReply("헬로우"); // 액터로부터 응답이 도착할 때 까지 대기


sendRequestReplyFuture() 메서드를 이용한 Send-And-Receive-Future 방식 메시지 전달


sendRequestReplyFuture() 메서드는 메시지를 전달한 뒤 응답을 받기 위한 Future를 리턴한다. Future는 자바가 제공하는 Future가 아닌 Akka가 제공하는 akka.dispatch.Future 타입이다. Future는 주로 다음과 같은 형식으로 주로 사용된다.

Future future = actor.sendRequestReplyFuture("하이");

future.await(); // 응답을 대기. 대기 시간을 초과하면 예외 발생

..

..

어떤작업..

..

..

if (future.isCompleted()) { // 완료되었다면

    Option resultOption = future.result(); // 응답 구함

    if (resultOption.isDefined()) { // 응답 데이터가 있다면,

        Object result = resultOption.get(); // 응답 데이터 구함

        System.out.println(result);

    }

}


sendRequestReplyFuture()가 리턴한 Future의 await() 메서드는 시간이 초과될 때 까지 대기한다. 시간이 초과되기 전에 응답이 도착하면 다음으로 넘어가고, 시간이 초과되면 ActorTimeoutException 예외를 발생시킨다.



akka 홈페이지 발췌) 

  1. public class Greeting implements Serializable {
  2. public final String who;
  3. public Greeting(String who) { this.who = who; }
  4. }
  5.  
  6. public class GreetingActor extends UntypedActor {
  7. LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  8.  
  9. public void onReceive(Object message) throws Exception {
  10. if (message instanceof Greeting)
  11. log.info("Hello " + ((Greeting) message).who);
  12. }
  13. }
  14.  
  15. ActorSystem system = ActorSystem.create("MySystem");
  16. ActorRef greeter = system.actorOf(Props.create(GreetingActor.class), "greeter");
  17. greeter.tell(new Greeting("Charlie Parker"), ActorRef.noSender());



Akka with Scala

akka 홈페이지 발췌) 

  1. case class Greeting(who: String)
  2.  
  3. class GreetingActor extends Actor with ActorLogging {
  4. def receive = {
  5. case Greeting(who) log.info("Hello " + who)
  6. }
  7. }
  8.  
  9. val system = ActorSystem("MySystem")
  10. val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
  11. greeter ! Greeting("Charlie Parker")


글을 쓰고나니 좀 웃음이 나는데 양심고백을 하자면  나는 요즘도 멀티쓰레드 프로그래밍을 할때 바로 쓰레드를 만들거나 자바 executors  서비스를 이용하는데 만들고나서 한참을 데드락이 있을까봐 코드를 째려보거나 가끔은 설마 하며 운에 맞기는 코딩을 한다. 아예 akka 는 생각안한다. ;;  작은 시스템에서도 충분히 아니 강력히 써야 한다고 생각은 하는데 실천이 부족했던것같다. 이 글을 계기로  당장 개발해야 하는 수요조절기능을 위한 스케쥴러에 첫삽을 떠보려한다.


처리율(throughput)   임백준님의 Akka 시작하기에서 발췌 

아카를 이용한 리팩토링을 끝마쳤을 때, 똑같은 컴퓨터 위에서 전과 동일한 몬테 카를로 시나리오를 수행하는데 걸리는 시간이 6시간에서 2시간으로 단축되었다. 66%의 시간이 절약된 것이다. 결과를 확인한 사람들은 깜짝 놀랐다. 단순히 자바 스레드에서 아카로 라이브러리를 바꾸었을 뿐인데 그렇게 엄청난 차이가 있을 수 있냐며 고개를 갸웃거렸다. 물론 이런 차이를 일반화할 수는 없다. 이런 결과 하나를 가지고 아카가 자바 스레 드보다 3배 빠르다고 말하는 어리석은 사람은 없을 것이다. 아카도 내부적으로 자 1 아카에 대하여 - 017 바 스레드를 사용하기 때문에 그런 비교 자체가 성립하지 않는다. 하지만 일반적 인 차원에서 짚고 넘어갈만한 부분도 있다. 이렇게 커다란 차이가 어디에서 비롯 되었는지 이해하려면 우선 암달의 법칙Amdahl’s law을 생각해볼 필요가 있다. 암달의 법칙은 이렇다. “멀티코어를 사용하는 프로그램의 속도는 프로그램 내부에 존재하는 순차적sequential 부분이 사용하는 시간에 의해서 제한된다.” Thread나 Task를 만들어서 ExecutorService에게 제출하는 식으로 동시성 코드를 작성하면 여러 개의 스레드가 동시에 작업을 수행한다. 하지만 프로그램 안에는 Thread나 Task가 포함하지 않는 코드가 존재한다. 여러 개의 스레드가 동시에 작업을 수행하더라도 synchronized 블록이나 데이터베이스, 네트워크 API 호출 등을 만날 때 다른 스레드와 나란히 줄을 서서 순차적으로 작업을 수행 해야 하는 경우도 있다. 암달의 법칙은 프로그램이 낼 수 있는 속도의 상한이 이런 순차적 코드가 사용하는 시간에 의해서 제한된다고 말하는 것이다. 이러한 순차적 코드의 또 다른 이름은 블로킹blocking 콜이다. 문제는 스레드 자체 가 아니라 스레드를 사용하면서 자기도 모르게 만들어내는 블로킹 콜이다. 조금 과장해서 말하자면 자바 개발자가 스레드를 이용해서 만들어내는 ‘동시성’ 코드는 일종의 신기루다. 사실은 코드 곳곳에 존재하는 블로킹 콜, 순차적 코드 때문에 전 체적인 프로그램의 처리율은 이미 상한이 정해져 있지만 여러 개의 스레드가 ‘동 시에’ 동작한다는 사실로부터 위안을 받을 뿐이다.


1편 언어에서 강력함 과 대중성  그리고 스칼라 

이글은 스칼라에 대한 전문적인글이 아니며 ,  스칼라의 모든부분을 말하는글은 아닙니다.
스칼라의 함수자(Functor) , 컬렉션 및 유틸리티 에 대해서 한정되있으며,  
먼저 스칼라말고 다른 여러가지 언어들에 대해서 말하고도 있습니다. 제목에 일반적이라고 붙힌이유는 , 스칼라의 다른 기능들 , 소위 Active Object &  Actor Pattern 기반의 동시성 구현라이브러리로 알려진 아카라든지,콤비네이터를 이용한 인터프리터 개발같은것들을 대중적이라고 보기엔 무리라고 판단하였기에 그런것들을 제외한 극소수의 내용들로 이루어져있기때문입니다. 그리고 아래 글에서 예시로 보여지는 (슈도)코드는 문법적으로 정확치 않은 예 이며, 글에 쓰여진 모든 지식은 저의 것이아니라 다른 사람으로부터 나온것입니다. (저도 틈틈히 공부중이며,  이번 사태를 계기로 수준높은 글들이 앞으로 많이 나올것을 기대합니다.)

강력함 과 대중적이라는것은 전혀 별개의 말입니다. 
강력하다고 해서 대중적으로 선택받지 못함은 역사를 통해 다들 잘 알고있을터입니다. 


본글에서는 단지 간단한 예제들을 나열함으로서  글을 읽는 여러분들이 알아서 생각하게끔 구성하였습니다.  
자 아래에는 어떤 목록에서 원하는 값을 찾는 과정에 대한  코드입니다.

예제1 in C++ ) 


string guy = null;

for(  Groups::iterator i = Groups.iterator();  i != Groups.end() ; ++i){
	
	player p = *i;
	
	if(p.id == 1){
		guy =  p.name;
	}
}

return guy;

위의 코드는 그룹에서 ID 1 번 선수의 이름을 찾아서 리턴해주는 로직입니다. 보는바와 같이 길게 늘어써져 있습니다.   코드를 읽어야지만 멀하자는건지 알수있습니다.

아래의 코드를 보면 지원되는 함수 util 을 이용하여 조금 짧게 이루어져있습니다. 

예제2 in JAVA )


player = Groups.findbyId(1);
if( player ){
	return player.name;
}
elsle{
	return null;
}

findbyId 함수를 이용하여  보다 간략하게 코딩하였음을 알수있습니다. 멀하자는건지 금방 알수있습니다.
 저 코드를 짜려면 findbyId 라는 함수를 알고있어야겠지요. 

에제3  in Haskell )   * 헤스켈 ( 순수 함수형 언어)  


fmap (getName)  (findId 1) 

( findId 1  <-- 이 부분을 functor 라고 합니다.)  findId 1 에서  값을 찾으면  getName 이 호출되어 , 원하는 선수 이름을 얻을수있으며 값을 못찾으면 Nothing 이 되면서, 아무것도 하질 않습니다. 
1줄로 줄어들었으며, Null 참조 및  잘못된 배열참조에 대한 불안감으로부터 해방되었습니다.
가독성 좋아졌는지는 잘모르겠고 , 코드가 강건해졌다고 볼수있습니다. 

스칼라를 사용하면, 추상정도를 높히는 코드를 C++,자바보다 더 자유롭게 표현할수 있게됩니다. 
추상정도를 높히면 강력합니다! 더 정확히 말하면 강력하게 변신하기 쉬워집니다. 

자 다들 저렇게 쓰도록 바꾸자!!! 고요 ?  저는 잘모르겠습니다. 독자분들마다 생각이 다를 것입니다.
대중들은 단순히 람다표현식 혹은 함수자 유틸의 사용으로 라인수 줄어든것만으로 굳이 공부하거나 바꿀 필요성을 느끼지 못할거 같습니다. 대중이 판단할때  현재 '진짜'  강력해야 선택받습니다.


제가 그렇게 생각하는  이유를 전에 쓴 글에서 가져오고 제 경험을 말씀드리면 
 c++ 은 10년전부터 다른언어의 특성들을  차용하며 소위 Modern C++ 이라는 이름으로 나름 발전을 이루어왔습니다. 주로 c++ 언어특성으로 발전을 하여온것은 아니고 STL 이름의 라이브러리등을  통하여 발전하였습니다. 언어 창조자의 철학이, 먼가가 필요하면 굳이 언어를 변경하는것보다는 라이브러리 차원으로 제공하는게 낫다라는 입장입니다. 아마도 자바 또한 그러한 보수적인 관점에서 신기능의 추가 (제너릭,병렬, 함수형등) 에 조심성을 가지고 발전한거겠지요. 

c++ 에 다양한 함수자유틸들, 템플릿특화,  일반화함수자, 타입추론 , 템플릿메타프로그래밍등 많은 개념들이 10년도 훨씬 전에 생겨났지만, 대중은 그닥 관심이 없습니다. (물론 몇몇 게임등 쓰이는곳도 있습니다)
알필요도 없지요. 안쓰고 개발해도 충분했으니깐요.  라이브러리 제작자한테나 필수일까...

실제 사례로, 제가 이전프로젝트에서의 c++ 코드라인이 300백만줄이 넘고, 10년동안  8백만라인이상의 코드를 접해왔는데  저런것을 쓴 사례가 거의 없습니다.
너와 니네팀이 무식해서 그렇다고요? 네 반은 맞습니다. 대부분은 비판하는 분과 다르게 저 수준입니다.
(반은 C++ 의 함수형스타일이 그닥 어렵게 공부해서 적용할 필요성이 없었습니다.)
 
정말 간단한  C++ 예를 이전 글에서 가져와서 보여 들어드리자면 

bool is_cool (const Thing& x) { ... }
find_if(begin, end, not1(ptr_fun(is_cool)));

위의 예제는  어떤 컬렉션을 순회하며 , 쿨하지 않은것을 찾아서 리턴해주는 표준함수,부정자 및 바인더라고불리는 것들입니다.  굉장히 단순해 보여서 많이 쓰일거 같지요?  저는 적극적으로 사용했습니다만, 대부분 저런식의 코딩을 하지 않습니다. 그냥 대부분 for 문으로 순회하면서 내용에 비지니스 로직 적습니다. not1 이 부담스러워요, ptr_fun 이 부담스럽습니다. 대중화에 실패했습니다.  
먼가 부족했습니다.
덜 강력했습니다. 먼가 조금 아쉽습니다..
저거에 openMP나 PPL  같은 병렬을 쉽게 해주는 수준까지 지원됬다면 , 더 나아가 GPGPU 까지 흠흠...

스칼라 혹은 함수형패러다임언어가  대중화에 성공 할수있을까요?  
단순히 케이스클래스, 람다, 함수형들을 써서 간단,간편,쉽게하자라는 이유라면  No 라고 생각합니다.
대신 높은 추상화를 가진 표현을 통해, 응용개발자들이 신경쓰지 않아도 라이브러리혹은 언어내부에서의 업그레이드로 손쉽게 강력함을 얻을수있다는 보장이 되는것에 공감대를 얻게된다면  Yes 일거라고 봅니다. (자바,C++ 도 저런측면에서 강력함을 어필하기 위해 노력하고있습니다.  치열한 싸움이 예상됩니다. )


자 이제 다음 예제로 넘어가 볼까요? 

예제1 in SQL )

select * from player  where id = 1  

우와 엄청 간단합니다. 강력합니다. 실수할 여지가 별로 없습니다. 추상의 끝판왕입니다. 
SQL 은 대중적으로 성공했습니다. ^^

저것을 풀어쓰면 

예제 in JAVA)


public Table select (Selector where){

	Table resultTable = new ConcreteTable(null, columnNames.clone()};
	Results currentRow = (Results) rows();
	Cursor[] envelop = new Cursor[] { currentRow };

	while(currentRow.advance(){
		if(where.approve(envelope))
			resultTable.insert( currentRow.cloneRow());
	}
	
	return new UnmodifiableTable(resultTable);
}

더 풀어쓰면

예제 in C )

struct table {
	....
}

table t [][];
              ....


for(int i = 0 ; i <  rows ; i++){
	for( int j = 0 ; j < cols; j++){
		...
	}
}

한 몇백라인되겠지요. 
SQL 은 성공했습니다. !!!


자 이제 JQuery  를 보겠습니다.

$("div")     모든 div 태그를 가져오는것을 한방에 합니다.
document.getElementByTag("div")      조금 길어졌네요.

$("div.reply")   모든  div 중에서 reply 클래스들만 가져옵니다.
JQuery 안쓰면 ?  헬입니다.

$("div.reply:hidden").show()    우와 함수까지 한방에 해결입니다.

다른 값을 감싸고 한방에 해결해줍니다. (모나드라는 특성의 일부분을  가지고 있습니다)
JQuery 성공했습니다.!!!


자 이제 마지막으로 스칼라 (위에 썼듯이 스칼라의 일부분) 를 보겠습니다. 다양한 스칼라 예제를 살펴보고
과연 이것도 성공할수있을지 실제 코딩에서 사용할지...여러분이 알아서 생각하세요.
여러분이 선택하는게 정답입니다.  대중이 선택한것이 대부분 정답이니깐요. ( 물론 이부분은 상황에 따라서 시각이 다를수있겠습니다) 


+ Recent posts