파이썬의 함정 - 3 


참조,얕은 복사,깊은 복사


모든 언어가 동일한 정책을 취하지 않기 때문에, 이 문제는 어떤 문제에서나 뒷목을 잡게 만들 수 있다. 개인적으로 여러 언어를 다루는 사람들은 이런 참조 문제를 외우지 말고, 항상 테스트를 해봐야 한다고 생각한다. 뒷통수 맞기 싫으면~

파이썬도 마찬가지로 함정이 숨어있는데 , 사실 이게 어떤 깊은 이해를 필요로 하는 문제가 아니기 때문에 그냥 코드를 보고 느껴보자.  (물론 call by value, call by reference , call by share 등에 대한 기본 이해는 있다고 가정)


코드1) 

a = [1,2,3]
b = a
a.append(4)

print b # 결과 [1,2,3,4]

자, 파이썬에서 변수는 값을 담는 그릇이 아니다. 그냥 값에 대한 라벨링 정도? 
그래서 a , b 는 둘다 어떤 (여기선 [1,2,3] ) 이것을 가르키고 있으며, a 가 그것에 추가하면 당연히 b 도 같은 것을 바라보기 때문에 [1,2,3,4] 로 출력된다.


코드2) 

#coding=utf-8
a = [1,2,3]
b = a

if a == b:
print "a b 는 값이 같다"

if a is b:
print "a b 는 정체성이 같다"

c = [1,2,3]
if a == c:
print "a b 는 값이 같다"

if a is c:
print "a b 는 정체성이 같다"

파이썬에서 값이 같은지 비교하는것은 == 이고, is 의 경우는 같은 객체인지를 검토한다.
(스칼라는 == 는 값을 비교하고, 자바는 == 이것이 같은 객체인지 비교한다. 언어마다 다르니 확인해야함)


코드3

#coding=utf-8

a = [1,[2,3],(4,5,6)]
b = list(a) # 또는 a[:]

if a == b:
print ("a b 는 값이 같다")

if a is b:
print ("a b 는 같은 객체이다")

그냥 b = a 하면 같은 객체가 되나, list 나 a[:] 를 통해서 복사생성하면 다른 객체가 된다.


* 참고로 파이썬에서는 import copy 를 하고 copy.copy() 를 통해서는 얕은 복사를 지원하고, copy.deepcopy() 를 통해서는 깊은 복사를 지원한다.


코드4

class Bus:

def __init__(self, passengers=None):
if passengers is None:
self.passengers = []
else:
self.passengers = list(passengers)

def pick(self, name):
self.passengers.append(name)

def drop(self, name):
self.passengers.remove(name)

bus1 = Bus(['Alice', 'Bill', 'Claire', 'David'])
bus2 = copy.copy(bus1)
bus3 = copy.deepcopy(bus1)
bus1.drop('Bill')
print bus2.passengers
#['Alice', 'Claire', 'David']
print bus3.passengers
#['Alice', 'Bill', 'Claire', 'David']

원본에서 하나의 요소가 삭제되었을때,
얕은복사를 한 것 (bus2) 은 영향을 받고 있고, 깊은 복사 (bus3) 를 한 것은 영향을 받지 않는다.


코드5)

가변형을 매개변수 기본값으로 사용 했을 때의 문제점을 살펴보자.


class HauntedBus:
def __init__(self, passengers=[]): # <1>
self.passengers = passengers # <2>

def pick(self, name):
self.passengers.append(name) # <3>

def drop(self, name):
self.passengers.remove(name)

__init__ 생성자의 매개변수로 [] 가 기본값으로 사용되었다. 

bus1 = HauntedBus(['Alice', 'Bill'])
bus1.passengers
['Alice', 'Bill']
bus1.pick('Charlie')
bus1.drop('Alice')
bus1.passengers
#['Bill', 'Charlie']
bus2 = HauntedBus()
bus2.pick('Carrie')
bus2.passengers
#['Carrie']
bus3 = HauntedBus()
print bus3.passengers
#['Carrie'] # 이거 뭡미??? 아무것도 넣은게 없는데 Carrie 라 빡~~~ 나옴
bus3.pick('Dave')
bus2.passengers
#['Carrie', 'Dave']
bus2.passengers is bus3.passengers
#True
bus1.passengers
#['Bill', 'Charlie']


dir(HauntedBus.__init__) # doctest: +ELLIPSIS
#['__annotations__', '__call__', ..., '__defaults__', ...]
HauntedBus.__init__.__defaults__
#(['Carrie', 'Dave'],)
HauntedBus.__init__.__defaults__[0] is bus2.passengers # True

여기서 문제는 각 기본값이 함수가 정의 될 때 (즉,일반적으로 모듈이 로딩 될 때) 평가되고 기본값은 함수 객체의 속성이 된다는 것이다. 따라서 기본값이 가변 객체고, 이 객체를 변경하면 변경 내용 이 향후에 이 함수의 호출에 영향을 미친다. 

가변 기본값에 대한 이러한 문제 때문에, 가변 값을 받는 매개변수의 기본값으로 None 을 주로 사용하며,  __init__ 메서드는 passengers 인수가 None 인지 확인하고 새로 만든 빈 리스트를 할당한다. 

즉 더 방어적으로 프로그래밍 하기 위해서는 아래처럼 코딩해야 한다.

class TwilightBus:

def __init__(self, passengers=None):
if passengers is None:
self.passengers = [] # <1>
else:
self.passengers = passengers #<2>

def pick(self, name):
self.passengers.append(name)

def drop(self, name):
self.passengers.remove(name) # <3>


레퍼런스:

전문가를 위한 파이썬







    파이썬 Asyncio 를 이해하기
위한 여정[번역]

 [원제: 파이썬 ASYNCIO / nODE.JS  와 함께 비동기 io 이해하기]





소개

이번 여름에 Node.js에서 실행되는 웹 플랫폼에서 작업했었습니다. Node.js 를 가지고 풀 타임으로 일한 것은 이번이 처음이었고, 몇 주 동안 작업 한 결과 꽤 클리어해 진 것은 그 당시 나 자신을 비롯한 많은 개발자들이 Node의 비동기 기능에 대해 정확히 설명하지 못했다는 점이었습니다. 이 글에서는 js 작업 및 하위 수준에서의 구현 방법에 대해 설명합니다. 라이브러리를 효율적으로 사용하는 유일한 방법은 그것이 어떻게 작동하는지 명확하게 이해하는 것이고 그러기 위해서 깊이 파고 들기로 결정했습니다. 이 호기심은 다른 언어, 특히 파이썬에서 유사한 비동기 기능을 구현하는데도 나에게 도움을 주었습니다. 파이썬은 내가 관심을 가지고 있어서 공부하는 언어로 , 이번 경험으로  Python 3.4의 비동기 IO 라이브러리 asyncio 에 대한 이해도 깊어 졌습니다. 이 라이브러리에서는 코루틴을 적극적으로 사용하므로 코루틴에 대한 기존의 관심사도 믹스되었습니다. 이 게시물은 이 주제에 대해  더 많은 것들을 배우면서 제기된 질문 및 답변을 찾기 위한 여정의 결과물입니다. 다른 사람들이 이 글을 읽고서 몇 가지 의문점을  분명히 하는데 도움이되기를 바랍니다.

모든 파이썬 코드는 파이썬 3.4 용입니다. Python 3.4가 asyncio 뿐만 아니라 selectors 모듈을 도입했기 때문입니다. Python의 3.4 이전 버전에서는 Twisted, gevent 및 tornado와 같은 라이브러리를 통해 유사한 기능을 제공 받을 수 있습니다.

아래의 초기 예제에서는 학습을 단순화하기  위해 오류 처리 및 예외 문제를 거의 완전히 무시했지만, 예외를 적절하게 처리하는 것은 앞으로 보게될 코드 유형들의 매우 중요한 부분이 되어야한다는 점에 유의해야합니다. 이 글의 마지막으로 파이썬 3.4의 asyncio 모듈이 예외를 처리하는 방법에 대한 몇 가지 예를 제공 할 것입니다.

시작하기 : Hello World 

아주 간단한 문제를 해결하기 위한 프로그램 부터 작성해 보겠다. 앞으로 이 문제를 해결하기 위해 코드를  요리조리 변경해 가며 설명해 나갈 것이다.

"Hello world!"를 출력 하기위한 프로그램을 작성하라. 3 초마다 출력되며 이와 동시에 사용자로부터의 입력을 기다린다. 각 사용자 입력 행에는 하나의 양수 n이 포함되며 , 입력되면 피보나치 수 F (n)을 계산하여 출력하고  다시 입력을 기다린다. 즉 3초마다 출력 & 입력받아서 어떤 계산 2가지 일을 하게 만드는 것이다

"Hello world!" 문자열이 사용자 입력의 중간에 삽입되지만 그것에 대해 신경쓰지말라. 중요한것은 문자열이 찍히는 타이밍이 어떻게 다른지에 대해 촛점을 맞추라.

Node.js와 JavaScript에 익숙한 사용자라면 다음과 같은 해결책을 생각할 수 있지 않을까?

log_execution_time = require('./utils').log_execution_time; var fib = function fib(n) { if (n < 2) return n; return fib(n - 1) + fib(n - 2); }; var timed_fib = log_execution_time(fib); var sayHello = function sayHello() { console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!"); }; var handleInput = function handleInput(data) { n = parseInt(data.toString()); console.log('fib(' + n + ') = ' + timed_fib(n)); }; process.stdin.on('data', handleInput); setInterval(sayHello, 3000);

보다시피,  Node.js에서는 매우 쉽게 할 수 있다. 우리가 해야 할 일은 "Hello world!"를 출력 할 인터벌 타이머를 설정하는 것 뿐이며  process.stdin의 데이터 이벤트에 이벤트 핸들러를 첨부하면 완료된다. 추상적인 수준에서 이해하기 쉽고 사용하기 쉬우며 잘 작동한다! 그러나 어떻게? 이 질문에 답하기 위해 파이썬 코드로 똑같은 일을 해보려 한다.

먼저 log_execution_time 데코레이터를 사용하여 피보나치 수를 계산하는 데 걸리는 시간을 출력하는 방식을 파이썬에서는 다음 처럼 만든다.  (역주: 우리의 목적에는 별로 의미 없는 것들이다. 굳이 파보나치를 이해 할 필요도 없고 functools 같은 것을 알 필요도 없다. 그냥 어떤 것을 계산하는 do_somthing() 함수라고만 이해하고 넘어가도 된다) 

from functools import wraps from time import time def log_execution_time(func): @wraps(func) def wrapper(*args, **kwargs): start = time() return_value = func(*args, **kwargs) message = "Executing {} took {:.03} seconds.".format(func.__name__, time() - start) print(message) return return_value return wrapper 

(역주:partial이 그 함수가 할 수 있는 모든 것중 특정 부분을 하는 함수로 변환 시키는 느낌이라면 wraps 는 기능을 추가/꾸미는느낌이다. wraps 는 함수를 래핑할 때 원래 함수의 정보들을 유지 시킬 수 있다.  

자바스크립트에서도 유사하게 코딩 할 수 있다.

// We do not care about handling the "this" parameter correctly in our examples.
// Do not use this decorator where that's needed!
module.exports.log_execution_time = function log_execution_time(func) {
    var wrapper = function() {
        start = (new Date()).getTime();
        return_value = func.apply(this, arguments);
        message = "Calculation took " + ((new Date()).getTime() - start) / 1000 + " seconds";
        console.log(message);
        return return_value;
    };
    return wrapper;
};

여기서 사용 된 피보나치 수를 계산하는 알고리즘은 의도적으로 모든 것 중 가장 느린 것 (지수 실행 시간)으로 선택되었다. 이것은 이 게시물이 피보나치 수에 관한 것이 아니기 때문이다.

from log_execution_time import log_execution_time def fib(n): return fib(n - 1) + fib(n - 2) if n > 1 else n timed_fib = log_execution_time(fib)

이제 당면 과제로 돌아가보자. 파이썬은 자바스크립트와 같은 setInterval 또는 setTimeout을 제공하지 않기 때문에 가능한 첫 번째 해결 방법은 OS 수준의 동시성을 사용하는 것이다. 즉 두 개의 쓰레드를 사용해서 해결해보자.

from threading import Thread from time import sleep from time import time from fib import timed_fib def print_hello(): while True: print("{} - Hello world!".format(int(time()))) sleep(3) def read_and_process_input(): while True: n = int(input()) print('fib({}) = {}'.format(n, timed_fib(n))) def main(): # Second thread will print the hello message. Starting as a daemon means # the thread will not prevent the process from exiting. t = Thread(target=print_hello) t.daemon = True t.start() # Main thread will read and process input read_and_process_input() if __name__ == '__main__': main()

아주 간단하다. 근데 스레드 기반 Python 솔루션과 Node.js 솔루션은 동등한가? 실험을 해보자. 위에 언급했듯이 피보나치 수 계산 코드는 매우 느리기 때문에  파이썬에 대해서는 37 개, Node.js에 대해서는 45 개 (자바 스크립트는 숫자 계산시 일반 파이썬보다 상당히 빠르다) 라는 다소 큰 숫자를 사용해 보려한다.

$ python3.4 hello_threads.py
1412360472 - Hello world!
37
1412360475 - Hello world!
1412360478 - Hello world!
1412360481 - Hello world!
Executing fib took 8.96 seconds.
fib(37) = 24157817
1412360484 - Hello world!

파이썬은 계산이 끝나기까지 대략 9 초가 걸리며 "Hello world!" 출력은 3초마다 꾸준히 찍히고 있다.
즉 피보나치 계산을 하는 동안에도 출력은 멈추지 않았다.이제 Node.js로 해보자.

$ node hello.js
1412360534 - Hello world!
1412360537 - Hello world!
45
Calculation took 12.793 seconds
fib(45) = 1134903170
1412360551 - Hello world!
1412360554 - Hello world!
1412360557 - Hello world!

반면에 Node.js를 사용하면  피보나치 수를 계산하는 동안 "Hello world!" 메시지가 일시 중지되었다.
왜 그런것일까? 이것이 어떤 의미가 있는지 살펴 보자.

이벤트 루프와 쓰레드 


(역주:  앞부분의  회색 문장은 일반 쓰레드 얘기니깐 잘 알고 있다면 건너뛰어도 된다.)

이전 섹션 두 솔루션(node, python)의 동작 차이를 이해하려면 스레드 및 이벤트 루프를 간단히 이해해야한다. 쓰레드 부터 시작해 보자. 쓰레드를 명령 시퀀스와 그것들을 실행할 때의 CPU의 현재 상태 (CPU 상태는 예를 들어 레지스터 값, 특히 다음 인스트럭션 레지스터를 참조 함) 라고 생각하자.

간단한 동기 프로그램의 경우 보통 단일 스레드에서 실행되기 때문에 작업이 IO 작업이나 타이머와 같은 무언가를 기다려야하는 경우 그 작업이 끝날 때까지 프로그램 실행이 일시 중지된다. 가장 간단하게 실행을  차단 할 수 있는 작업 중 하나는 sleep 이다. 사실, 모든 sleep 은 주어진 시간 동안 실행되는 쓰레드를 차단한다. 여러 스레드가 하나의 프로세스에서 실행될 수 있으며 동일한 프로세스의 쓰레드는 메모리와 그 주소 공간, 파일 기술자 등과 같은 프로세스 레벨 자원을 공유한다.

운영 체제는 쓰레드 처리를 담당하고 있으며 운영 체제의 스케줄러는 하나의 쓰레드를 일시 중지하고 다른 스레드로 CPU를 제어하여 실행되도록 선택한다. 이를 컨텍스트 스위치라고하며, 현재 쓰레드 (예 : CPU 레지스터 값)의 컨텍스트를 저장 한 다음 살아 날 대상 스레드의 상태를 로드하는 작업을 한다. 컨텍스트 스위칭은  다소 CPU 를 소비하는 비싼 계산이다.

하나의 쓰레드에서 다른 스레드로 전환되는 많은 이유가 있다. 예를 들어, 우선 순위가 높은 또 다른 프로세스 나 스레드가 즉각적인 주의 (예 : 하드웨어 인터럽트를 처리하는 코드)가 필요하거나 스레드 자체가 일시 중지 (예 :  sleep)를 요청하거나 스레드가 전용 시간을 사용했기 때문일 수 있습니다 할당되었지만 (이것은 또한 thread quantum 이라고도 함) 실행을 계속하기 위해 대기열로 돌아 가야합니다.

이제 일반적인 쓰레드 이야기는 여기까지 하고 위의 코드로 돌아가 보자. 
위의 파이썬 코드는 멀티 쓰레드였으며  그것은 두 작업이 동시에 실행될 수 있는 이유와 CPU 집중적인 피보나치 계산이 다른 스레드의 실행을 막지 않는 이유를 설명한다. 즉 CPU 집중적인 계산이 이루어지면서 중간 중간 쓰레드 제어권이 "Hello World" 를 찍는 쓰레드로도 변경된 다는 것이다.

하지만 Node.js는 어떤가? 피보나치 계산이 다른 작업("Hello world" 출력) 을 차단 한다는 사실에 미루어 짐작컨데 우리 코드가 단일 스레드에서 실행되고 있는 것을 알 수 있다. 즉 피보나치 계산이 이루어지는 동안에는 다른 어떠한 작업도 실행 될 수 없다는 것이다. 이것은 실제로 Node.js가 구현되는 방법이다. 

지금까지 운영 체제가 응용 프로그램을 단일 스레드로 실행하고 있다는 것을 염두에 두면 (플랫폼에 따라 libuv가 일부 IO 이벤트에 스레드 풀을 사용할 수 있지만 즉 로우레벨에서 쓰레드가 여러개가 작동하지만 우리가 코딩하고 실행을 하는 위치에서의  자바 스크립트 코드는 여전히 단일 스레드에서 실행되고 있다는 사실이다)

특정 상황에서 스레드를 피하려고 하는 몇 가지 이유가 있는데 하나는 쓰레드가 계산적으로나 자원적으로 비싸고, 다른 하나는 여러 스레드가 동작할 때 공유 메모리와 함께 교착 상태 및 경쟁 조건과 같은 동시성 문제로 인해 더 복잡하고 위험한 코드가 발생할 소지가 높아진다는 점이다.

그럼 "파이썬 코드"를 멀티 쓰레딩을 사용하지 않고 "단일 쓰레드" 로 문제를 해결할 수 있는지 생각해 보자. 그렇게 하기 위해 우리는 Node.js가 내부에서 사용하는 방식을 모방 할것이다. 장면을 떠올려 보자. 

첫번째  이벤트 루프. 

먼저, 표준 입력을 폴링(poll) 하는 방법, 즉 파일 디스크립터 (이 경우 stdin)에 입력 가능 여부를 묻는 시스템 호출이 필요하며 운영 체제에 따라 poll, select, kqueue 등과 같은 다양한 시스템 호출이 있어서 사용 할 수 있다. Python 3.4에서 selectors 모듈은 이러한 시스템 호출에 대한 추상화를 제공하므로 다양한 머신 위에서 (다소) 안전하게 사용할 수 있을 것이다. 

폴링 기능이 갖추고 나서 (코드에서는 selectors 를 이용함)  , 단일쓰레드 하에서 반복되는 이벤트 루프를  매우 간단하게 만들어서 루프를 각각 반복하는 동안 읽을 수 있는 입력이 있는지 확인한다. 그 다음, "Hello world!"의 마지막 출력 이후 3 초 이상 지 났는지 확인하고  "예" 인 경우 출력한다.  코드를 보면 명확해 질 것이다. 

(역주: 간단히 말해 이벤트루프는 그냥 while 이다. while 문 돌면서 ,입력이 있으면 입력 받은 값으로 피보나치 계산해주고, 입력받은게 없고 이전에 "Hello world" 찍은지 3초 지났으면 또 다시  찍는 것이다. 입력이 있는지 확인받는 방법을 selector 를 활용한 것. 참고로 selector 는 대략적으로 이렇다. OS 에게 말하길 "혹시 입력을 받았으면 나한테 알려줘" 라는 부탁을 한다. 코드에서는 EVENT_READ 이다. 그리고 나서 selector.select() 로 그 부탁이 실행 됬는지 확인하기 위해 잠시 확인하는 방식이다.  python 의 selector 는 SelectSelector / PollSelector / EpoolSelector / KqueueSelector 등 OS 맞춤으로 제공한다. 아쉽게도 윈도우즈는 소켓만 가능하고 파이프 (stdio등) 은 지원되지 않고 있는 듯하다. 따라서 이 예제들은 윈도우에서는 안된다.) 

import selectors import sys from time import time from fib import timed_fib def process_input(stream): text = stream.readline() n = int(text.strip()) print('fib({}) = {}'.format(n, timed_fib(n))) def print_hello(): print("{} - Hello world!".format(int(time()))) def main(): selector = selectors.DefaultSelector() # Register the selector to poll for "read" readiness on stdin selector.register(sys.stdin, selectors.EVENT_READ) last_hello = 0 # Setting to 0 means the timer will start right away while True: # Wait at most 100 milliseconds for input to be available for event, mask in selector.select(0.1): process_input(event.fileobj) if time() - last_hello > 3: last_hello = time() print_hello() if __name__ == '__main__': main()

결과는 아래와 같다.

$ python3.4 hello_eventloop.py
1412376429 - Hello world!
1412376432 - Hello world!
1412376435 - Hello world!
37
Executing fib took 9.7 seconds.
fib(37) = 24157817
1412376447 - Hello world!
1412376450 - Hello world!

예상대로, 단일 스레드를 사용하기 때문에 이 코드는 Node.js와 같은 방식으로 작동한다. 즉 피보나치 계산을 하는 동안 "Hello world!" 실행을 차단. 깔끔하다.  그러나 우리의 코드는 특정 문제에 대해 다소 하드 코딩되어 있는데 다음 섹션에서는 이벤트 루프 코드를 좀 더 강력하고 프로그램하기 쉽도록 일반화하는 방법을 살펴볼 것이다. 먼저 콜백을 사용하고 그 다음엔  coroutines를 사용해서~ 겁먹지 마시라~

(역주:  하지만 앞으로의 얘기는 python 에 대한 약간의 지식이 필요 할 것이다.) 

이벤트 루프와 Callbacks

이전 섹션의 이벤트 루프를 자연스럽게 일반화하면 일반 이벤트 핸들러를 사용해서 구현 할 수 있게 된다. 즉  콜백을 사용하여 상대적으로 쉽게 달성 할 수 있는데  각각의 이벤트 유형 (지금 경우에는 표준 입력과 타이머 입력 두 개) 에 대해 사용자가 임의의 함수를 이벤트 핸들러로 추가 할 수 있을 것이다. 코드는 꽤 간단하여 바로 코드를 봐도 되지만 하나가 조금 까다로운데 타이머 이벤트를 처리하기 위해 bisect.insort를 사용하는 부분이다. 이 알고리즘은 타이머 이벤트 목록을 정렬 된 상태로 유지하고 타이머를 가장 먼저 실행한다. 이러한 방법으로 이벤트 루프를 반복 할 때마다 타이머가 있는지 확인하고 타이머가 있는 경우 처음부터 시작하여 만료된 타이머를 모두 실행하는 식이다. bisect.insort를 이용하여 항목을 목록의 올바른 색인에 삽입함으로써 이 작업을보다 쉽게 수행할 수 있다. 이것에 대한 여러 가지 다른 접근 방식이 있지만 이것은 그냥 내가 선택한 방식이다.

(역주:  다시 언급하지만 이벤트 루프는 그냥 while 문 도는거라고 생각하면 된다. 동일하다. while 문 내부에서 어떤 조건을 체크해서 a 에 해당하는 이벤트가 발생하면 a_func 함수, 즉 콜백을 호출해준다는 간단한 이야기이다.
동시성 패턴에 자주 사용되는 Actor 라는 것도 사실 비슷하다. 자신의 쓰레드 안에서 while 문 도는 것일 뿐) 

from bisect import insort
from collections import namedtuple
from fib import timed_fib
from time import time
import selectors
import sys


Timer = namedtuple('Timer', ['timestamp', 'handler'])


class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)

            # Handle timer events
            while self._timers and self._timers[0].timestamp < time():
                handler = self._timers[0].handler
                del self._timers[0]
                handler()

    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)

    def add_timer(self, wait_time, callback):
        timer = Timer(timestamp=time() + wait_time, handler=callback)
        insort(self._timers, timer)

    def stop(self):
        self._running = False


def main():
    loop = EventLoop()

    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)

    def f(x):
        def g():
            print(x)
        return g

    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()


if __name__ == '__main__':
    main()

(역주:  bisect 는 이름이 말하듯이 이진트리로 정렬을 해 놓는 컬렉션이다. 삽입하면 바로 정렬해 준다는 것이다.  insort(정렬될 리스트, 값)  이런식으로 활용한다.) 

매우 단순한 방식의 이 코드는 실제로는 Node.js 에서 가장 일반적으로 사용되는 방법이다. 그러나 좀 더 복잡한 응용 프로그램에서는 비동기 코드를 작성하는 이러한 스타일이 특히 오류 처리가 추가되면 매우 빠르게 콜백 지옥으로 알려진 모양으로 악화 될 수 있다. Guido van Rossum의 콜백 인용 :

콜백으로 이루어진 복잡한 코드를 읽는 것은 미친짓이다. 슈퍼 울트라 교육을 받은 인간만이 가능할거다. 내 말을 못믿겠거든 널려 있는 자바 스크립트 코드를 보라 - 귀도 반 로섬


promises 와  코루틴 (각 대안에 대해 헤아릴 수 없을 만큼의 NPM 라이브러리) 과 같은 여러 가지 대체 접근법이 있다. 내가 가장 선호하는 것은 coroutines를 사용하는 것이며  다음 섹션에서는 coroutines를 사용해서 유사한 이벤트 루프를 구현하는 방법에 대해 설명 할 예정이다.


이벤트 루프와 코루틴


(역주:  이 게시물에서 가장 복잡한 코드가 될 것이다. 이유는 기반 라이브러리를 설계하는 모양새의 코드가 될 것이기 때문이다. 말하자면 이런 코드를 언어제작자들이 내놓는것이고 우리는 그것을 편하게 사용할 것이다. node 와 python 에서의 비동기 개발을 위한  라이브러리적 인사이드가 이런 느낌으로 구현된다고 생각하면 된다.) 

코루틴 - 한글블로그에도 휼륭한 글이 많으니 참고 합시다  -> 참고 

코루틴은 상태 (로컬 변수의 값 및 다음 명령의 의미)를 기억하면서 "return" 할 수있는 함수이다. 이렇게 하면 코 루틴을 다시 반복 호출 할 수있게 되어 결과가 중단된 부분부터 계속됩니다. 이러한 형태의 "return"은 특별히 yielding 이라고 한다. 즉 자신이 잠깐 다른 일을 해야 하니깐 제어를 양보한다는 의미이다. 값을 받는데 사용하는 yield 키워드는 함수 내부의 "=" 오른쪽에 있는 표현식으로 사용 할 수도 있다.  send() 메서드를 사용하여 값을 코루틴 함수로 다시 전달할 수 있다. 이때가 양보가 끝마치는 시점이다. 즉 블럭이 해소되는 시점. 


"코루틴은 주거니 받거니 하는 함수이다. 먼가 쓰레드 처럼 살아숨쉬는 듯한 함수

좀 헥깔릴 수 있는데. 함수와 제네레이터 , 코루틴 모두 def 를 통해 만든다. 이것에 대해 모두 다르게 표시해야 한다고 주장하는 사람들이 있으며, 파이썬의 창조자 귀도는 이에 반대하여 def 로 통일되어 있다고 한다. 아무튼 중요한것은 코루틴은 반복이 목적이 아니라. 코루틴은 외부와의 상호 작용이다.

def repeater():
while True:
received = yield
print('Echo:', received)

rp = repeater()

next(rp) # 코루틴 시작
rp.send('Hello')
rp.send('World')

#Echo: First
#Echo: Second

(비동기에서 코루틴은 요긴하게 사용되는데 잠시 생상을 해보세요. 어떻게 사용 될까요? 네 어떤 비동기 작업 (주로 i/o 많을듯) 에 대한 결과를 받아서 전달하는 매개함수 역할을 합니다. 매우 중요합니다. )


Python에서는 yield 키워드를 사용하여 코루틴을 만들 수 있다.  yield value 와 같은 간단한 명령문으로 사용되면 주어진 값이 내보내지고 호출자에게 제어가 되돌려진다.  yield 문 다음 명령에서 시작하여 coroutine을 계속하려면 호출자는 내장 된 next 함수를 사용하면 된다. y = yield x와 같은 표현식으로 사용될 때는  x 값이 yielded 되고 , coroutine을 계속하려면 coroutine의 send 메소드를 사용할 수 있다.이 경우 send에 지정된 값이 coroutine으로 다시 전송된다.(이 예에서 y에 할당 됨)

이것은 우리가 코루틴으로 비동기 코드를 쓸 수 있고 (역주: 이벤트 핸들링을 위해) 비동기 작업을 기다릴 필요가 있을 때 간단히 yield 할 수 있음을 의미한다. 이렇게 하기 위해서,  단순히 우리가 계속해야 할 가치가 있는 작업이나 다른 코루틴을 yield 하게 된다. 그러면 코드는 매우 순차적이며 동기 코드와 유사하게 보이게 될 것이다. 우리 코드의 피보나치 부분이 코루틴이 된다면  어떻게 생겼는지에 대한 간단한 예가 있다 :
(역주: 글로는 더 헥깔릴 수 있으니 코드를 반복적으로 돌려 보면서 익히자) 

def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

물론 이것이 작동하려면 코루틴을 처리 할 수있는 이벤트 루프가 필요하다. 이를 달성하기 위해 우리는 이벤트 루프에 의해 실행될 태스크 큐를 유지할 것인데 입력 이벤트가 발생했을 때와 타이머가 꺼지면 (또는 더 일반적으로 우리가 신경 쓰이는 다른 이벤트), 우리는 계속해야 할 코루틴 목록을 가지고있을 것이다.  각 작업에는 체인에서 실행할 코루틴 스택을 추적하는 바운드 스택 변수가 있는데  각 변수는 다음 종료마다 다를 것이다. 이것은 PEP 342에서 제공하는 "Trampoline"예제를 기반으로 하는데   JavaScript에서 Function.prototype.bind와 동일한 Python 방식인 functools.partial을 사용한다. 즉, 매개 변수 값을 바인딩하여 함수를 Curry 한다.

다음은 전체 코드이다.

from bisect import insort
from collections import deque
from collections import namedtuple
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types


Timer = namedtuple('Timer', ['timestamp', 'handler'])


class sleep_for_seconds(object):
    """
    Yield an object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time


class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()

        # Queue of functions scheduled to run
        self._tasks = deque(tasks)

        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []

        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []

        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif result is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])

    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, Timer(timestamp=when, handler=task))
        else:
            self._tasks.append(task)

    def stop(self):
        self._running = False

    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stack in self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()

            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()

            # Finally run time scheduled tasks
            while self._timers and self._timers[0].timestamp < time():
                task = self._timers[0].handler
                del self._timers[0]
                task()

        self._running = False


def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)


def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

(역주:  parital 이란 어떤 함수가 있을 때 그 함수가 가진 매개변수들을 미리 채워 넣어서 만들어진 부분함수라고 보면된다. 그리고 이 코드를 번역된 혹은 원본을 읽고 이해하려고 하지말자. 글로 이해하기는 매우 복잡하다. 코드를 반복해서 읽고 이해하자. 그게 더 쉽다. 전체적으로 간략히 설명하면 스케쥴 함수에 내가 해야할 일을 넣어주고 있으며, run_forever 함수가 while 을 돌면서 스케쥴 함수를 통해 들어온 일을 처리하는 모양새이다. 주의 깊게 볼 부분은 resume_task 로 , 코루틴에 값을 전달해서 실행시키기도 하고, 값을 리턴(yield) 받아서 다시 스케쥴함수에 넣어주기도 한다. 즉 계속 task 를 재생산하고 있다는 점

이 구현은 Node.js에서 process.nextTick 이 하는 일을 어느 정도 수행하는 간단한 do_on_next_tick 함수를 추가하게 한다. 기능을 종료하기 위해 간단한 타입 exit 를 구현하는 데 사용한다. loop.stop ()을 직접 호출 할 수도 있었으나 do_on_next_tick을 사용하였다.또 다른 흥미로운 점은 재귀 호출 대신 coroutines를 사용하여 재귀 피보나치 알고리즘을 다시 구현할 수 있다는 점인데 그렇게 하면 hello를 포함하여 다른 coroutine과 "parallel" 하게 실행할 수 있게 된다. 

위의 코드를 라이브러리 처럼 활용한 다음 코드를 살펴보자. 

from event_loop_coroutine import EventLoop
from event_loop_coroutine import print_every
import sys


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

결과: (중간에 블럭되지 않았다.)

$ python3.4 fib_coroutine.py
1412727829 - Hello world!
1412727832 - Hello world!
28
1412727835 - Hello world!
1412727838 - Hello world!
fib(28) = 317811
1412727841 - Hello world!
1412727844 - Hello world!



바퀴를 재발명 할 필요없다.

(역주:  이제 우리가 구현한 이벤트 루프말고, 파이썬에서 구현한 이벤트 루프를 사용해보자!!) 

앞의 두 섹션에서 우리는 콜백이나 코루틴을 사용하여 비동기 코드를 작성할 수 있도록 이벤트 루프를 구현하는 일반적인 아이디어를 검토했다. 이것은 아이디어에 대해 실험하고 배우기 위한 목적 이었지만 실제로는 이미 이벤트 루프를 제공하는 Python용 라이브러리가 이미 성숙해 있는데 Python 3.4에는 IO,네트워킹 작업을 하기 위한 코루틴과 이벤트 루프을 가진 asyncio 모듈이 제공된다.

우선 asyncio를 사용하여 위의 문제를 해결하는 모습을  살펴 보자.

import asyncio
import sys
from time import time
from fib import timed_fib


def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)


def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())


if __name__ == '__main__':
    main()

@ asyncio.coroutine이 coroutine을 장식하는 데 사용되는 방식에 주목하고 yield 와 반대되는 즉 다른 coroutine으로 부터의 값을 사용하는 yield from 을 눈여겨보자.

yield -> 주는 
yield from -> 받는  (받기 위해 기다림)

예외 핸들링

파이썬 코루틴은 코루틴의 스택 프레임에 예외가 던져 지도록 허용하고, 코루틴이 일시 중지 한 지점에서 예외를 포착하도록 한다. 간단한 예를 살펴 보자.

def coroutine(): print("Starting") try: yield "Let's pause until continued." print("Continuing") except Exception as e: yield "Got an exception: " + str(e) def main(): c = coroutine() next(c) # 처음 yield 까지 실행 # Now throw an exception at the point where the coroutine has paused value = c.throw(Exception("Have an exceptional day!")) print(value) if __name__ == '__main__': main()

결과:

Starting
Got an exception: Have an exceptional day!

이벤트 루프가 예외를 적절하게 포착하고 전파하는 경우, 동기 및 비동기 코드 모두에서 예외를 사용하여 오류를 처리하는 통일 된 방법을 얻는 것은 꽤 간단하다.

체인화 된 coroutine과 이벤트 루프를 사용한 예를 살펴 보자.

import asyncio @asyncio.coroutine def A(): raise Exception("Something went wrong in A!") @asyncio.coroutine def B(): a = yield from A() yield a + 1 @asyncio.coroutine def C(): try: b = yield from B() print(b) except Exception as e: print("C got exception:", e) def main(): loop = asyncio.get_event_loop() loop.run_until_complete(C()) if __name__ == '__main__': main()

결과:

C got exception: Something went wrong in A!

이 예에서, 코루틴 C는 B의 결과에 의존하며, 쭈욱 거슬러 올라가 보면 예외를 던지기로 결정하는 A의 결과에 최종 의존한다. 보시다시피, 예외는 C 에게 전달되어 메시지를 캐치하고 메시지를 출력 하게 된다. 반갑게도 동기식 코드와 거의 똑같이 작동한다. 즉 수동으로 콜백을 통해 오류를 포착하고 전달할 필요가 없게 되는 것이다!!

이 예제가 너무 간단해서 만족스럽지 않은 분들을 위한 실제 예를 살펴 보았다. ipify를 사용하여 컴퓨터의 외부 IP 주소를 비동기적으로 잡아내는 코드를 작성해 보려한다. asyncio는 HTTP 클라이언트와 함께 제공되지 않기 때문에 (아직 어쨌든!) TCP 수준으로 가서 HTTP 요청을 작성하고 응답을 직접 구문 분석하려 한다. 실무적으로는 aiohttp과 같은 라이브러리를 사용하는 것은 당연히 더 좋은 생각일 것이다. 
이제 코드를 살펴보자.

import asyncio import json host = 'api.ipify.org' request_headers = {'User-Agent': 'python/3.4', 'Host': host, 'Accept': 'application/json', 'Accept-Charset': 'UTF-8'} @asyncio.coroutine def write_headers(writer): for key, value in request_headers.items(): writer.write((key + ': ' + value + '\r\n').encode()) writer.write(b'\r\n') yield from writer.drain() @asyncio.coroutine def read_headers(reader): response_headers = {} while True: line_bytes = yield from reader.readline() line = line_bytes.decode().strip() if not line: break key, value = line.split(':', 1) response_headers[key.strip()] = value.strip() return response_headers @asyncio.coroutine def get_my_ip_address(verbose): reader, writer = yield from asyncio.open_connection(host, 80) writer.write(b'GET /?format=json HTTP/1.1\r\n') yield from write_headers(writer) status_line = yield from reader.readline() status_line = status_line.decode().strip() http_version, status_code, status = status_line.split(' ') if verbose: print('Got status {} {}'.format(status_code, status)) response_headers = yield from read_headers(reader) if verbose: print('Response headers:') for key, value in response_headers.items(): print(key + ': ' + value) # Assume the content length is sent by the server, which is the case # with ipify content_length = int(response_headers['Content-Length']) response_body_bytes = yield from reader.read(content_length) response_body = response_body_bytes.decode() response_object = json.loads(response_body) writer.close() return response_object['ip'] @asyncio.coroutine def print_my_ip_address(verbose): try: ip_address = yield from get_my_ip_address(verbose) print("My IP address is:") print(ip_address) except Exception as e: print("Error: ", e) def main(): loop = asyncio.get_event_loop() try: loop.run_until_complete(print_my_ip_address(verbose=True)) finally: loop.close() if __name__ == '__main__': main()

다시 말하지만 동기 코드와의 유사점 (콜백 없음, 복잡한 오류 처리 없음, 간단하고 읽기 쉬운 코드)을 확인하라.  오류없이 어떻게 잘 작동지에 대해 살펴보자.

$ python3.4 ipify.py
Got status 200 OK
Response headers:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
My IP address is:
<my IP address here, hidden for privacy!>

물론 인터넷에 연결되어 있지 않은 경우 다음과 같이 출력 될 것이다.

$ python3.4 ipify.py
Error:  [Errno 8] nodename nor servname provided, or not known

필자의 견해로는 비동기 코드에 coroutines를 사용하면 얻을 수있는 주요 강력한 장점 중 하나으로 생각한다. 오류 처리가 동기 코드와 완벽하게 일치하는 것 아름다운 모습을 보라. 예를 들어 위의 경우 체인화 된 coroutines 중 하나가 실패하거나 동기 호출 중 하나가 실패하면 예외는 잡히고 똑같은 방식으로 처리될 것이다.


여러개의 독립 코루틴들의 결과 다루기 

위의 예제에서는 본질적으로 순차적인 비동기 코드를 작성했다. 즉, 코루틴의 각 명령문은 계속 진행하기 전에 완료 한 이전 명령문에 의존한다. 때로는 일련의 독립적인 작업을 실행하고 실행 순서를 신경 쓰지 않고 완료된 상태로 사용하기를 원한다. 예를 들어 웹 크롤러처럼 웹 페이지의 모든 링크에 비동기 요청을 보내고 결과를 받아서 처리 할 대기열에 응답을 추가하자.

코루틴은 매우 순차적으로 흐르는 비동기 코드를 작성할 수 있지만, 독립적인 작업을 실행하고 결과를 한꺼번에 처리하거나 콜백을 수행 할 때 처음에는 콜백이 더 나은 것처럼 보일 수 있다. 그러나 Python 3.4의 asyncio에는 정확하게 asyncio.as_completed 및 asyncio.gather라는 두 가지 시나리오에 대한 내장 함수가 있는데 3 개의 URL을로드해야하는 간단한 예제를 통해 살펴 보겠다. 먼저 asyncio.as_completed를 사용하여 결과를 처리하고 두 번째 방법에서는 asyncio.gather를 사용하여 로드를 완료 한 후 두 가지 방법으로 결과를 처리하려 한다. 실제로 URL을 로드하는 대신에 임의의 초 동안 일시 중지되는 간단한 동시 루틴을 선택했다. 코드는 다음과 같다.

import asyncio import random @asyncio.coroutine def get_url(url): wait_time = random.randint(1, 4) yield from asyncio.sleep(wait_time) print('Done: URL {} took {}s to get!'.format(url, wait_time)) return url, wait_time @asyncio.coroutine def process_as_results_come_in(): coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']] for coroutine in asyncio.as_completed(coroutines): url, wait_time = yield from coroutine print('Coroutine for {} is done'.format(url)) @asyncio.coroutine def process_once_everything_ready(): coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']] results = yield from asyncio.gather(*coroutines) print(results) def main(): loop = asyncio.get_event_loop() print("First, process results as they come in:") loop.run_until_complete(process_as_results_come_in()) print("\nNow, process results once they are all ready:") loop.run_until_complete(process_once_everything_ready()) if __name__ == '__main__': main()

결과

$ python3.4 gather.py
First, process results as they come in:
Done: URL URL2 took 2s to get!
Coroutine for URL2 is done
Done: URL URL3 took 3s to get!
Coroutine for URL3 is done
Done: URL URL1 took 4s to get!
Coroutine for URL1 is done

Now, process results once they are all ready:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 3s to get!
Done: URL URL3 took 4s to get!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]


더 알아보기

아직 다루지 않은 것들이 많이있다. Futures and libuv 등과 파이썬 3.4에서 비동기 IO에 대한 Guido의 강연( Guido's talk on asynchronous IO in Python 3.4도 있다. 심각한 오역이나 오류에 대해서 알려 주시면 반영하겠다. 


(역주: 여기까지 읽고 이해가 안가신다면 두어번 더 읽어보시길 바랍니다. 그래도 이해가 안간다면 WINDOWS SELECT, Java nio 의 selector, node.js 를 먼저 공부해 보셨으면 하구요.내용도 많을 것이고 좀 더 근본에 대해서 공부한다면 상위층에 대한 이해도 빨라질 것이기 때문입니다.)


번역 : http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html




파이썬과 동시성


*본 글은 대략 동시성 프로그래밍에 대해서 알고 있는데 파이썬은 시작 단계이며 어떤것들이 있는지 빠르게 훑어보고 싶은 분들을 위해 눈높이가 맞추어져 있음을 알려드립니다. 기본 쓰레드에서부터 시작해서 전반적으로 살펴볼 것 이나 버전별 차이는 다루지 않고 있습니다. 해당 키워드에 대한 버전별 구분 및 세부적인 튜토리얼은 구글링을 통해 찾아보시길 바랍니다. (참고 링크는 추가해두었습니다)


리액티브 및 동시성이라는 화두가 파도치고 있는 요즘 파이썬도 뒤떨어 질 수는 없겠지요? 데이터분석이나 쉘스크립팅의 대안정도로만 생각했던 분들도 계실것이지만 파이썬도 다양한 기술들을 제공하며 서버개발의 메인 언어로서 그 대열에 함께 하고 있습니다. 개인적으로는 동시성에 대한 추상층 api 지원에 있어서 파이썬은 스칼라,Go 정도까지는 아니더라도 자바, C++, 자바스크립트 정도와는 대등한 만큼의 언어/라이브러리적 지원 및 발전을 하고 있는 것으로 느껴집니다. 본격적으로 시작하기 전에 동시성,병렬성,분산등에 대한 정의부터 내리고 시작하겠습니다.


동시성이 무엇인지 병렬성이 무엇인지는 딱히 규정된것은 아니며 사람마다 조금씩 다르게 규정하기도 합니다.
여기서는 저의 정의(혹은 제가 참고한) 라는 점을 알려드립니다. 



동시성

이것은 구성의 문제입니다. 즉 여러가지의 일들이 어떤식으로 전개되어 있느냐, 어떻게 구조화 할 것이냐 에 관한 내용입니다. 즉 a 라는 일과 , b 라는 일이 같이 어울어져 발생하며 어떻게 그 일에 대한 순서를 블럭킹/시퀀셜하지 않게 만드느냐에 대한 문제입니다. 현실적으로는 가장 많이 사용되는 것은  I/O 가 일어나는 곳이구요.

위의 그림은 스칼라의 Future 설명을 하기 위한 포스팅에서 가져온 그림인데요.  
메인쓰레드가 흘러가는 와중에 다른 일들이 그림 상단에서 벌어지고 있으며, 일들이 완료 되는 순간 다시 합쳐지고 그러한 일들이 구성되어 있습니다. 원격웹서버를 통해 i/o 처리도 하고 있구요.


이러한 구성은 보통 로우레벨에서 멀티플렉싱/ 비동기입출력 / 쓰레드 / 프로세스등을 사용해서 처리합니다. 우리가 알 필요가 없는 부분입니다. 근데 말입니다. 아래 살펴 볼 병렬성도 쓰레드와 프로세스등을 사용하는데요 즉 쓰레드와 프로세스를 사용하는것은 어느쪽에서든 도구로 이용된다는 뜻입니다. 그 자체로 구분의 지표가 될 수 없다는 말. 


각 언어/라이브러리마다 이러한 동시성에 대한 구성을 단순화 시켜주기 위한 Future, Promise, async, goroutine, Observable, Iteratee , deferred , Akka 등 다양한 기술들이 제공되고 있습니다.


그냥 얘기하나 해볼께요. (동기와 비동기에 대한 이야기입니다)

우체국이 있습니다.
저는 소포를 보내려는 손님이죠.
손님들 100명이 우체국에 일렬로 줄을 서있고, 한명씩 처리하는것이 -> 싱글스레드 / 동기 처리입니다.
손님들 100명이 우체국의 100명의 직원에게 각각 처리하는 것이 -> 멀티쓰레드 / 동기 처리입니다.

여기까진 명쾌하죠. 
그럼 손님들 100명이 우체국에서 번호표를 받아서 각각 자기 할일 하다가, 우체국에서 스마트폰으로 니 차례야 하고 알려주면 그 때 우체국에 순간이동(할 수 있다고 합시다)해서 소포를 보낼 수 있을 만큼 보내는것은??

네 이것은 싱글쓰레드 / 동기 처리입니다. 동기? 왜 동기 일까요? 사실 이것을 비동기로 봐도 되긴 합니다.
왜냐면 손님은 일단 기다리지 않고 (블럭되지 않고) 자기 할일을 할 수 있기 때문에, 비동기로 봐도 됩니다.
다만 하마님아~~~ 님 차례 됬다. 라고 알려주면 하마는 자기 소포를 보내게 되는데! 바로 이 순간은 동기의 순간입니다. 즉 내가 너무 많은 소포를 보내면 , 우체국은 다른 사람들에게 니 차례야 라고 말해줄 수 없는거에요. 병목이 생긴다는 의미입니다. (이게 Windows 에서 SELECT 이자, Java NIO 이며, Node.js 가 왜 비지니스로직이 길면 문제가 생기는지에 대한 대답입니다. 또한 node 나 자바,파이선의 selector 가 비동기 방식이라고 말하고 있는데 생각하기 나름입니다. 하이레벨에서의 구분이냐? 로우레벨에서의 구분이냐에 따라서 달리 볼수 있기 때문에~) 

하지만 
손님들 100명이 우체국 뒷마당에다가 소포를 던져두고, 이벤트 알림표를 받고 집에가서 자기 할일을 합니다.
그러다가 우체국에서 알림이 오겠죠. "니 소포 다 보냈어" , 다른 손님에게도 알림이 갑니다. "니 소포도 다 보냈어"  자~~~ 이렇게 되면 모두 병목에서 해방됩니다. 대신 우체국(OS) 가 더 많은 일을 하게 되겠죠.
이게 바로 진정한(?) 비동기 입니다. Windows 의 IOCP 입니다. 

간단하게 차이를 정리하면
- 일을 할 수 있는지 알려주는 방식 (react)
- 일의 완료를 알려 주는 방식(proact)

입니다. 

초기 각 언어의 라이브러리들은 반동기식(?)의 i/o를 지원해줬었고, 이제 몇몇 라이브러리들은 Proactor 패턴식의 비동기 I/O 입출력도 지원해 주기 시작했습니다. 파이썬도 EpollSelector / KqueueSelector / SelectSelector 등 지원함.



병렬성

이것은 계산의 문제입니다. 동일한 시간에 (동일하다는 기준은 조금씩 달라짐) 여러 계산들이 동시에 이루어 진다는 느낌입니다. 예를들어  1부터~1억을 더하는데 , 하나의 쓰레드에서 모두 더 할 수도 있지만, 10개의 쓰레드에서 양을 나누어서 계산한 후 최종적으로 더할 수도 있을 것입니다. 그때 10개의 쓰레드는 병렬성을 가지고 있는 것입니다. (병렬이 어느 수준으로 이루어지느냐는 별개) 

방법으로
1.일반 쓰레드를 사용하는 방식
2.일반 프로세스를 사용하는 방식
3.여러 컴퓨터에 분산해서 사용하는 방식 
4. GPGPU 를 통한 방식 (그래픽카드의 수만개 이상의 쓰레딩모듈) 
5. CPU 를 통한 방식 (SIMD 명령방식 : 단일명령으로 다중데이터처리) 

AVX,SSE,CUDA, openMP,AMP,PPL,openCL,TBB 같은 언어 및 벤더별 혹은 표준라이브러리들이 많이 있습니다. 

GPU vs CPU 에 대한 논쟁도 있습니다.


대규모 병렬화에 적합한 흥미로운 예를 하나 들어 보겠습니다.

CUDA 를 통한 Volumne Rendering 에서 가져왔는데요.



이러한 입체영상을 가시화 하기 위해서는 사람이 보는 각도에 따라서  모든 픽셀에 대한 재계산이 필요합니다. 하나 하나의 픽셀을 얻기 위해서 어떠한 수학(직선상의 매칭되는 모든 값들을 가지고 입체적 느낌을 살리기 위한 특정 중간값 계산)이 필요한데 계산량이 좀 됩니다. 이때 픽셀이 수천만개라면 마우스를 가지고 이리 저리 돌려보는 순간 일일이 하나씩 계산하고 있을 순 없자나요? 수만개 이상의 쓰레드가 동시에 계산합니다. 계산하는데 다른 픽셀의 값이 필요하지 않기 때문에 완전병렬이라고도 합니다. 물론 파이썬으로도 구현됩니다.



파이썬에서의 동시성


멀티 쓰레드


이제 파이썬(CPython 기준)에서의 동시성에 대해 알아보도록 하죠.  먼저 쓰레드를 살펴봅시다. 우리 모두가 알고 있는 그 쓰레드 입니다. 다만 파이썬에서 특이사항은 파이썬은 GIL이라는것이 존재합니다. 이것은 한 순간에 하나의 쓰레드만 작동하도록 만드는 것인데요. 따라서 CPU,쓰레드가 여러개 일 지라도 CPU를 하나 만 사용하는것과 마찬가지입니다. 쓰레드를 해당 순간에 하나만 사용하기 때문에 락은 필요 없을거 같다고 얼핏 생각할 수 도 있으나, Lock, Event 객체들이 있으며 베타적 제어를 해주지 않으면 파이썬일 지라도 문제가 생기는 것은 마찬가지입니다.


CPU 를 하나만 사용하기 때문에, 동시에 병렬적으로 계산하는 업무에는 맞지 않습니다만 , I/O 기반의 업무라면 동시성을 통해서 충분히 고효율을 보장 받을 수 있습니다. 자 기억합시다!! 파이썬에서 쓰레드활용은 I/O , 비동기에서만 활용하자, 병렬 계산을 위해서라면 프로세스,GPU,분산을 활용하자!!


아주 간단한 예제를 보시죠.

# python 2.7 에서 테스트
import
threading
from time import sleep

def myThread(name,nsec):
print ("---- do somthing ----")
sleep(nsec)

if __name__ == '__main__' :
t = threading.Thread(target=myThread, args=("Thread-1", 3))
t.start()
t.join()
print ("---- exit ----")

threading 모듈을 이용해서 쓰레드를 myThread 함수를 통해 실행시키고 있습니다. 인자로 이름하고 몇초 동안 일을 할 것인지 알려주고 있구요.


제 경험상 대부분의 쓰레드 태스크는 i/o 를 동반하고 있습니다. 즉 위의 코드에서 myThread 에서는 보통 웹이나 TCP 소켓등을 통해 remote에 접속해서 무엇인가 가져 온다든지 하는 작업이 주를 이루고요. 그때 myThread 에서 생성 혹은 가져 온 데이터는 메인쓰레드에게 그 데이터를 전달해 줍니다. 그때 queue 를 활용하는데요.
네 표준 queue 는 내부적으로 베타제어를 하고 있기 때문에 쓰레드에 안전합니다.


queue 를 활용하는 다음 예를 보시죠.

#coding=utf-8
# python 2.7 에서 테스트
import threading
import queue
from time import sleep

def myThread(name,q):
i = 0
while True:
sleep(1)
q.put(i)
i +=1

if __name__ == '__main__' :

BUF_SIZE = 10
q = queue.Queue(BUF_SIZE)

t = threading.Thread(target=myThread, args=("Thread-1", q))
t.start()

while True:
num = q.get()
print (str(num) + " 이 생성되었습니다")


print ("---- exit ----")

queue 모듈을 임포트하여 사용했습니다. myThread 에서 큐에 값을 put 하고 메인쓰레드에서 get 해서 사용하네요. 매우 간단히 생성자-소비자 패턴이 해결 되었습니다.



멀티 프로세스

위에 언급했다시피 파이썬에서 병렬적으로 계산하는 상황에서의 멀티쓰레드는 오히려 더 성능이 안좋아 집니다.
이때 멀티 프로세스를 활용하여 해결 할 수 있는데요. (이 이유로 다른 언어보다 프로세스를 적극 활용합니다.)자 겁내지 마세요. 멀티쓰레드와 아주 흡사하게 만들 수 있습니다.

import multiprocessing
from time import sleep

def myProcess(name,nsec):
print ("---- do somthing ----")
sleep(nsec)

if __name__ == '__main__' :
t = multiprocessing.Process(target=myProcess, args=("Process-1", 3))
t.start()
t.join()
print ("---- exit ----")

똑같죠? 파이썬의 힘입니다. 너무 간단합니다. threading.Thread 를 multiprocessing.Process 로 바꾸었을 뿐입니다.그럼 둘간의 통신은 어떻게 하냐구요? 네 마찬가지로 Queue 를 통해서 할 수 있습니다. 다만 주의 하실 점은
import Queue 모듈이 아니라 from multiprocessing import Queue 모듈을 사용해야 한다는 점! 잊지마세요.




eXECUTORS 와 퓨처(fUTURE)

이제 좀 추상층을 높여 봅시다. 현재 각종 언어들 마다 쓰레드를 직접적으로 사용자가 만들어서 사용하는 것을 지양하고 있으며, 추상층을 쌓아올려서 보다 쉽게 하지만 좀 더 적극적으로 사용 할 수 있게 끔 유도하고 있는데요. 그것은 쓰레드를 직접 사용하는데에서 오는 어려움이기 때문일 것입니다. 자바의 경우 5버전부터 더그리(Doug Lea) 에 의해 강력한 동시성 라이브러리들이 추가 되기 시작했는데요. 많이들 사용하시는 Executor 과 같은 것이 파이썬에도 있습니다.

Executors / ThreadPoolExecutor / ProcessPoolExecutor

Executors 를 상속받은 2개의 구현체입니다. 이름에서 나타나듯이 하나는 쓰레드풀이고 하나는 프로세스 풀입니다. 둘은 거의 동일한 구문으로 사용되기 때문에 둘 중에 하나(프로세스풀)만 살펴보죠.

from concurrent.futures import ProcessPoolExecutor
from time import sleep


def return_after_5_secs(message):
sleep(5)
return message

if __name__ == '__main__' :
pool = ProcessPoolExecutor(3)

future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())

역시 코드가 모든것을 잘 설명해주고 있습니다. 파이썬은 정말 위대합니다.ㅎㅎ
쓰레드 3개를 운용하는 풀을 만들어주고, 하나의 일(Task) 를 제출한 후에 바로 future 를 리턴 받습니다.

리턴 받은 future 에 진짜 값이 들어 올 때까지 대기하다가 실제 값이 있을 경우 (코드에서는 future.done() 이 True일 경우) result 함수를 호출해서 가져옵니다.  동일한 작업을 ThreadPoolExecutor 를 통해서도 쓰레드레벨로 가능합니다.


* 퓨처이야기 



비동기와 Asyncio

지금까지 것들은 주로 여러 쓰레드를 활용해서 처리하는 내용들이 었습니다. 이제 살펴 볼 것은 하나의 쓰레드를 가지고 어떻게 효율적으로 CPU 를 다룰 수 있는지에 관한 이야기입니다.(무조건 하나의 쓰레드만 사용한다 라는 것은 아닙니다)  Node.js 에 대해서 알고 있는 분이라면 쉽게 생각하실 수 있을거 같네요. 네! 비동기적으로 코드를 다루는 방법을 말하려 합니다.


파이썬은 자신은 GIL 에 의해 한번에 하나의 쓰레드 위주로 작동하지만, 로우레벨로 내려가면 GIL 을 무시하고 자체적인 I/O 실행환경을 활용하게 됩니다. 하나의 쓰레드가 I/O 작업은 OS에게 맞겨 두고 자신의 일을 하다가, OS 가 어떤 이벤트를 알려오면 ( 나 일 끝났어요~~ 등등) 그때 제어권을 살짝 바꾸어서 그 이벤트에 해당하는 일을 하게 하는 것입니다. 이렇게 되면 I/O 작업이 끝나기만을 무작정 기다리는 지고지순하지만 매우 답답한 파이썬에서 벗어 날 수 있지요. 물론 이러한 처리는 여러개의 쓰레드를 만들어서 (멀티쓰레드) 로 할 수도 있겠지만, 쓰레드를 무작정 늘리는것도 바람직하지 않다는것은 이제 다 알지 않습니까? 그렇습니다. 대세는 비동기입니다.


파이썬에서의 비동기를 알아 보기 위해서 제네레이터와 코루틴에 대해서 먼저 알아봐야 합니다. 생소한 분도 많겠지만 잠시 살펴보지요.



제네레이터

제네레이터는 말그래도 값을 생성하는 함수입니다. 알다시피 함수는 값을 반환한 다음 자신의 스코프를 소멸시키고 , 다시 함수를 호출하면, 처음부터 다시 시작됩니다. 즉 한 번 실행됩니다. 그러나 제네레이터 함수는 값을 생성하고 함수의 실행을 일시 중지 할 수 있습니다. 컨트롤이 호출 스코프로 반환되며 ,원하는 경우 실행을 다시 시작하여 다른 값 (있는 경우)을 얻을 수 있습니다. 이 예제를 보시죠.

def simple_gen():
yield "Hello"
yield "World"


gen = simple_gen()
print(next(gen))
print(next(gen))

제네레이터 함수는 어떤 값도 직접 반환하지 않고, 호출되면 반복자와 같은 제네레이터 객체를 건네줍니다. 그 후 제네레이터 객체에 대해 next ()를 호출하여 값을 반복 할 수 있습니다. 또는 for 루프를 실행하여 처리합니다.


요약 : 제네레이터 함수는 하나의 값을 반환하는 대신 실행을 일시 중지하고 여러 값을 생성 할 수있는 함수입니다.


코루틴

이제 제네레이터를 사용하여 함수 컨텍스트에서 데이터를 가져올 수 있고 실행을 일시 중지 할 수 있음을 알게 되었습니다. 근데 제네레이터에 데이터를 푸시하고 싶다면 어떻게 해야 할까요? 즉 제네레이터 자신이 계속 데이터를 만드는 것이 아니라, 외부에서 제공되는 데이터를 소비하는 역할을 하고 싶습니다. 이때 코루틴이 등장 할 때입니다. 값을 받는데 사용하는 yield 키워드는 함수 내부의 "=" 오른쪽에 있는 표현식으로 사용 할 수도 있습니다. 제네레이터 객체에 대해 send() 메서드를 사용하여 값을 함수로 다시 전달할 수 있는데요. 이를 "제네레이터 기반 코루틴" 이라고 합니다. 아래는 그 예입니다.


"코루틴은 주거니 받거니 하는 함수이다. 먼가 살아숨쉬는 듯한 녀석

좀 헥깔릴 수 있는데요. 함수와 제네레이터 , 코루틴 모두 def 를 통해 만듭니다. 이것에 대해 모두 다르게 표시해야 한다고 주장하는 사람들이 있으며, 파이썬의 창조자 귀도는 이에 반대하여 def 로 통일되어 있다고 합니다. 중요한것은 코루틴은 반복이 목적이 아닙니다. 반복이 목적인 것은 분명히 제네레이터의 역할입니다. 코루틴은 외부와의 상호 작용입니다.

def coro():
hello = yield "Hello"
yield hello


c = coro()
print(next(c))
print(c.send("World"))

제네레이터 예제에서는 제테레이터 함수가 모든 값을 생성했지만, 여기서는 보시다시피 코루틴 함수로 값을 넣어주기도 합니다. send 를 이용해서 말이죠. 


집중!!! 코루틴 함수는 값을 생성하는게 아니라~ 값을 어디선가 받는 역할이 중요한 함수입니다. 여기서는 제가 직접 send 로 값을 주었습니다만, 나중에 설명 할 비동기에서 코루틴은 요긴하게 사용되는데 잠시 생상을 해보세요. 어떻게 사용 될까요? 네 어떤 비동기 작업 (주로 i/o 많을듯) 에 대한 결과를 받아서 전달하는 매개함수 역할을 합니다. 매우 중요합니다.  



Async I/O

Python 3.4부터 일반적인 비동기 프로그래밍을 위한 멋진 API를 제공하는 새로운 asyncio 모듈이 생겼습니다. 이제  asyncio 모듈과 함께 coroutines를 사용하여 비동기 작업을 쉽게 수행 할 수 있게 됬습니다. 다음은 공식 문서의 예입니다  다시 말씀드리지만 전체 시스템이 블러킹이 안되게 하는 방법으로 첫째, 멀티쓰레드를 통해 하나만 블럭되게 한다 2. 비동기 방식을 사용한다.  이렇게 2개로 크게 볼 수 있다고 말했죠? asycnio 는 비동기 방식에 대한 이야기 입니다.

import asyncio
import datetime
import random


@asyncio.coroutine
def display_date(num, loop):
end_time = loop.time() + 50.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(random.randint(0, 5))


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

결과:

Loop: 1 Time: 2017-05-01 16:15:20.801473
Loop: 2 Time: 2017-05-01 16:15:20.801473
Loop: 1 Time: 2017-05-01 16:15:21.803184
Loop: 2 Time: 2017-05-01 16:15:23.803627
Loop: 1 Time: 2017-05-01 16:15:25.804033
...


위의 코드 역시 스스로를 잘 설명해 주고 있습니다.  (저도 파이썬에서 이벤트루프가 어떻게 동작하는지에 대한 깊은 이해가 부족합니다. 먼저 WINAPI 의 이벤트루프를 떠올려 봅니다. 멀티플렉싱I/O 도 떠올려보고 자바의 NIO 도 떠올려보고 node.js 도 떠올려보고...다음 링크는 파이선과 node.js 에서의 비동기에 대한 매우 괜찮은 글입니다. 아마 node.js 비동기에 대한 글이 훨씬 더 많기 때문에 이해를 돕기 위해 node.js 에 대한 글을 찾아서 읽는것도 도움이 될 듯하네요. http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html)  

코드를 살펴보면 이 함수(display_date) 는 주어진 시간(초) 후에 완료되는 코루틴 (coroutine)입니다. 식별자 수(num)와 이벤트 루프(loop)를 매개변수로 받아 현재 시간을 계속 출력하는 coroutine 인 display_date (num, loop) 을 만듭니다. 코루틴이기 때문에 외부로 부터 값을 받아드리는 성질이 있다는 것은 예상 할 수 있겠지요?  즉 다음 asyncio.sleep () 함수 호출의 결과를 기다리기 위해 키워드yield from 를 사용했습니다. 그래서 우리는 그것에 임의의 초를 보내고 asyncio.ensure_future()를 사용하여 기본 이벤트 루프에서 코루틴의 실행을 스케쥴합니다. 그런 다음 루프가 계속 실행되도록 요청 합니다.

출력을 보면 두 개의 coroutine이 동시에 실행되는데요. yield from 를 사용할 때, 이벤트 루프는 코루틴의 실행을 일시 중지하고 다른 루틴을 실행합니다. 따라서 두 개의 코루틴이 동시에 실행됩니다 (그러나 잊지 말아야 할 것은 이벤트 루프가 단일 스레드이기 때문에 병렬로 실행되지 않습니다).

 yield from 는for x in asyncio.sleep(random.randint(0, 5)): yield x 에 대한 멋진 syntactic sugar 입니다. 그것은 비동기 코드를 좀 더 간략히 만들어 주죠. 


아래 링크는 asyncio 에 대한 훌륭하고 간략한 튜토리얼 글입니다.

https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e


아래 링크는 제가 번역한 결과물입니다. 정말 휼륭한 글이며, 번역하느라 고생 마이~했습니다.

파이썬 asyncio 를 이해 하기 위한 여정 



 ASYNC/AWAIT

이것은 위에 살펴본 asyncio 와 동일합니다. 다만 키워드가 좀 바뀌었을 뿐이에요.. 겁먹지 마세요.
위의 코드와 똑같죠? 네 async 와 awit 키워드만 바뀌었습니다. 코드에 대한 설명으로 더 적절해 보입니다.

import asyncio
import datetime
import random

async def display_date(num, loop, ): # <----- 요기
end_time = loop.time() + 50.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(random.randint(0, 5)) # <----- 요기


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

아래 링크는 async/await 에 대한 훌륭하고 간략한 튜토리얼 글입니다.

https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/



gevant

asyncio 와 개념적으로 동일한 역할을 하는 비동기 라이브러리입니다.


- gevent 사용하는 방식에 대한 글이며
gevent for the Working Python Developer,

- gevent 와 asyncio 에 대한 비교글입니다.

http://youngrok.com/asyncio%EC%97%90%EC%84%9C%20gevent%EB%A1%9C

http://www.getoffmalawn.com/blog/playing-with-asyncio



celery

셀러리는 분산환경에서 동시성을 갖기 위해 제작된 라이브러리 입니다. 웹환경에서 예를 들면 클라이언트가 웹서버에 어떤 요청을 할 때 웹서버레벨에서 모든것을 다 처리하기엔 부담이 크거나, 외부의 모듈과 협업해야 할 때, 즉 비동기적으로 요청에 대한 부하를 외부에 전가시키고,완료됬다는 이벤트가 발생 했을 경우 사용자에게 응답을 날리는 구조에서 사용됩니다.

요즘 스칼라의 Play / Akka Http 나 자바의 RxJava 처럼 리액티브 스타일의 웹개발이 앞으로 많이들 활용 될 거라 생각해 볼때, 파이썬의 경우 대표적인 웹툴인 장고는 전통적인 멀티쓰레드 기반으로 알고 있는데 파이썬은 어떻게 Reactive 파도에 올라탈 것인지, 어떤 파이썬 리엑티브 대표주자가 떠오를지 궁금하긴 합니다.


* Python reactive programming 책이 2017년 여름에 발매될 예정.

* 아래 링크는 파이썬 리엑티브 함수형 프로그래밍에 관한 글

https://jakubturek.com/functional-reactive-programming-in-python/



numpy

이것은 동시성 하고는 조금 다른 얘기지만 한마디 언급하겠습니다. Numpy 는 파이썬에서 데이터에 대한 계산(벡터,행렬등) 을 다룰 때 주로 사용합니다. 파이썬 list 등을 그대로 사용해서 계산할 수 도 있지만, Numpy 를 활용하면, 네이티브C 수준의 속도를 얻을 수 있습니다. 산술 계산에서 특히 중요한 역할을 하는 메모리의 지역성과 CPU가 지원하는 벡터화된 연산의 이점도 얻게 됩니다. 수십배 빨라진다고 보면 됩니다.



PYCUDA

글 서두에 병렬성에 대한 얘기를 하다가 GPGPU 를 통한 방식을 이용해 엄청난 계산을 빠르게 수행하는 예를 들어보았습니다. (쉐이더 볼륨랜더링) 네 파이썬도 CUDA를 사용해서 대규모 병렬 계산에 활용할 수 있습니다.

PyCUDA 홈페이지 바로가기



tensorflow (텐서플로우)

대세는 머신러닝 아니겠습니까? 대규모 병렬데이터 처리를 텐서플로우로 한다면 CUDA 를 통한 기본적인 병렬도 가능하며 텐서플로우 자체적으로 제공하는 다양한 머신러닝 API 도 익혀서 나중에 딥러닝이 필요할때 빠르게 적용 할 수 있는 장점이 있지 않을까요





먼가 굉장히 서두르면서 글이 끝마쳐지는 느낌이네요. 뭐 방대한 내용을 하나의 페이지에 모두 담긴 힘들다는 점..제 내공이 많이 부족하다는 점...이해해주시구요. 이상 글을 마칩니다.




레퍼런스:

- 고성능 파이썬
- 전문가를 위한 파이썬
- 프로그래머가 몰랐던 멀티코어 CPU 이야기
- CUDA volume rendering example 
- https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e
- http://www.getoffmalawn.com/blog/playing-with-asyncio
- https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
https://www.blog.pythonlibrary.org/2016/08/02/python-201-a-multiprocessing-tutorial/
http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html

연재순서

1. threading

2. Condition & Semaphore

3. Queue

4. multiprocessing

5. 비동기 (gevent) 

6. 분산 (celery)

7. GPGPU (PyCUDA)
8. concurrent.future

9. 코루틴,asyncio,async/awit



제네레이터,코루틴,네이티브 코루틴과
 ASYNC/AWAIT [번역]




참고 : 이 게시물은 주로 Python 3.4 에서 소개 된 기능에 대해 설명하며, 네이티브 코루틴과 async / await 구문은 Python 3.5에서 제공됩니다. 따라서 파이썬 3.5 이상을 사용하여 코드를 시험해 보는 것이 좋을 것 입니다.



제네레이터


제네레이터는 말그래도 값을 생성하는 함수입니다. 알다시피 함수는 값을 반환한 다음 자신의 스코프를 소멸시키고 , 다시 함수를 호출하면, 처음부터 다시 시작됩니다. 즉 한 번 실행됩니다. 그러나 제네레이터 함수는 값을 생성하고 함수의 실행을 일시 중지 할 수 있습니다. 컨트롤이 호출 스코프로 반환되며 ,원하는 경우 실행을 다시 시작하여 다른 값 (있는 경우)을 얻을 수 있습니다. 이 예제를 보시죠.

제네레이터 함수는 어떤 값도 직접 반환하지 않고, 호출되면 반복자와 같은 제네레이터 객체를 건네줍니다. 그 후 제네레이터 객체에 대해 next ()를 호출하여 값을 반복 할 수 있습니다. 또는 for 루프를 실행하여 처리합니다.


그렇다면 제네레이터는 어떨때  유용할까요? 독자분의 상사가 최대 100 개의 숫자 시퀀스를 생성하는 함수를 작성하도록 요청했다고 가정 해 봅시다 ( range() 의 단순한 간략화 된 버전). 빈 리스트를 가져 와서 번호를 계속 추가 한 다음 번호가 있는 리스트을 반환하는 방식을 택했습니다. 그러나 요구 사항이 변경되고 최대 1억개의 숫자가 생성 되어야 하는것으로 변경 한다고 할 때. 이 번호를 리스트에 저장하면 곧 메모리가 부족해질 것입니다. 그런 상황에서 제네레이터가 활약하는데요. 즉 이 번호는 리스트에 모두 저장하지 않고 생성 할 수 있습니다. 다음과 같이 해봅시다.


굳이 메모리에 적재해두지 않고서도 원하는 만큼 번호를 출력 할 수 있으며, 위에서는  번호 9 이상을 출력하지 않습니다만, 함수 컨텍스트를 다시 시작하면 다시 시작됩니다.


요약 : 제네레이터 함수는 하나의 값을 반환하는 대신 실행을 일시 중지하고 여러 값을 생성 할 수있는 함수입니다. 또한 호출되면 iterable처럼 작동하는 제네레이터 객체를 제공하며 반복적으로 값을 얻을 수 있습니다.


코루틴

이제 제네레이터를 사용하여 함수 컨텍스트에서 데이터를 가져올 수 있고 실행을 일시 중지 할 수 있음을 알게 되었습니다. 근데 제네레이터에 데이터를 푸시하고 싶다면 어떻게 해야 할까요? 즉 제네레이터 자신이 계속 데이터를 만드는 것이 아니라, 외부에서 제공되는 데이터를 소비하는 역할을 하고 싶습니다. 이때 코루틴이 등장 할 때입니다. 값을 받는데 사용하는 yield 키워드는 함수 내부의 "=" 오른쪽에 있는 표현식으로 사용 할 수도 있습니다. 제네레이터 객체에 대해 send() 메서드를 사용하여 값을 함수로 다시 전달할 수 있는데요. 이를 "제네레이터 기반 코루틴" 이라고 합니다. 아래는 그 예입니다.

(역주: 함수와 제네레이터 , 코루틴 모두 def 를 통해 만듭니다. 이것에 대해 모두 다르게 표시해야 한다고 주장하는 사람들이 있으며, 파이썬의 창조자 귀도는 이에 반대하여 def 로 통일되어 있습니다. 그리고 코루틴은 반복이 목적이 아닙니다. 반복이 목적인 것은 분명히 제네레이터의 역할입니다. 코루틴은 외부와의 상호 작용입니다.

소스를 살펴보시면 먼저 next () 함수를 사용하여 평소와 같이 값을 가져옵니다. 이것은 "Hello"를 얻습니다. 그런 다음 send () 메소드를 사용하여 값을 제네레이터에 보냅니다. 함수를 다시 시작하고 우리가 hello에 보낸 값을 할당하고 다음 줄로 이동하여 명령문을 실행합니다. 결국 우리는 send () 메소드의 반환 값으로 "World"를 얻습니다. 먼가 한바퀴 돈 느낌이네요. 


우리가 제네레이터 기반 코루틴을 사용할 때 "제네레이터"와 "코루틴"이라는 용어는 일반적으로 같은 의미입니다. 그것들은 정확히 똑같은 것은 아니지만, 많은 경우에 종종 혼용됩니다. 그러나 Python 3.5에서는 네이티브 코루틴과 함께 async/await키워드가 주로 언급됩니다. 이 게시물의 후반부에서 논의 될 것 입니다.


파이썬 코루틴에 대한 개념은 요기 참고 -> Haerakai's Lab(파이썬의 코루틴)

Async I/O and the asyncio module

Python 3.4부터 일반적인 비동기 프로그래밍을 위한 멋진 API를 제공하는 새로운 asyncio 모듈이 생겼습니다. 우리는 asyncio 모듈과 함께 coroutines를 사용하여 async io를 쉽게 수행 할 수 있게 됬는데요. 다음은 공식 문서의 예입니다.

(
역주: 전체 시스템이 블러킹이 안되게 하는 방법으로 첫째, 멀티쓰레드를 통해 하나만 블럭되게 한다 2. 비동기 방식을 사용한다.  이렇게 2개로 크게 볼 수 있습니다. 네 맞습니다. asycnio 는 비동기 방식에 대한 이야기 입니다. 자바스크립트가 그러하듯이~

위의 코드 역시 스스로를 잘 설명해 주고 있습니다. 이 함수(display_date) 는 주어진 시간(초) 후에 완료되는 코루틴 (coroutine)입니다. 식별자 수(num)와 이벤트 루프(loop)를 매개변수로 받아 현재 시간을 계속 출력하는 coroutine 인 display_date (num, loop) 을 만듭니다. 코루틴이기 때문에 외부로 부터 값을 받아드리는 성질이 있다는 것은 예상 할 수 있겠지요?  즉 다음 asyncio.sleep () 함수 호출의 결과를 기다리기 위해 키워드yield from 를 사용했습니다. 그래서 우리는 그것에 임의의 초를 보내고 asyncio.ensure_future()를 사용하여 기본 이벤트 루프에서 코루틴의 실행을 스케쥴합니다. 그런 다음 루프가 계속 실행되도록 요청 합니다.

출력을 보면 두 개의 coroutine이 동시에 실행되는데요. yield from 를 사용할 때, 이벤트 루프는 코루틴의 실행을 일시 중지하고 다른 루틴을 실행합니다. 따라서 두 개의 코루틴이 동시에 실행됩니다 (그러나 잊지 말아야 할 것은 이벤트 루프가 단일 스레드이기 때문에 병렬로 실행되지 않습니다).

 yield from 는for x in asyncio.sleep(random.randint(0, 5)): yield x 에 대한 멋진 syntactic sugar 입니다. 그것은 비동기 코드를 좀 더 간략히 만들어 주죠. 

Native Coroutines and async/await

파이썬 3.5에서 우리는 async / await 구문을 사용하는 새로운 네이티브 coroutine 을 사용합니다. 이전 함수는 다음과 같이 작성 할 수 있습니다.

강조 표시된 선(노랑)을 살펴보십시오. def 키워드 앞에 async 키워드를 사용하여 네이티브 코루틴을 정의하고 있으며 네이티브 코루틴에서는 yield 대신 await 키워드를 사용합니다.



Native vs Generator Based Coroutines: Interoperability

구문의 차이점을 제외하고는 네이티브 와 제네레이터 기반 코루틴간에 기능상의 차이점은 없습니다. 구문을 혼용하는 것은 허용되지 않습니다. 그래서 제네레이터 기반 코루틴에서  await   혹은 네이티브 코루틴 내부에서 yield/yield from을 사용할 수 없습니다.


이런 차이점에도 불구하고 이들 간의 상호 운영이 가능합니다. 우리는 오래된 제네레이터 기반의 것들에
@ types.coroutine 데코레이터를 추가하기만 하면 되는데요. 그런 다음 다른 유형의 내부에서 사용할 수 있습니다. 네이티브 코루틴 (coroutine) 내부의 제네레이터 기반 코루틴을 통해
await  할 수 있으며, 제네레이터 기반 코루틴안에서 네이티브 코루틴으로 부터 yield from  할 수 있습니다. 다음은 예입니다.








번역:

 http://masnun.com/2015/11/13/python-generators-coroutines-native-coroutines-and-async-await.html



연재 순서 


1. threading

2. Condition & Semaphore

3. Queue

4. multiprocessing

5. 비동기 (gevent) 

6. 분산 (celery)

7. GPGPU (PyCUDA)

8. 코루틴,asyncio,async/awit

9. concurrent.future



 CONCURRENT.FUTURES 모듈 퀵 가이드
 [번역]

concurrent.futures 모듈은 비동기 작업을 시작하기 위한 높은 수준의 API를 제공하는 표준 라이브러리의 일부입니다. 이 모듈의 일반적인 사용법에 대한 코드 샘플을 살펴 보겠습니다.



Executors

이 모듈은 추상클래스인 Executor 클래스를 제공하며, 직접 사용할 수는 없으며 대신 ThreadPoolExecutor와 ProcessPoolExecutor라는 매우 유용한 두 개의 서브 클래스를 사용 할 수 있습니다. 이름에서 알 수 있듯이 하나는 멀티 스레딩을 사용하고 다른 스레드는 멀티프로세스를 사용합니다. 두 경우 모두 스레드 또는 프로세스 풀을 가져 와서 이 풀에 작업을 제출하는 방식으로 작업을 합니다. 


ThreadPoolExecutor

코드를 보시죠

먼저 스레드 3개를 가진 ThreadPoolExecutor를 만듭니다. 그런 다음 풀에 작업을 제출하여  5초 후에 첫 번째 인수로 전달 된 메시지를 다시 반환 받게 됩니다. 키 포인트는 태스크를 제출하고 나서 future를 되돌려 받는 코드에 있는데요. Doc에서 볼 수 있듯이 Future 객체에는 태스크가 해결되었는지, 즉 해당 future 객체에 대한 값이 설정되었는지 알려주는 done () 메서드가 있습니다. 태스크가 완료되면 (값을 리턴하거나 예외로 인터럽트 된 경우), 스레드 풀 실행 프로그램은 값을 future 오브젝트에 설정합니다.


위의 예에서 작업은 5 초가 경과 할 때까지 완료되지 않으므로 done()을 처음 호출하면 False가 반환됩니다. 그리고 잠시 대기한 후에 result () 메소드를 호출하여 future의 결과를 얻을 수 있습니다.


Future 객체를 잘 이해하고 그것의 메소드를 아는 것은 Python에서 비동기 프로그래밍을 이해하고 수행하는 데 아주 중요합니다. 그래서 문서를 꼼꼼히 읽어보시는게 좋을 겁니다.


ProcessPoolExecutor

이것은 이전에 설명한 ThreadPoolExecutor 와 매우 유사한 API를 가지고 있습니다. 이제 이전 예제를 수정해서 ProcessPool을 사용해 봅시다.

 완벽하게 작동합니다! 물론 CPU 집약적인 작업을 위해 ProcessPoolExecutor를 사용하는게 좋습니다. ThreadPoolExecutor는 네트워크 작업 또는 I / O에 더 적합합니다. (역주: 파이썬의 GIL 때문에) 


API는 비슷하지만 ProcessPoolExecutor는 다중 처리 모듈을 사용하며 Global Interpreter Lock의 영향을 받지 않습니다. 그러나 picklable이 아닌 객체는 사용할 수 없습니다. 따라서 우리는 process pool executor에 전달 된 callable 내부에서 무엇을 사용하고 리턴하는지 대해서 신중하게 선택할 필요가 있습니다.



Executor.map()

두 executors 는 일반적인 방법인 map ()을 사용합니다. 내장 함수와 마찬가지로 map 메서드는 제공된 함수에 대한 여러 호출을 허용하여 iterable의 각 항목을 해당 함수에 전달합니다. 이 경우를 제외하고는 함수가 동시에 호출됩니다. 다중 처리의 경우,이 반복 가능은 청크로 분리되고 이러한 청크는 각각 별도의 프로세스에서 함수로 전달됩니다. chunk_size 매개 변수를 전달하여 청크 크기를 제어 할 수 있습니다. 기본적으로 청크 크기는 1입니다.


다음은 공식 문서의 ThreadPoolExample입니다.

그리고 ProcessPoolExecutor 예제 :


as_completed() & wait()

concurrent.futures 모듈은 executor에 의해 반환 된 future를 다루는 두 개의 함수를 가지고 있습니다. 하나는 as_completed ()이고 다른 하나는 wait ()입니다.

as_completed() 함수는 future 객체를 반복하여 가져오고 future가 resolving을 시작되면 즉시 값을 yielding 하기 시작합니다. 앞서 언급 한 map 메소드와 as_completed의 주요 차이점은 map이 iterable을 전달한 순서대로 결과를 리턴한다는 점이다. 첫 번째 항목에 대한 결과는 map메서드의 첫 번째 결과입니다. 반면, as_completed 함수의 첫 번째 결과는 가장 먼저 완료된 future의 결과입니다.

예제를 보시죠.



wait () 함수는 두 세트를 포함하는 명명된 튜플을 반환합니다. 하나의 세트에는 완료된 futures (결과 또는 예외가 있음)과 완료되지 않은 것을 포함하는 다른 세트가 포함됩니다.

여기에 예제가 있습니다.


우리는 그것이 반환되어야 할 때를 정의함으로써 wait 함수의 행동을 제어 할 수 있습니다. FIRST_COMPLETED, FIRST_EXCEPTION 및 ALL_COMPLETED 함수의 return_when 매개 변수에 이러한 값 중 하나를 전달할 수 있습니다. 기본적으로 ALL_COMPLETED로 설정되므로 모든 future가 완료 될 때 만 wait 함수가 반환됩니다. 그러나 이 매개 변수를 사용하여 첫 번째 future가 완료되거나 첫 번째 예외가 발생할 때 반환하도록 선택할 수 있습니다.




번역:

http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html


연재 순서 

1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent) 
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awit
9. concurrent.future


 PyCUDA


설치 ( 우분투 14.04 에 PyCUDA 설치)


1.  CUDA 가 제대로 인스톨 되있는지 확인 합니다.

나는 우분투 14.04에 CUDA 7.5 로 설치하였다. 

우분투에 CUDA 설치 관련은 여기 참고 


2. gcc4.8.4

Ubuntu 14.04의 경우 기본 gcc 버전은 4.8이기 때문에 아래와 같이 따로 업그레이드등을 할 필요가 없다.

  • As of the CUDA 4.1 release, gcc 4.5 is now supported. gcc 4.6 and 4.7 are unsupported.
  • As of the CUDA 5.0 release, gcc 4.6 is now supported. gcc 4.7 are unsupported.
  • As of the CUDA 6.0 release, gcc 4.7 is now supported.
  • As of the CUDA 7.0 release, gcc 4.8 is fully supported, with 4.9 support on Ubuntu 14.04 
  • As of the CUDA 7.5 release, gcc 4.8 is fully supported, with 4.9 support on Ubuntu 14.04
  • As of the CUDA 8 release, gcc 5.3 is fully supported on Ubuntu 16.06 



3. Boost C++ 라이브러리 설치 

$ sudo apt-get install libboost-all-dev

Boost.Python은 2009 년부터 Python 2.x와 3.x를 모두 지원합니다.

개인적으로 위에 명령어로 하면 문제가 생겨서  직접설치 

wget -O boost_1_55_0.tar.gz http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz/download
tar xzvf boost_1_55_0.tar.gz
cd boost_1_55_0/

Boost's bootstrap setup:

./bootstrap.sh --prefix=/usr/local

Then build it with:

./b2

and eventually install it:

sudo ./b2 install 



4. numpy 인스톨 

$ sudo apt-get install python-numpy -y 또는 $ sudo pip install numpy



5. PyCUDA 다운로드,인스톨 

필요라이브러리들 인스톨 

$ sudo apt-get install build-essential python-dev python-setuptools libboost-python-dev libboost-thread-dev -y

Download PyCUDA 다운로드하고 언팩.  (2016.1.2 버전) 

$ tar xzvf pycuda-2016.1.2.tar.gz

PyCUDA 설정 및 make 

$ cd pycuda-2016.1.2

파이썬 2.x의 경우

$ ./configure.py --cuda-root=/usr/local/cuda-7.5 --cudadrv-lib-dir=/usr/lib/x86_64-linux-gnu --boost-inc-dir=/usr/include --boost-lib-dir=/usr/lib --boost-python-libname=boost_python --boost-thread-libname=boost_thread --no-use-shipped-boost 

$ make -j 4 $ sudo python setup.py install $ sudo pip install .


확인:

1. PyCUDA 0.93 이상을 설치하지 않으려면 --boost-thread-libname = boost_thread-mt를 제거하십시오.

2. cuda가 /usr/local/ cuda-7.5에 설치되어 있지 않으면 --cuda-root = 를  수정하십시오.

3. -lboost_python-mt 또는 -lboost_thread-mt를 찾을 수 없다는 오류가 발생하지만 해당 파일이 / usr / lib 또는 어디에 있든 알 수 있다면 해당 파일에 대한 심볼릭 링크가 있는지 확인하십시오. 예를 들어, Ubuntu 9.10에서 libboost_python-mt-py26에 libboost_python-mt 링크를 만들어야 할 수도 있습니다 (또는 --boost-python-libname = boost_python-mt-py26 :

$ sudo ln -s / usr / lib / libboost_python-mt-py26 / usr / lib / libboost_python-mt

4. 'nvidia-current'패키지 (예 : 우분투 10.04 Lucid Lynx)를 사용하는 우분투 릴리스에서 다음 링크를 추가해야합니다.

$ ln -sf /usr/lib/nvidia-current/libcuda.so /usr/lib/libcuda.so

5. 우분투 릴리스에서 'cuda3.1'패키지 (우분투 10.04 Lucid Lynx)를 사용하면 다음 링크를 추가해야합니다.

$ sudo ln -s /usr/lib/libboost_python-mt-py26.so /usr/lib/libboost_python-mt.so

6.실행 중일 때 : sudo python setup.py install 다음과 같은 오류가 발생하면 : *** CUDA_ROOT가 설정되지 않았고 경로에 nvcc가 없습니다. 포기해라., sudo는 다른 PATH 값을 가질 수도있다. 다음을 시도해 보라. sudo env PATH = $ PATH python setup.py install

7. cuda 라이브러리 디렉토리 '$ {CUDA_ROOT} / lib', '$ {CUDA_ROOT} / lib64'를 LD_LIBRARY_PATH 변수에 포함하는 것을 잊지 마십시오.



6. PyCUDA 시작 

import 가 잘 되는지 확인하라.

import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule

pycuda.autoinit 를 사용할 필요가 없다. 초기화, 컨텍스트 생성 및 리소스 정리는 원하는 경우 수동으로 수행 할 수도 있다.

Data 전달 

다음 단계로는 데이터를 장치로 전송하는 것이다. PyCuda에서는  주로 호스트의 numpy 배열에서 데이터를 전송한다. (하지만 실제로 파이썬 버퍼 인터페이스를 만족시키는 모든것은 str이 될 것이다.) 4x4 난수 배열을 만들어 보자.

import numpy
a = numpy.random.randn(4,4)

a는 배정도 숫자로 구성되어 있지만 대부분의 nVidia 장치는 단정밀도 만 지원합니다.

a = a.astype(numpy.float32)

마지막으로 데이터를 전송할 위치가 필요하므로 장치에 메모리를 할당해야합니다.

a_gpu = cuda.mem_alloc(a.nbytes)

마지막 단계로 데이터를 GPU로 전송해야합니다.

cuda.memcpy_htod(a_gpu, a)

Kernel  실행

이 튜토리얼에서는 간단한 것을 가지고 놀아보자: a_gpu의 각 항목을 두 배로하는 코드를 작성하는데 이를 위해 우리는 상응하는 CUDA C 코드를 작성한다

pycuda.compiler.SourceModule:

mod = SourceModule("""
  __global__ void doublify(float *a)
  {
    int idx = threadIdx.x + threadIdx.y*4;
    a[idx] *= 2;
  }
  """)

오류가 없으면 코드가 컴파일되어 장치에 로드 된다. pycuda.driver.Function에 대한 참조를 찾고 이를 인수로 a_gpu를 지정하고 블록 크기를 4x4로 지정한다.

func = mod.get_function("doublify")
func(a_gpu, block=(4,4,1))

마지막으로 GPU에서 데이터를 다시 가져 와서 원본과 함께 표시합니다.

a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)
print a_doubled
print a

결과는 아래와 같다

[[ 0.51360393  1.40589952  2.25009012  3.02563429]
 [-0.75841576 -1.18757617  2.72269917  3.12156057]
 [ 0.28826082 -2.92448163  1.21624792  2.86353827]
 [ 1.57651746  0.63500965  2.21570683 -0.44537592]]
[[ 0.25680196  0.70294976  1.12504506  1.51281714]
 [-0.37920788 -0.59378809  1.36134958  1.56078029]
 [ 0.14413041 -1.46224082  0.60812396  1.43176913]
 [ 0.78825873  0.31750482  1.10785341 -0.22268796]]


7. 마지막 

이제 본격적으로 하기 위해서는 공식 문서를 참고하면 될 거 같다. PyCUDA 공식문서 
CUDA 에 대한 공부는 예제로 배우는CUDA 프로그래밍을 권장한다.





레퍼런스

https://wiki.tiker.net/PyCuda/Installation/Linux/Ubuntu 
https://documen.tician.de/pycuda/tutorial.html

연재 순서 

1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent) 
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awit
9. concurrent.future



6.celery

http://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html

https://spoqa.github.io/2012/05/29/distribute-task-with-celery.html

연재 순서 

1. threading
2. Condition & Semaphore
3. Queue
4. multiprocessing
5. 비동기 (gevent) 
6. 분산 (celery)
7. GPGPU (PyCUDA)
8. 코루틴,asyncio,async/awiat
9. concurrent.future


5. gevent 

Welcome to cooperative scheduling

최신 IO 시스템 (asyncio) 과 마찬가지로, gevent는 스케줄링 개념으로 작동합니다. 이것은 이전과 같은 몽키패치 덕분에 코드에서 거의 숨겨질 수있는 방식으로 promise 와 같은 시스템 (비동기 요청이 완료된 후에 수행해야 할 일들을 요구)으로 수행 할 수 있습니다. 스케줄러는 greenlet 컨텍스트를 신속하고 빈번하게 전환 할 수 있도록 작성되었으므로 각 greenlet 에 충분한 대기 시간을 허용합니다 (기억해야 할 중요한점). 이 greenlet  중 하나가 IO 바운드 작업에 부딪 칠 때마다 libevent로 전송 한 다음 컨텍스트 전환을 허용하도록 스케줄러에 양보합니다. 그 외에도, 내부는 매우흥미롭지만, 당장은 이해 할 필요가 없습니다.

Getting started

먼저 웹 크롤러를 만들어보며 시작하겠습니다. 우리는 시드 URL을 가져 와서 그 페이지로 부터 다음 링크를 계속 따라갈 것입니다.

import sys
import re
import requests

# Track the urls we've found
crawled = 0

def crawler(u):
    '''A very simple web crawler'''
    global crawled

    # Crawl the page, print the status
    response = requests.get(u)
    print response.status_code, u

    # Extract some links to follow using a *really* bad regular expression
    for link in re.findall('<a href="(http.*?)"', response.content):

        # Limit to 10 pages
        if crawled < 10:
            crawled += 1
            crawler(link)

# Read the seed url from stdin
crawler(sys.argv[1])

가장 먼저 주의해야 할 점은 이 크롤러가 깊이우선재귀라는 것입니다. 이는 안좋을 수 있지만 단일 스레드 시스템에서 구현하기가 가장 쉽습니다. O (n)의 시간 복잡성은 각 웹 페이지를 순차적으로 로드하기 때문이죠. 이제 기반을 확보 했으니까 더 좋게 만들어 봅시다. 모든 gevent 응용 프로그램은 monkey 로 시작해야합니다.

import gevent.monkey
gevent.monkey.patch_all()

gevent 스케줄러를 설정하는 데 필요한 모듈을 임포트 합니다.

Let’s get restructuring

gevent 크롤러의 첫 번째 기본 접근 방식은 다음과 같습니다.

# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()

import gevent
import sys
import re
import requests

# List for holding our greenlets
greenlets = []

def crawler(u):
    '''A very simple gevented web crawler'''
    global crawled

    # Crawl the page, print the status
    response = requests.get(u)
    print response.status_code, u

    # Extract some links to follow
    for link in re.findall('<a href="(http.*?)"', response.content):

        # Limit to 10 pages
        if len(greenlets) < 10:
            greenlets.append(gevent.spawn(crawler, link))

# Read the seed url from stdin
greenlets.append(gevent.spawn(crawler, sys.argv[1]))

# Wait until we've spawned enough url requests
while len(greenlets) < 10:
    gevent.sleep(1)

# Wait for everything to complete
gevent.joinall(greenlets)

이미 훨씬 좋아 보입니다. breadth-first 은 웹 크롤링과 관련하여 훨씬 더 좋은 생각인데, 이론적인 시간 복잡성은 O(2)로 낮아졌습니다.

이 크롤러는 greenlets 의 비용이 싸다는 좋은 특징을 보여줍니다. 일반적으로 IO 작업당 추가하여 병렬 처리 할 수 있으며 스케줄러는 이를 체계적으로 처리합니다. 안타깝게도 아직 이것은 유용한 크롤러와는 거리가 있습니다. greenlet 당 하나의 HTTP 요청을 수행하기 때문에 조금 더 넓게 열면 약간의 네트워크 문제가 발생할 수 있습니다. 그래서 좀 더 다른 방식도 생각해 봅시다.

Introducing worker pools

이제 액션 당 greenlet를 실행시키는 대신에, 우리는 greenlet 풀을 사용할 수 있습니다. 정해진 개수의 풀에 액션을 위임합니다.

# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()

import gevent.pool
import sys
import re
import requests

# Prepare a pool for 5 workers
pool = gevent.pool.Pool(5)

# Crawl tracker is back
crawled = 0

def crawler(u):
    '''A very simple pooled gevent web crawler'''
    global crawled

    # Crawl the page, print the status
    response = requests.get(u)
    print response.status_code, u

    # Extract some links to follow
    for link in re.findall('<a href="(http.*?)"', response.content):

        # Limit to 10 pages (ignores links when the pool is already full)
        if crawled < 10 and not pool.full():
            crawled += 1
            pool.spawn(crawler, link)

# Read the seed url from stdin
pool.spawn(crawler, sys.argv[1])

# Wait for everything to complete
pool.join()

풀은 joinable 합니다. 즉, 요청된 모든 작업이 처리 될 때까지 대기하도록 응용 프로그램에 지시 할 수 있습니다 (즉, 풀이 준비 상태에 있음).

실제로 이것은 개별적인 풀 동작이 설명 할 수 없게 (infinte loop, anyone?) 도달 할 수 없기 때문에 때로는 두통을 일으킬 수 있습니다. 그 결과 전체 풀이 결코 안전하게 결합되지 않을 수 있습니다

Pool spawning is a blocking action

gevent 풀에 스레드를 생성하려고하면 gevent는 풀이 꽉 찼는지를 확인하고 그렇지 않은 경우 가용성을 기다립니다. 이것은 몇 가지 문제를 일으킵니다. 각 크롤러 풀은 pool.spawn을 호출하기 때문에 어떤 시점에 5 개의 풀 크기가 활성 상태이며, 링크를 찾으면 모두 pool.spawn을 동시에 호출합니다. 배고픈 철학자를 위해 포크를 남겨주어야 할텐데요..(식사하는 철학자 알고리즘)

But we want to crawl everything!

다행스럽게도, 풀링을 사용하는 것과는 별개로 우리가 잊고 있었던 한 가지 데이터 구조를 통해서 바보 같은 장난감 같은 크롤러를 강력한 웹 크롤링 응용 프로그램으로 바꿔 줄 것입니다.::

 
queueing.

# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()

import gevent.pool
import gevent.queue

import sys
import re
import requests

# Prepare a pool for 5 workers and a messaging queue
pool = gevent.pool.Pool(5)
queue = gevent.queue.Queue()
crawled = 0

def crawler():
    '''A very simple queued gevent web crawler'''
    global crawled

    while 1:
        try:
            u = queue.get(timeout=1)
            response = requests.get(u)
            print response.status_code, u

            # Extract some links to follow
            for link in re.findall('<a href="(http.*?)"', response.content):
                # Limit to 10 pages (ignores links when the pool is already full)
                if crawled < 10:
                    crawled += 1
                    queue.put(link)

        except gevent.queue.Empty:
            break

queue.put(sys.argv[1])

# Read the seed url from stdin
for x in xrange(0, 5):
    pool.spawn(crawler)

# Wait for everything to complete
pool.join()

좋아 보이네요? 일단 먼저 한꺼번에 5개의 crawler 를 스폰합니다. 그 후 각 crawler 들은 큐에 데이터가 들어오길 기다리고 있으며 결과적으로 서로 공평하게 작업을 분배햐여 처리하고 있습니다. 싸울 일이 없어요. 특히 우리는 시드 URL에 의존하여 로드하는 데 1 초도 채 걸리지 않습니다.

Workers are now their own loops

이전에는 URL을 크롤링 할 때마다 새 작업자가 생성되었습니다.동일한 시스템에서 각 작업자는 정확히 하나의 대기열 메시지를 소비 한 다음 종료합니다. 그렇다고 이것이 세계에서 가장 효율적인 시스템은 아닙니다  우리는 각각의 greenlets을 생성하고 대기열 메시지가 더 이상 존재하지 않을 때까지 (또는 대기열이 1 초 이상 비어있을 때까지) 대기하게합니다.

Let’s do one better

gevent의 사용을 최적화하기 위해 (컨텍스트 스위치는 많이 들지 않지만 비용은 여전히 있습니다), 우리는 다음을 원합니다.

1.queue에있는 모든 메시지를 소비하거나

2.Workers 들과 함께 pool 을 가득 채우다.

가능한 경우 대기열에 이미 우리가 허용하는 것보다 많은 메시지가있는 경우에만 풀을 사용하여 # 2를 수행 할 수 있다는 명백한 주의 사항이 있습니다.

# monkey-patch
import gevent.monkey
gevent.monkey.patch_all()

import gevent.pool
import gevent.queue

import sys
import re
import requests

# Prepare a pool for 5 workers and a messaging queue
pool = gevent.pool.Pool(5)
queue = gevent.queue.Queue()
crawled = 0

def crawler():
    '''A very simple queued gevent web crawler'''

    print 'starting crawler...'
    global crawled

    while 1:
        try:
            u = queue.get(timeout=0)
            response = requests.get(u)
            print response.status_code, u

            # Extract some links to follow
            for link in re.findall('<a href="(http.*?)"', response.content):
                # Limit to 10 pages (ignores links when the pool is already full)
                if crawled < 10:
                    crawled += 1
                    queue.put(link)

        except gevent.queue.Empty:
            break

    print 'stopping crawler...'

queue.put(sys.argv[1])
pool.spawn(crawler)

while not queue.empty() and not pool.free_count() == 5:
    gevent.sleep(0.1)
    for x in xrange(0, min(queue.qsize(), pool.free_count())):
        pool.spawn(crawler)

# Wait for everything to complete
pool.join()

이 작업을 실행하면 정확히 한 명의 worker 가 생성되고 나머지 worker 는 대기열에 푸시됩니다. 그러면 다른 4 명의 worker 가 시작되어 크롤링 합니다.!

And now you can Gevent

이러한 패턴은 IO 바인딩 multiprocessing  시스템의 기초를 형성하며 시스템 IO 호출 (로컬 파일 시스템에 대한 많은 읽기 / 쓰기), 로컬 스레딩 및 모든 네트워크 IO에서 대기하는 모든 것에 적용될 수 있습니다.



번역:

http://blog.hownowstephen.com/post/50743415449/gevent-tutorial

+ Recent posts