파이썬 동시성 프로그래밍 - (5) 비동기 (gevent)
연재 순서
1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent)
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awiat
9. concurrent.future
5. gevent
Welcome to cooperative scheduling
최신 IO 시스템 (asyncio) 과 마찬가지로, gevent는 스케줄링 개념으로 작동합니다. 이것은 이전과 같은 몽키패치 덕분에 코드에서 거의 숨겨질 수있는 방식으로 promise 와 같은 시스템 (비동기 요청이 완료된 후에 수행해야 할 일들을 요구)으로 수행 할 수 있습니다. 스케줄러는 greenlet 컨텍스트를 신속하고 빈번하게 전환 할 수 있도록 작성되었으므로 각 greenlet 에 충분한 대기 시간을 허용합니다 (기억해야 할 중요한점). 이 greenlet 중 하나가 IO 바운드 작업에 부딪 칠 때마다 libevent로 전송 한 다음 컨텍스트 전환을 허용하도록 스케줄러에 양보합니다. 그 외에도, 내부는 매우흥미롭지만, 당장은 이해 할 필요가 없습니다.
Getting started
먼저 웹 크롤러를 만들어보며 시작하겠습니다. 우리는 시드 URL을 가져 와서 그 페이지로 부터 다음 링크를 계속 따라갈 것입니다.
import sys
import re
import requests
# Track the urls we've found
crawled = 0
def crawler(u):
'''A very simple web crawler'''
global crawled
# Crawl the page, print the status
response = requests.get(u)
print response.status_code, u
# Extract some links to follow using a *really* bad regular expression
for link in re.findall('<a href="(http.*?)"', response.content):
# Limit to 10 pages
if crawled < 10:
crawled += 1
crawler(link)
# Read the seed url from stdin
crawler(sys.argv[1])
가장 먼저 주의해야 할 점은 이 크롤러가 깊이우선재귀라는 것입니다. 이는 안좋을 수 있지만 단일 스레드 시스템에서 구현하기가 가장 쉽습니다. O (n)의 시간 복잡성은 각 웹 페이지를 순차적으로 로드하기 때문이죠. 이제 기반을 확보 했으니까 더 좋게 만들어 봅시다. 모든 gevent 응용 프로그램은 monkey 로 시작해야합니다.
import gevent.monkey
gevent.monkey.patch_all()
gevent 스케줄러를 설정하는 데 필요한 모듈을 임포트 합니다.
Let’s get restructuring
gevent 크롤러의 첫 번째 기본 접근 방식은 다음과 같습니다.
# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()
import gevent
import sys
import re
import requests
# List for holding our greenlets
greenlets = []
def crawler(u):
'''A very simple gevented web crawler'''
global crawled
# Crawl the page, print the status
response = requests.get(u)
print response.status_code, u
# Extract some links to follow
for link in re.findall('<a href="(http.*?)"', response.content):
# Limit to 10 pages
if len(greenlets) < 10:
greenlets.append(gevent.spawn(crawler, link))
# Read the seed url from stdin
greenlets.append(gevent.spawn(crawler, sys.argv[1]))
# Wait until we've spawned enough url requests
while len(greenlets) < 10:
gevent.sleep(1)
# Wait for everything to complete
gevent.joinall(greenlets)
이미 훨씬 좋아 보입니다. breadth-first 은 웹 크롤링과 관련하여 훨씬 더 좋은 생각인데, 이론적인 시간 복잡성은 O(2)로 낮아졌습니다.
이 크롤러는 greenlets 의 비용이 싸다는 좋은 특징을 보여줍니다. 일반적으로 IO 작업당 추가하여 병렬 처리 할 수 있으며 스케줄러는 이를 체계적으로 처리합니다. 안타깝게도 아직 이것은 유용한 크롤러와는 거리가 있습니다. greenlet 당 하나의 HTTP 요청을 수행하기 때문에 조금 더 넓게 열면 약간의 네트워크 문제가 발생할 수 있습니다. 그래서 좀 더 다른 방식도 생각해 봅시다.
Introducing worker pools
이제 액션 당 greenlet를 실행시키는 대신에, 우리는 greenlet 풀을 사용할 수 있습니다. 정해진 개수의 풀에 액션을 위임합니다.
# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import sys
import re
import requests
# Prepare a pool for 5 workers
pool = gevent.pool.Pool(5)
# Crawl tracker is back
crawled = 0
def crawler(u):
'''A very simple pooled gevent web crawler'''
global crawled
# Crawl the page, print the status
response = requests.get(u)
print response.status_code, u
# Extract some links to follow
for link in re.findall('<a href="(http.*?)"', response.content):
# Limit to 10 pages (ignores links when the pool is already full)
if crawled < 10 and not pool.full():
crawled += 1
pool.spawn(crawler, link)
# Read the seed url from stdin
pool.spawn(crawler, sys.argv[1])
# Wait for everything to complete
pool.join()
풀은 joinable 합니다. 즉, 요청된 모든 작업이 처리 될 때까지 대기하도록 응용 프로그램에 지시 할 수 있습니다 (즉, 풀이 준비 상태에 있음).
실제로 이것은 개별적인 풀 동작이 설명 할 수 없게 (infinte loop, anyone?) 도달 할 수 없기 때문에 때로는 두통을 일으킬 수 있습니다. 그 결과 전체 풀이 결코 안전하게 결합되지 않을 수 있습니다
Pool spawning is a blocking action
gevent 풀에 스레드를 생성하려고하면 gevent는 풀이 꽉 찼는지를 확인하고 그렇지 않은 경우 가용성을 기다립니다. 이것은 몇 가지 문제를 일으킵니다. 각 크롤러 풀은 pool.spawn을 호출하기 때문에 어떤 시점에 5 개의 풀 크기가 활성 상태이며, 링크를 찾으면 모두 pool.spawn을 동시에 호출합니다. 배고픈 철학자를 위해 포크를 남겨주어야 할텐데요..(식사하는 철학자 알고리즘)
But we want to crawl everything!
다행스럽게도, 풀링을 사용하는 것과는 별개로 우리가 잊고 있었던 한 가지 데이터 구조를 통해서 바보 같은 장난감 같은 크롤러를 강력한 웹 크롤링 응용 프로그램으로 바꿔 줄 것입니다.::
queueing.
# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import gevent.queue
import sys
import re
import requests
# Prepare a pool for 5 workers and a messaging queue
pool = gevent.pool.Pool(5)
queue = gevent.queue.Queue()
crawled = 0
def crawler():
'''A very simple queued gevent web crawler'''
global crawled
while 1:
try:
u = queue.get(timeout=1)
response = requests.get(u)
print response.status_code, u
# Extract some links to follow
for link in re.findall('<a href="(http.*?)"', response.content):
# Limit to 10 pages (ignores links when the pool is already full)
if crawled < 10:
crawled += 1
queue.put(link)
except gevent.queue.Empty:
break
queue.put(sys.argv[1])
# Read the seed url from stdin
for x in xrange(0, 5):
pool.spawn(crawler)
# Wait for everything to complete
pool.join()
좋아 보이네요? 일단 먼저 한꺼번에 5개의 crawler 를 스폰합니다. 그 후 각 crawler 들은 큐에 데이터가 들어오길 기다리고 있으며 결과적으로 서로 공평하게 작업을 분배햐여 처리하고 있습니다. 싸울 일이 없어요. 특히 우리는 시드 URL에 의존하여 로드하는 데 1 초도 채 걸리지 않습니다.
Workers are now their own loops
이전에는 URL을 크롤링 할 때마다 새 작업자가 생성되었습니다.동일한 시스템에서 각 작업자는 정확히 하나의 대기열 메시지를 소비 한 다음 종료합니다. 그렇다고 이것이 세계에서 가장 효율적인 시스템은 아닙니다 우리는 각각의 greenlets을 생성하고 대기열 메시지가 더 이상 존재하지 않을 때까지 (또는 대기열이 1 초 이상 비어있을 때까지) 대기하게합니다.
Let’s do one better
gevent의 사용을 최적화하기 위해 (컨텍스트 스위치는 많이 들지 않지만 비용은 여전히 있습니다), 우리는 다음을 원합니다.
1.queue에있는 모든 메시지를 소비하거나
2.Workers 들과 함께 pool 을 가득 채우다.
가능한 경우 대기열에 이미 우리가 허용하는 것보다 많은 메시지가있는 경우에만 풀을 사용하여 # 2를 수행 할 수 있다는 명백한 주의 사항이 있습니다.
# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import gevent.queue
import sys
import re
import requests
# Prepare a pool for 5 workers and a messaging queue
pool = gevent.pool.Pool(5)
queue = gevent.queue.Queue()
crawled = 0
def crawler():
'''A very simple queued gevent web crawler'''
print 'starting crawler...'
global crawled
while 1:
try:
u = queue.get(timeout=0)
response = requests.get(u)
print response.status_code, u
# Extract some links to follow
for link in re.findall('<a href="(http.*?)"', response.content):
# Limit to 10 pages (ignores links when the pool is already full)
if crawled < 10:
crawled += 1
queue.put(link)
except gevent.queue.Empty:
break
print 'stopping crawler...'
queue.put(sys.argv[1])
pool.spawn(crawler)
while not queue.empty() and not pool.free_count() == 5:
gevent.sleep(0.1)
for x in xrange(0, min(queue.qsize(), pool.free_count())):
pool.spawn(crawler)
# Wait for everything to complete
pool.join()
이 작업을 실행하면 정확히 한 명의 worker 가 생성되고 나머지 worker 는 대기열에 푸시됩니다. 그러면 다른 4 명의 worker 가 시작되어 크롤링 합니다.!
And now you can Gevent
이러한 패턴은 IO 바인딩 multiprocessing 시스템의 기초를 형성하며 시스템 IO 호출 (로컬 파일 시스템에 대한 많은 읽기 / 쓰기), 로컬 스레딩 및 모든 네트워크 IO에서 대기하는 모든 것에 적용될 수 있습니다.
번역:
http://blog.hownowstephen.com/post/50743415449/gevent-tutorial