Python
파이선 동시성 프로그래밍 - (3) Queue
[하마] 이승현 (wowlsh93@gmail.com)
2017. 4. 27. 16:07
연재 순서
1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent)
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awiat
9. concurrent.future
3. Queue
다음 예 에서는 생산자-소비자 패턴의 가장 중요한 큐를 다룰 것이다. 생산자가 아이템을 큐에 넣어주면 소비자들은 그것을 가져와서 사용하는데 , 그것들 사이의 동기화는 Queue 에서 모두 해결해준다. 즉 더이상 생산자가 task 를 추가할 수 없으면 put 에서 대기를 하게되고, 소비자가 더 이상 가져올 task 가 없으면 get 에서 기다리게 된다.
import threading import time import logging import random import Queue logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',) BUF_SIZE = 10 q = Queue.Queue(BUF_SIZE) class ProducerThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): super(ProducerThread,self).__init__() self.target = target self.name = name def run(self): while True: if not q.full(): item = random.randint(1,10) q.put(item) logging.debug('Putting ' + str(item) + ' : ' + str(q.qsize()) + ' items in queue') time.sleep(random.random()) return class ConsumerThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): super(ConsumerThread,self).__init__() self.target = target self.name = name return def run(self): while True: if not q.empty(): item = q.get() logging.debug('Getting ' + str(item) + ' : ' + str(q.qsize()) + ' items in queue') time.sleep(random.random()) return if __name__ == '__main__': p = ProducerThread(name='producer') c = ConsumerThread(name='consumer') p.start() time.sleep(2) c.start() time.sleep(2)
* 주의 사항 : multiprocessing 의 경우에는 이 Queue 말고 , from multiprocessing import Queue 를 사용함.
LifoQueue
참고로 멀티프로세서용 Queue 는 FIFO 인데, LIFO 로 하려면 ( 블로그 독자님이 메일로 물어보셔서 찾아봄)
아래는 LifoQueue 예제입니다. LifQueue 자체는 멀티프로세서에서 사용못하지만
managers 를 통해서 사용 할 수 있게 하는거 같습니다. 테스트를 통해 확인 하고 사용하세요.
managers 를 통해서 사용 할 수 있게 하는거 같습니다. 테스트를 통해 확인 하고 사용하세요.
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import sleep
from queue import LifoQueue
def run(lifo):
"""Wait for three messages and print them out"""
num_msgs = 0
while num_msgs < 3:
# get next message or wait until one is available
s = lifo.get()
print(s)
num_msgs += 1
# create manager that knows how to create and manage LifoQueues
class MyManager(BaseManager):
pass
MyManager.register('LifoQueue', LifoQueue)
if __name__ == "__main__":
manager = MyManager()
manager.start()
lifo = manager.LifoQueue()
lifo.put("first")
lifo.put("second")
# expected order is "second", "first", "third"
p = Process(target=run, args=[lifo])
p.start()
# wait for lifoqueue to be emptied
sleep(0.25)
lifo.put("third")
p.join()
레퍼런스: