관리 메뉴

HAMA 블로그

생산자-소비자 패턴 과 액터 본문

디자인패턴

생산자-소비자 패턴 과 액터

[하마] 이승현 (wowlsh93@gmail.com) 2016. 6. 1. 14:18

멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있습니다. 그럼 Gof 의 디자인패턴의 꽃은 무엇인가? 라고 묻는다면   DI (Dependency Injection) 와 밀접한 "전략패턴" 이라고 대답하고 싶습니다. 물론 둘다 제 개인적인 생각이죠 :-) 

멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉  "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있습니다. 

자 그럼 생산자-소비자 패턴이 무엇인지 살펴보도록 하죠. 이해하기 쉽게 코드를 쉽게 쉽게 간략히 만들었습니다. (예외등 빠짐)

* 제 글이 그렇듯이 폴리글랏으로 보여주며, 추가 기술들도 섞여있습니다. 


1.  싱글쓰레드상에서 생산자 - 소비자 

먼저 멀티쓰레드 말고 그냥 간단하게 코드로 살펴보자. 개발자들은 말보다는 코드가 더 이해하기 쉬우니~

class Producer {

    int id = 0

int produce(){

return nextid()

}

int nextId(){

return  id = id + 1

}

}

class Consumer{

void consume(int id){

        print ("ID : " + id)

}

}

void  test(){

     Producer  p = new Producer();

     Consumer c = new Consumer(); 

result = p.produce()

c.consume(result)

}

 1 씩 증가시키는 생산을 하는  Producer 클래스가 있고, 그것을 전달 받아 사용하는 Consumer 가 있습니다.
 즉 간단하게 생산하는 녀석 따로, 소비하는 녀석 따로 있으면 생산자-소비자 패턴입니다. 

 싱글쓰레드에서의 특징은 호출하면 바로 반응한다 입니다. produce 를 호출하면 다른짓을 하지 않고 바로 새로운 id 를 리턴해주죠.실시간 반응 (실시간 리턴은 아님) 입니다.  다만 produce 에서 먼가 오래 작업을 한다면 Consumer 객체는 오래 기다려야겠지요.

이것을 멀티쓰레드로는 어떻게 만들까요? 


2.  멀티쓰레드상에서 생산자 - 소비자 (by JAVA core) 


이 이미지는 좀 복잡한데..



     사실 매우 단순하다. Thread 1이 생산자 Thread 2가 소비자가 된다.



멀티쓰레드에서도 생산자-소비자 따로입니다만은 싱글쓰레드와 다른 이 소스의 특징은 

1.  Producer 와 Consumer 가 각각 쓰레드를 가지고있다. (내부에 루프를 가지고 있다) 
2.  전달 매개체 (보통 큐로 구현) 가 생겼다. (아래에서 Table 클래스)

입니다.  소스를 보시죠. 

public class main {

    public static void main(String[] args) {

        

    Table table = new Table(100);     

        new producerThread(table).start();

        new consumerThread(table).start();

    }

}


public class producerThread extends Thread {

    private static int id = 0; 

    Table table;

   

    public producerThread(Table table) {

        this.table = table;

    }

    public void run() {

            while (true) {

            Thread.sleep(1000);

                String packet = "No : " + nextId();

                table.put(packet);  // 큐에 추가 

            }

    }

    private static synchronized int nextId() {

        return id++;

    }

}



public class consumerThread extends Thread {

    private final Table table;

    public consumerThread(Table table) {

        this.table = table;

    }

    public void run() {

            while (true) {

            String packet = table.take();   // 큐에서 가져옴                      

                System.out.println("consumer : " + packet);

            }

    }

}


public class Table {

    private final String[] buffer;

    private int tail;  

    private int head;  

    private int count; 

    public Table(int count) {

        this.buffer = new String[count];

        this.head = 0;

        this.tail = 0;

        this.count = 0;

    }

    public synchronized void put(String packet)  {

        while (count >= buffer.length) {   // 버퍼가 가측 차면 대기!

            wait();

        }

        buffer[tail] = packet;   // 후입하라!

        tail = (tail + 1) % buffer.length;  // Circular 큐라서 tail 의 위치가 바뀜!

        count++;

        notifyAll();  // 버퍼에 먼가가 들어 갔으니 take 해도 된다고 이벤트 날림!!

    }

    public synchronized String take()  {

        while (count <= 0) {   // 버퍼에 아무것도 없으면 대기!

            wait();

        }

        String packet = buffer[head];  // 선출하라!

        head = (head + 1) % buffer.length;  // Circular 큐라서 header 의 위치가 바뀜!

        count--;

        notifyAll();  // 버퍼에서 하나를 가져갔으니 put 해도 된다고 이벤트 날림!!!

        return packet;

    }

}



코드의 중요 부분을 설명해보면 (주석과 함께 살펴보세요)

Producer 클래스는  1초 에 한번씩 table 에 id 를 집어 넣습니다. 
Consumer 클래스는 table 에서 id 를 가져와서 출력합니다.

Table 클래스는 좀 눈여겨 봐야합니다.
String 배열을 가지고 큐를 구현한 코드이며. 더불어 생산자와 소비자 클래스에서 그 배열에 동시접근을 막기위한 장치들이 있습니다. synchronized 와 wait() 인데요. 이 두가지는 매우 코어적인  방식이며 바른 이해가 필요합니다. 이것을 직접 조작하면서 멀티쓰레드 프로그래밍을 하는것에 대한 위험성이 공감대를 얻으면서, 좀 더 하이레벨에서 조작하는 방식들이 생겨납니다. 
그것을 위해 등장한것이 STM , Actor 등이 있으며, 아래에서는 2번의 큐를 훌륭한 개발자들이 미리 구현한 java.util.concurrent 를 사용합니다.


3.  멀티쓰레드상에서 생산자 - 소비자 (by JAVA Concurrent 패키지) 

위의 Table 클래스를  java.util.concurrent 패키지를 이용하여 구현해보면 


public class Table {

private final BlockingQueue<String> buffer; 

public Table(int count) {

 this.buffer = new ArrayBlockingQueue<String>(10);

}

public  void put(String packet) {

Thread.sleep(1000);

buffer.put(packet);

}

public  String take()  {

String packet = buffer.take();

return packet;

}

}

이렇게 BlockingQueue 를 사용함으로써 매우 간단해졌으며 실수의 여지를 줄였습니다.

하지만!! 이것도 동기화 객체를 사용하며, 상태변경에 대한 책임소재가 명확하지 않습니다. 결국 한층 더 동기화 문제를 줄일수있는 Actor 패턴이라는것으로 진화됩니다.   (조금있다가 설명합니다) 


4.  멀티쓰레드상에서 생산자 - 소비자 (by Python) 

* java 코드와 기능이 동일합니다. 설명은 주석으로 대신합니다.

#-*- coding: utf-8 -*-

from threading import Thread, Condition

import time

import random

 

queue = []  # 리스트입니다만 동기화가 기본적으로 됩니다. 

condition = Condition()  #  java 의 wait() 및 synchronized  혼종 

 

class ProducerThread(Thread):

    i = 0

    def run(self):

        global queue

        while True:

            condition.acquire() 

            if len(queue) == 100: // 버퍼가 가측 차면 대기!

                condition.wait()

            num = self.nextId()  // 생산 

            queue.append(num)  // 큐에 후입하라!

            condition.notify()      // 버퍼에 먼가가 들어 갔으니 take 해도 된다고 이벤트 날림!!

            condition.release()

            time.sleep(1)

 

    def nextId(self):

        self.i = self.i + 1

        return self.i

 

class ConsumerThread(Thread):

    def run(self):

        global queue

        while True:

            condition.acquire()

            if not queue:   // 버퍼가 비어있으면  대기!

                condition.wait()

            num = queue.pop(0)  // 선출하라!

            print "Consumed", num  // 소비~~~~~~

            condition.notify()  // 버퍼가 비었으니 put 해도 된다고 이벤트 날림!!

            condition.release()

            time.sleep(1)

 

 

ConsumerThread().start()  

ProducerThread().start()

* 참고로 하나의 쓰레드가 wait() 되는 순간에 condition.acquire() 는 해제됩니다. 

자. 이렇게 생산자-소비자 패턴에 대해서 알아봤는데요.
위에 3번에서 말해다시피 java.util.concurrent 를 사용하는것도 쉽지 않기 때문에 (물론 복잡도에 따라서 코어를 사용하는것이 문제점을 더 잘 파악하는 길이기도 합니다)  결국 한층 더 동기화 문제를 줄일수있는 Actor 패턴이라는것으로 진화됩니다.  


5.  Actor  

Actor 란 말 그대로 '행동자' 입니다. 능동적으로 비동기 메세지를 처리하는 녀석인데요.
능동적이라는 말은 쉽게 말해 쓰레드 하나가 할당되어 있다는 얘기입니다. 살아 숨쉬고 있죠. 
또한 비동기 적으로 메세지를 처리할수있습니다. 이건 메세지를 담아둘수있는 자신만의 큐를 가지고 있다는 얘기입니다.

즉 Actor = 객체+루프를 가진 쓰레드+큐 로 이루어진 것 이라고 보면됩니다.

이왕 Actor 가 나온김에 이것으로부터 나온 패턴들을 간단히 살펴보면 Reactor 패턴이 있습니다.

6.  Reactor 

 Node.js 의 기반패턴이라고 알려져있으며 , 쉽게 말해 Actor + 이벤트 핸들러 라고 보시면됩니다.
 우리말로 해석해보면 "반응로" 정도 될거 같은데요.  즉 Actor 안에다가 어떤 swich 문을 두고서 어떤 이벤트가 날라오면 
 그것과 연결된 어떤 핸들러를 디스패칭하라입니다.  보통 단일쓰레드로 이루어져있기때문에 하나의 핸들러가 시간을 많이 잡  아먹게 되면 전체적으로 성능이 급격히 저하됩니다.

 성능면이라든지 몇몇가지 단점이 있습니다. 이 얘기는 Node.js 도 그런 단점이 있다는 얘기겠지요. 
 물론 페라리보다 더 빠른 것들이 많이 있다고해서 페라리가 후진차가 되는것은 아닙니다. 

 다시 돌아와서 , Reactor 패턴 보다 더 나은 방식은 무엇이냐 라고 한다면 Proactor 패턴이라고 있습니다.

 7.  Proactor 

  Proactor 패턴 을 우리말로 바꾸면 음 잘 떠오르질 않네요. Reactor 랑 비교해서 생각해보면 Reactor 는 어떤 이벤트가 날라오   면 그 이벤트에 해당하는 행동을 하는것이라고 말씀드렸습니다.
  Proactor 는 먼저 행동을 디스패치하고 , 그 행동에 따른 결과가 날라오게 됩니다. 
  예를들면, Reactor 는  "너 지금 버퍼에 메세지를 담을 수 있어"   라고 메세지가 날라오면 그때서야 메세지 담는 행동을 하고
  Proactor 는 먼저 메세지에 담는 행동을 요청하면  "버퍼에 메세지가 모두 담겼어"  라는 결과 메세지가 날라오게 됩니다.
  사실 이해하기가 쉽지 않긴할텐데요. 윈도우 네트워크 프로그래밍을 해보셨다면 Select 가 react , IOCP 가 proact 라고 생각하   시면 얼추 들어 맞습니다. 



8.  Actor 패턴으로 해보는 생산자 - 소비자 (by scala) 

* 참고로 스칼라 액터는 deprecated 되었습니다. 밑에 설명할 akka 를 대신 사용해야합니다. 

* 액터를 사용한다면 , 어떤한 객체 및 값의 상태관리는 단 하나의 액터에서 전담시키는 방향으로 해야합니다. 내가 무엇인가를 수정하고 싶으면 그 해당 액터에 메세지를 전달하라는 뜻이지요. 

* 액터는 위치투명성을 갖기때문에 , 네트워크 넘어에 있는 액터와도 동일한 방식으로 사용 할 수 있게됩니다.

* 여기서는 actor 안에 Thread.sleep 으로 강제로 멈춤을 시도했지만,  액터안에서 저런식으로 흐름을 막는것은 지양해야합니다.

굳이 흐름을 막고 싶으면, 다른 전용 액터를 만들어서 그쪽 메세지를 보내고 받는 타이밍을 이용해야합니다.

import scala.actors.Reactor


object Test {

  case class Stop()

  case class Get(from: Reactor[Any])

  case class Put(x: Int)


  class UnboundedBuffer extends Reactor[Any] {

    def act() {

      react {

        case Get(from) =>

          val consumer = from

          react {

            case Put(x) =>

              consumer ! x

              act()

          }

      }

    }

  }


  class Producer(buf: UnboundedBuffer) extends Reactor[Any] {

    

    def act() {

      var i = 0

      while (i < 10) {

        i += 1

        Thread.sleep(1000)

        buf ! Put(i)

      }

    }

  }


  class Consumer(buf: UnboundedBuffer) extends Reactor[Any] {

    

    def act() {

      Thread.sleep(1000)

      buf ! Get(this)

      react {

        case res =>

            println(res)

          act()

      }

    

    }

  }


  def main(args: Array[String]) {

    val parent = new Reactor[Any] {

      def act() {

        val buffer = new UnboundedBuffer

        buffer.start()

        val producer = new Producer(buffer)

        producer.start()

        val consumer = new Consumer(buffer)

        consumer.start()

       

      }

    }

    parent.start()

  }

}


9.  Akka 로 해보는 생산자 - 소비자 (by scala) 

object HelloWorld extends App {

    val system = ActorSystem("ProConSystem")

    val con = system.actorOf(Props[ConsumerActor])

    val pro =  system.actorOf(Props(new ProducerActor(con)))

    pro ! "start"


}


class ProducerActor (con: ActorRef) extends Actor {

  

  def receive = {

    case "start" => 

      var i = 0

      

      while (i < 10){

           con ! i  

           i+=1

           Thread.sleep(1000)

      }

  }

}


class ConsumerActor extends Actor {

  def receive = {

    case id : Integer =>

        println("ID : " + id)

  

  }

}


10.  Akka 로 해보는 생산자 - 소비자 (by java) 

  public static void main(String... args) throws Exception {


        final ActorSystem actorSystem = ActorSystem.create("procon-System");

        final ActorRef actorRef = actorSystem.actorOf(Props.create(ProducerActor.class), "ProducerActor");


        actorRef.tell(new Command("START"), null);

        Thread.sleep(10000);

        System.out.println("Actor System Shutdown Starting...");

        actorSystem.shutdown();

    }


public class ProducerActor extends UntypedActor {


    private final ActorRef conActor;

    public ProducerActor() {

        conActor = getContext().actorOf(Props.create(ConsumerActor.class), "ConsumerActor");

    }


    @Override

    public void onReceive(Object msg) throws Exception {


        if (msg instanceof Command) {

            final String data = ((Command) msg).getData();

            if (data.equals("START")){

            int i = 0;

           

            while (i < 10){

            conActor.tell(i, null);

            System.out.println("send: "  + i );

            i++;

            Thread.sleep(1000);

            }

           

            }


        } 

    }

}


public class ConsumerActor extends UntypedActor {


    @Override

    public void onReceive(Object msg) {

     System.out.println("Received Event: " + msg);

    }



}


12.  Vertx 로 해보는 생산자 - 소비자 (by java) 

public class VertxApp {


    public static void main(String[] args) {

        Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(new ConsumerVerticle());

        vertx.deployVerticle(new ProducerVertcle());

    }


}


public class ProducerVertcle extends AbstractVerticle {

    

private static int id = 0; 

    

@Override

    public void start() {

        System.out.println("ProducerVertcle started!");

        while (true) {

            String packet = "No : " + nextId();

            vertx.eventBus().send("anAddress", packet);

        }

       

    }


    private static synchronized int nextId() {

        return id++;

    }


}


public class ConsumerVerticle extends AbstractVerticle {


    public void start() {

       

    vertx.eventBus().consumer("anAddress", message -> {

            System.out.println(" received message: " + message.body());

        });

    }

    

}


13. Akka 와 Vert.x 의 선택 이슈 

왜 우리는 클라우드 시스템에  Akka 를 사용하게 되었나?   참고 

Comments