Python

파이선 동시성 프로그래밍 - (3) Queue

[하마] 이승현 (wowlsh93@gmail.com) 2017. 4. 27. 16:07

연재 순서 

1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent) 
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awiat
9. concurrent.future


3. Queue

다음 예 에서는 생산자-소비자 패턴의 가장 중요한 큐를 다룰 것이다. 생산자가 아이템을 큐에 넣어주면 소비자들은 그것을 가져와서 사용하는데 , 그것들 사이의 동기화는 Queue 에서 모두 해결해준다. 즉 더이상 생산자가 task 를 추가할 수 없으면 put 에서 대기를 하게되고, 소비자가 더 이상 가져올 task 가 없으면 get 에서 기다리게 된다.

import threading
import time
import logging
import random
import Queue

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = Queue.Queue(BUF_SIZE)

class ProducerThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        super(ProducerThread,self).__init__()
        self.target = target
        self.name = name

    def run(self):
        while True:
            if not q.full():
                item = random.randint(1,10)
                q.put(item)
                logging.debug('Putting ' + str(item)  
                              + ' : ' + str(q.qsize()) + ' items in queue')
                time.sleep(random.random())
        return

class ConsumerThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        super(ConsumerThread,self).__init__()
        self.target = target
        self.name = name
        return

    def run(self):
        while True:
            if not q.empty():
                item = q.get()
                logging.debug('Getting ' + str(item) 
                              + ' : ' + str(q.qsize()) + ' items in queue')
                time.sleep(random.random())
        return

if __name__ == '__main__':
    
    p = ProducerThread(name='producer')
    c = ConsumerThread(name='consumer')

    p.start()
    time.sleep(2)
    c.start()
    time.sleep(2)


* 주의 사항 : multiprocessing 의  경우에는 이 Queue 말고 , from multiprocessing import Queue 를 사용함.



LifoQueue 

참고로 멀티프로세서용 Queue 는 FIFO 인데, LIFO 로 하려면 ( 블로그 독자님이 메일로 물어보셔서 찾아봄) 

multiprocessing-managers 라는 것을 통해서 해결 할 수 있을거 같다.
아래는 LifoQueue 예제입니다. LifQueue 자체는 멀티프로세서에서 사용못하지만 
managers 를 통해서 사용 할 수 있게 하는거 같습니다. 테스트를 통해 확인 하고 사용하세요.

from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import sleep
from queue import LifoQueue


def run(lifo):
    """Wait for three messages and print them out"""
    num_msgs = 0
    while num_msgs < 3:
        # get next message or wait until one is available
        s = lifo.get()
        print(s)
        num_msgs += 1


# create manager that knows how to create and manage LifoQueues
class MyManager(BaseManager):
    pass
MyManager.register('LifoQueue', LifoQueue)


if __name__ == "__main__":

    manager = MyManager()
    manager.start()
    lifo = manager.LifoQueue()
    lifo.put("first")
    lifo.put("second")

    # expected order is "second", "first", "third"
    p = Process(target=run, args=[lifo])
    p.start()

    # wait for lifoqueue to be emptied
    sleep(0.25)
    lifo.put("third")

    p.join()



레퍼런스:

http://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Condition_Objects_Producer_Consumer.php