Python

RxPY 소개 및 시작하기

[하마] 이승현 (wowlsh93@gmail.com) 2017. 8. 18. 14:31



     RxPY 소개 및 시작하기 

* RxPY 소개글을 써보았습니다. 앞부분의 개념 설명 부분은 건너 뛰고 "RxPY 시작하기" 부터 보는것도 좋을 거 같습니다. 먼저 코딩을 해보는게 정신건강에...도움이 될지도..


Rx? RxPY? FRP?  

RxPY란 Reactive Programming을 하기위한 개념인 Rx(Reactive Extensions)의 파이썬 라이브러리입니다. Rx (반응형 확장) 는 FRP (함수형 반응형 프로그래밍) 개념을 가지고 있는 시스템으로써 (완전한 FRP 는 아니며, 사용자의 편의를 위한 장치가 많은 반면 표시적 의미론이 부족하여 합성성이 떨어진다는 의견도 있습니다.)  마이크로소프트가 닷넷용으로 개발한 라이브러리인 Rx.NET으로부터 시작되어서 자바판인 RxJava, 자바스크립트판인 RxJS, RxSwift,RxGo,RxScala 도 있으며, 기타 언어로 확장 구현 되였습니다. 즉 Rx 는 특정 언어에 얽매여 있지 않기 때문에 하나만 익히면 어떤 언어 환경에서든 쉽게 사용 할 수 있을 것 입니다. 

주요 개념을 간단히 살펴보면 Rx 는 주로 Observable 이라는 인터페이스를 중심으로 한 비동기 API 를 제공하는데 Observable 은 상태변경에 해당되는 이벤트를 발생/전파시키는 스트림으로써 onNext - 값, onError - 오류, onComplete - 스트림의 끝 이라는 세가지 이벤트를 지원합니다. (순수 FRP 에서는 나머지 2개는 불필요하다고 보며, 오직 한가지 이벤트 타입인 값만 지원) Rx 는 비동기 프로그래밍의 어려움도 완화 시켜 주지만 (덕분에 비동기 프로그래밍의 헬인 자바스크립트에 RxJS 라는 이름으로 빠르게 지원됨) Observer 패턴을 대체하기도 합니다.

상태변경에 따른 상호작용에 많이 사용되는 고전적인 Observer 패턴은 여러가지 문제점들이 많이 있는데, 그 문제점들은 FRP 나 Rx 를 통해 완화됩니다. 제가 얼마전에 쓴 글인데 참고를:  굿바이~ Observer 패턴 

더 자세한 정보는 아래 다양한 레퍼런스를 참고하세요.

창시자(?) 인 에릭마이어의 동영상

Rx 와 FRP 의 비교도 참고하시죠

https://github.com/ReactiveX/reactivex.github.io/issues/130
http://lambda-the-ultimate.org/node/4982

한글 블로그입니다. 왜 마소에서 Rx 를 만들었을까에 대한 근원을 찾아가 보는 소개 글입니다.

http://huns.me/development/2051

Rx 는 FRP 의 부분확장팩 라이브러리라고도 볼수 있는데, FRP 를 쉽게 풀어 놓은 책이 얼마전에 번역됨.

함수형반응형프로그래밍

Rx자체에 대한 책도 있습니다.

Reactive Extensions in Action

RxJava를 활용한 리액티브 프로그래밍

Rx 의 근간이 되는 Functor, Monad 에 대한 설명 
https://gist.github.com/jooyunghan/e14f426839454063d98454581b204452

마지막으로 제가 작성한 스칼라에서의 Rx

스칼라 강좌 (39) 동시성을 위한 Observable



Rx vs FRP 의 간단한 의미

위에서는 좀 생각나는 데로 막 쓴 거 같은데요. 아래에는 핵심만 간추려 보겠습니다.

Rx

Rx 의 주요한 화두는 비동기 이벤트(데이터) 처리 라고 생각합니다. 즉 비동기를 어떻게 하면 잘 처리 할 수 있을 까라는 고심에서 나온 라이브러리이며, 비동기를 처리함에 있어서 함수적 처리를 통해 잘 구성할 수 있도록 해주는게 목표입니다. (본질은 조금 다른 거 같지만 실용적으로는 말이죠.) 
그 과정에서 데이터를 비동기로 보내주는 쪽을 Observable 이라고 명시하며, 받아서 처리하는 쪽을 Observer 라고 명시합니다. 데이터 스트림을 가지고 어떻게 합성(zip등) 하고 동기화하고 반응 하게 할 것이냐도 스케쥴러통해 선택 할 수 있으며 나름 구체적인 지침이 있는 라이브러리라고 보면 될 거 같습니다.

FRP

Rx 가 구체적인 라이브러리인 반면, FRP 는 개념에 더 가깝다고 볼 수 있습니다.  Functional reactive programming (FRP) 은 함수적 빌딩블럭(map.reduce,filter,fold) 를 이용하여 리액티브 프로그래밍 (비동기 데이터흐름 프로그래밍) 을 위한 파라다임인데요. 주로 GUI, 로보틱스,뮤직,IoT 센서데이터처리 등 명시적으로 시간을 모델링하는 프로그램을 단순화 시키기 위해 사용됩니다.  -위키백과-

코날엘리엇이라는 FRP 를 발명한 사람의 말에 따르면 FRP 의 가장 중요한 포인트는 표시적/시간연속적 입니다. 

표시적
은 각각의 타입과 구성요소의 의미를 정확히 지정해주는 엄밀하고,단순하며, 구현과는 무관한 합성 가능한 의미론을 기초로 해야한다는 것 입니다. 시스템에 대한 정확한 명세를 제공하며, 모든 경우의 모든 구성요소에 대해 합성성이라는 중요한 특성이 성립함을 증명해주는것이 랍니다. 그냥 "합성성" 이라는 단어만 머리에 넣어 두세요. 

시간연속적은 각자 공부해보는것으로 합시다. (저도 마찬가지..) 

왜 이런 방법론들이 나오게 되는것인지에 대한 개발자로써 쉽게 와닿을 수 있는 예시를 하나 보겠습니다.

{
  a();
  b():
  c();

   ..

}

절차지향에 익숙한 우리는 위의 함수들이 순서대로 발생할 것이라는 것을 염두해 두코 코딩을 하게 됩니다.
즉 c() 함수는 a() 함수와 b() 함수가 무엇인가 처리를 한 후에 그것을 가지고 처리한다는 거죠. 물론 순서가 중요하지 않을 때 도 있습니다.

이때 나중에 참여한 개발자가 저것을 다 파악하기란 힘들 것입니다. (물론 소스가 복잡해 진 다면 말이죠) 
또한 옵저버패턴이 사용된다면 그 순서란 더 감춰지기 마련이니 파악하기 힘들어 질 것입니다. 

의존 관계 와 순서는 이렇게 우리가 짜왔던 절차지향,객체지향에서는 혼동을 주게 되는데요. 
최근에 코어를 더 적극적으로 활용해야하는 시대에 와서 멀티쓰레드/비동기라는 동시성을 다루는 부분에 있어서 더더욱 순서와 의존관계를 파악하기 힘들어 졌습니다.

FRP 와 Rx 의 반응형이라는 말에 뒤에는 이런 순서&의존관계를 명확하게 인지 시키준다는 의미도 포함되어 있습니다. 앞으로 FRP 와 Rx 를 공부하는데 있어서 이 점을 머리속에 넣어 두면 좋을 거 같습니다.


RxPY 정의 

파이썬에서 LINQ 스타일의 쿼리 연산과 Observable 콜렉션들을 사용하여 비동기,이벤트-기반 프로그램을 구성하기 위한 라이브러리입니다.  참고로 O'Reilly 출판사는 Reactive Python for Data Science 동강을 만들었으며 아래는 소개 글을 퍼왔습니다 O'Reilly Safari

액티브 프로그래밍은 데이터 모델링의 미래를 말하고 있습니다. 리액티브를 사용하면 정적 데이터를 간결하게 처리하고 분석 할 수있을 뿐 아니라 실시간의 무한 피드(PUSH)에 의한 데이터를 효과적으로 처리 할 수 있습니다. Reactive Extensions (Rx)는 2009 년에 처음으로 탄생 하였으며,12 개 주요 언어 및 플랫폼에 이식되었습니다. 이 과정에서는 Python 데이터 분석 워크 플로우라는 문제에 대해 가벼운 Rx 파이썬 라이브러리인 RxPy를 사용하는 방법을 배우게 됩니다.

  • 데이터 과학에서 반응형 프로그래밍의 이점에 대한 상세한 고찰 
  • 푸시 기반 (push-based) vs 풀 기반 (pull-based) 이터레이션을 사용하여 문제를 "reactive way"으로 해결하는 방법.
  • 리액티브 프로그래밍이 왜 강력하고 간단하며 탄력적인 코드 모델을 생성하는지에 대한 이해 
  • 클러스터 컴퓨팅 하드웨어를 사용 할 수 없을 때 동시성을 위해 RxPy 활용 방법을 배웁니다.
  • RxPy 사용법을 익히고 모든 데이터 과학 작업을 위한 더 강력한 Python 코드 작성


RxPY 시작하기  

솔직히 저런 개념같은 거 다 집어 치우고, 그냥 냅다 코딩하는게 더 나을거 같습니다. 쉽게 가져다가 만들어 놓은 라이브러리를 그 개념을 완전히 이해해야 한다면 응용개발자들 입장에선 손해니까요~ 시간과 기회는 기다려 주지 않습니다. 각자의 본질적인 서비스에 주력하시고, 저런 이쁘니는 가져다가 사용하는데 주력합시다.


RxPY 설치하기


import rx 해서 사용하면 됩니다. - 끝 -

에러나는데요?

쏘오뤼~~

pip install rx

설치해주셔용~ ^^


RxPY Hello World

from rx import Observable, Observer


def push_hello_world(observer):
observer.on_next("hello")
observer.on_next("world")
#observer.on_error("error")
observer.on_completed()


class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))

def on_completed(self):
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))


if __name__ == '__main__':
source = Observable.create(push_hello_world)
source.subscribe(PrintObserver())

머리속으로 옵저버패턴을 생각하고 봐주세요. 거의 똑같습니다.
Observable 은 데이터를 가지고 있으며, 데이터를 Observer 에게 전파(Push) 시킵니다. 
데이터를 
종류에 따라서 next, completed,, error 로 Push 받은 Observer는 그것을 출력해 줍니다. 

"별거 없네~ 뭐 그냥 옵저버 패턴이구만" 이라고 생각하고 그냥 사용하세요. 좋은건 일단 쓰고 보는겁니다.
쓰다보면서 조금 다르네, 먼가 더 좋다~~ 라고 느끼면 되는겁니다.

아래는 옵저버를 하나 더 추가 해 봤습니다.

from rx import Observable, Observer


def push_hello_world(observer):
observer.on_next("hello")
observer.on_next("world")
#observer.on_error("error")
observer.on_completed()


class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))

def on_completed(self):
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))

class ListObserver(Observer):

def __init__(self):
self.my_list = []
def on_next(self, value):
self.my_list.append(value)

def on_completed(self):
print(self.my_list)
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))

if __name__ == '__main__':
source = Observable.create(push_hello_world)
source.subscribe(PrintObserver())
source.subscribe(ListObserver())


정리하면 

  • Observable 은 데이터를 생산하는 컴포넌트이며, 시간이 흐름에 따라서 여러 데이터 (혹은 이벤트, 상태)을 바깥으로 내보내고 (Push,Emit) 있습니다.  Observer 가 구독(subscribe) 하는 순간 부터 데이터를 내보냅니다.

  • Observer 는 Observable 에서 내보내는 3가지의 데이터(이벤트) 구분하여 처리 합니다. 

 - on_next :  일반적인 데이터(이벤트)를 처리합니다.
 - on_completed : 모든 데이터를 다 내보내었을 경우에 대해 처리합니다. 
 - on_error : Observable 에서 어떤, 예외가 생겼을 때 처리해 줍니다.

맛뵈기를 보았구요,  이제 RxPY 홈페이지에 있는 진짜베기 시작하기 문서를 보시죠.


진짜 RxPY 시작하기 

Rx 는 이벤트의 흐름을 다루는것에 관한 것입니다. Rx 를 통해 당신은 :

  • 다루기 원하는 데이터가 무엇인지 말하세요. (Observable) 
  • 어떻게 다루고 싶나요? (A composition of operators)
  • 결과(데이터) 를 가지고 하고 싶은것은 무엇입니까? (Observer)

Rx를 사용하여, 이벤트가 도착하는 시점에 이벤트를 통해 원하는 것을 설명 하는 것은 매우 중요합니다. 그것은 연산에 대한 선언적 구성으로써, Observer 에 도착할 때 이벤트를 처리하게 될 것입니다. 즉 아무 일도 일어나지 않으면 아무 것도 처리하지 않습니다.

1. Rx 모듈 임포트 


import rx
from rx import Observable, Observer


2. 시퀀트 제네레이팅 

이벤트를 발생시키는 방법중에 꽤 간단한 from_iterable 을 사용합니다.

class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(10))
d = xs.subscribe(MyObserver())
Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed

subscribe 메소드에는 Observer 객체가 들어가지만, print 도 들어 갈 수 있으며 익명 Observer 로써 on_next 에 해당하는 역할을 하게 됩니다.

xs = Observable.from_(range(10))
d = xs.subscribe(print)
0
1
2
3
4
5
6
7
8
9

3. 시퀀스 필터링 (Filtering)

filter 메소드의 매개변수로 람다식을 사용하여 홀수만 Observer에게 전파하고 있습니다.
xs = Observable.from_(range(10))
d = xs.filter(
        lambda x: x % 2
    ).subscribe(print)
1
3
5
7
9

4. 시퀀스 변형 (Transforming)

map메소드의 매개변수로 람다식을 사용하여 데이터에 2배를 하여 Observer에게 전파하고 있습니다.
xs = Observable.from_(range(10))
d = xs.map(
        lambda x: x * 2
    ).subscribe(print)
0
2
4
6
8
10
12
14
16
18

map메소드의 두번째 파라미터로 인덱스를 넘겨 줄 수 도 있습니다.

xs = Observable.from_(range(10, 20, 2))
d = xs.map(
        lambda x, i: "%s: %s" % (i, x * 2)
    ).subscribe(print)

0: 20
1: 24
2: 28
3: 32
4: 36

5. 병합  (Merge)

2개의 Observable에서 부터 흘러 나오는 데이터를 모두 처리합니다. 순서는 보장 못합니다.
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)
a
1
b
2
c
3
d
4
e
5

6. Rx의 시공간 (SpaceTime)

위의 모든 예에서 모든 이벤트는 동일한 순간에 발생하며 이벤트는 순서에 따라 분리됩니다. 위의 병합 작업 결과에 다음과 같은 몇 가지 유효한 결과가 있을 수 있으므로 많은 신규 사용자들이  Rx에 혼동에 빠지곤 합니다.

a1b2c3d4e5
1a2b3c4d5e
ab12cd34e5
abcde12345

Rx가 해주는 유일한 보장은 1이 2보다 앞에 나오며, 1은 ys 데이터 중간에 어디서나 나올 수 있다는 것입니다. 그것은 어떤 이벤트가 먼저 가야 할지를 결정하기 위해 스케줄러의 정렬 안정성을 향상시킵니다. 실시간 데이터 스트림의 경우 실제 시간으로 이벤트가 분리되므로 문제가 되지 않습니다. 예상 한 결과를 얻으려면 Rx로 재생할 때 이벤트 사이에 시간을 추가하는 것이 좋습니다.

7. Marbles and Marble 다이어그램


이전 섹션에서 언급되었듯 Rx 및 RxPY로 데이터를 전파 할 때 시간을 추가하는 것이 좋습니다. 가장 좋은 방법은 Marble 다이어그램을 가지고 놀 수 있는 Marble 테스트 모듈을 사용하는 것인데 Marble 모듈은 Observable에 두 가지 새로운 확장 메소드를 추가하는데 각각은 from_marbles () 및 to_marbles ()입니다.

예제:

  1. res = rx.Observable.from_marbles("1-2-3-|")
  2. res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)

문자열은 특별한 문자들로 구성됩니다:

    - = Timespan of 100 ms (100ms 간격) 
    x = on_error()
    | = on_completed()
모든 문자는 문자열에서 발견되는 순간에 on_next () 이벤트로 처리됩니다. 다중 문자 값을 나타내야 할 경우 "1- (42) -3"과 같이 대괄호로 그룹화 할 수 있습니다.

제대로 해보죠.
from rx.testing import marbles

xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()

'a-b-c-|'

이제 Marble 문자열에 x를 삽입하여 짝수 스트림에 오류를 추가하여 보겠습니다.

xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()

'11-22-33-4x'

8. Subject and Stream

observable stream 을 만드는 간단한 방법은 Subject 을 사용하는 것입니다. 참고로 GOF 의 Design Patterns 책의 Observer 패턴에는 Observable 부분을 Subject 라고 부릅니다만 어쨌든 여기서 subject 는 Observable 처럼 subscribe 를 제공하며, on_next 를 통해 손쉽게 전파할 데이터를 입력 받을 수 있습니다.

from rx.subjects import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))

stream.on_next(42)

d.dispose()
stream.on_next(43)

Got: 42


9. Multicasting

Observable 에 대한 각 구독자들은 종종 별도의 방출 스트림을 수신 받게 됩니다. 예를 들어, Observable에 두 명의 구독자가 있고, 세 개의 임의의 정수가 방출 될 때, 각 구독자들은 서로 다른 숫자를 갖게 됩니다. 아~~뭔 설명이 더 헤깔리게 하네요. 개발자는 코드죠. 그냥 코드를 보면 쉽게 이해 됩니다.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)) three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i)))

OUTPUT:

Subscriber 1 Received: 79262
Subscriber 1 Received: 20892
Subscriber 1 Received: 69197
Subscriber 2 Received: 66574
Subscriber 2 Received: 41177
Subscriber 2 Received: 47445

Observable 체인의 특정 지점에 모든 구독자에게 동일한 값을 Push 하기 위해 publish()를 호출하여 ConnectableObservable을 반환 할 수 있습니다. 그런 다음 구독자를 설정(subscribe) 하고 connect ()를 호출 하면 됩니다. 역시 소스를 보시죠.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)).publish() three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i))) three_random_ints.connect()

OUTPUT:

Subscriber 1 Received: 90994
Subscriber 2 Received: 90994
Subscriber 1 Received: 91213
Subscriber 2 Received: 91213
Subscriber 1 Received: 42335
Subscriber 2 Received: 42335

connect ()를 호출하기 전에 모든 Observers를 설정해야합니다.

mutural을 구현하는 또 다른 방법은 ConnectableObservable에서 auto_connect () 연산자를 사용하는 것입니다.
매개변수로 넣은 값에 따라서 구독자가 subscribe 되면 데이터(이벤트)를 내보내기 시작합니다.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)).publish().auto_connect(2) three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i))) # second subscriber triggers firing


10. Combining Observables

Observable.merge (), Observable.concat (), Observable.zip () 및 Observable.combine_latest ()와 같은 팩터 리를 사용하여 서로 다른 Observables를 함께 작성할 수 있으며  Observables가 다른 스레드 (subscribe_on () 및 observe_on () 연산자 사용)에서 작업하는 경우에도 안전하게 결합됩니다.

다음 예에서는 Observable.zip ()을 사용하여  5 개의 문자열과 다른 Observable 에서 나오는 값을 튜플로 zip 하여 처리 할 수 있게 됩니다.  두개의 Observable 중 작은 개수의 데이터 스트림만큼 zip 됩니다.

from rx import Observable letters = Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) intervals = Observable.interval(1000) Observable.zip(letters, intervals, lambda s, i: (s, i)).subscribe(lambda t: print(t)) input("Press any key to quit\n")

OUTPUT:

('Alpha', 0)
('Beta', 1)
('Gamma', 2)
('Delta', 3)
('Epsilon', 4)

11. Concurrency

동시성 위해 RxPY 는 subscribe_on ()observe_on () 두 연산자를 사용하며 두 작업 모두 작업을 수행하기 위해 각 구독에 대한 스레드를 제공하는 스케줄러가 필요합니다 (아래 스케줄러 섹션 참조). ThreadPoolScheduler는 재사용 가능한 작업자 스레드 풀을 만드는 좋은 선택이 될 수 있습니다.

파이썬의 GIL은 다중 스레드가 동일한 코드 행을 동시에 액세스 할 수 없으므로 동시성 성능을 저하시킬 수 있습니다. NumPy와 같은 라이브러리는 GIL을 해제 할 때 병렬 집약적인 계산을 위해 이를 완화 할 수 있습니다. RxPy 또한 스레드 오버랩을 어느 정도 최소화 할 수 있습니다. 동시성으로 애플리케이션을 테스트하고 성능이 향상되는지 확인하십시오.

subscribe_on ()은 사용 할 스케쥴러 체인의 시작 부분에서 Observable 소스를 가르킵니다. 이 연산자를 넣는 위치는 중요하지 않습니다만 observe_on ()은 Observablechain의 해당 시점에 다른 스케줄러로 전환하여 한 스레드에서 다른 스레드로 효과적으로 이동시킵니다. Observable.interval () 및 delay ()와 같은 일부 Observable 팩토리 및 연산자에는 이미 기본 스케줄러가 있으므로 사용자가 지정한 subscribe_on ()을 무시합니다 (일반적으로 스케줄러를 인수로 전달할 수 있음).

아래에서는 observe_on ()뿐만 아니라 subscribe_on ()을 순차적으로 사용하는 대신 세 가지 다른 프로세스를 동시에 실행하는 모습을 보여줍니다.

import multiprocessing
import random
import time
from threading import current_thread

from rx import Observable
from rx.concurrency import ThreadPoolScheduler


def intense_calculation(value):
    # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
    time.sleep(random.randint(5, 20) * .1)
    return value


# calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

# Create Process 1
Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
               on_error=lambda e: print(e),
               on_completed=lambda: print("PROCESS 1 done!"))

# Create Process 2
Observable.range(1, 10) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
               on_error=lambda e: print(e), on_completed=lambda: print("PROCESS 2 done!"))

# Create Process 3, which is infinite
Observable.interval(1000) \
    .map(lambda i: i * 100) \
    .observe_on(pool_scheduler) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe(on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
               on_error=lambda e: print(e))

input("Press any key to exit\n")

OUTPUT:

Press any key to exit
PROCESS 1: Thread-1 Alpha
PROCESS 2: Thread-2 1
PROCESS 3: Thread-4 0
PROCESS 2: Thread-2 2
PROCESS 1: Thread-1 Beta
PROCESS 3: Thread-7 100
PROCESS 3: Thread-7 200
PROCESS 2: Thread-2 3
PROCESS 1: Thread-1 Gamma
PROCESS 1: Thread-1 Delta
PROCESS 2: Thread-2 4
PROCESS 3: Thread-7 300
...

12. Schedulers

RxPY에서는 비동기적으로 실행되도록 선택하거나 스레드를 사용하여 작업 및 시간 초과를 예약하도록 결정할 수도 있습니다.  좋아하는 Python 프레임 워크에서 RxPY를보다 쉽게 사용할 수 있도록 Python 관련 메인 루프 스케줄러가 많이 있습니다.

  • ThreadPoolScheduler to create a fixed sized pool of Schedulers.
  • NewThreadScheduler to create a new thread for each subscription
  • AsyncIOScheduler for use withAsyncIO. (requires Python 3.4 ortrollius, a port of asyncio compatible with Python 2.6-3.5).
  • EventLetEventScheduler for use with Eventlet.
  • IOLoopScheduler for use withTornado IOLoop. See theautocomplete and konamicode examples for how to use RxPY with your Tornado application.
  • GEventScheduler for use with GEvent. (Python 2.7 only).
  • TwistedScheduler for use with Twisted.
  • TkinterScheduler for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.
  • PyGameScheduler for use with PyGame. See thechess example for how to use RxPY with your PyGame application.
  • QtScheduler for use withPyQt4,PyQt5, andPySide. See thetimeflies example for how to use RxPY with your Qt application.
  • GtkScheduler for use withPython GTK+ 3. See thetimeflies example for how to use RxPY with your GTK+ application.
  • WxScheduler for use with wxPython. See thetimeflies example for how to use RxPY with your wx application.


이상 시작하기 부분은 모두 끝마쳤습니다.~ 더 자세한 내용은 소스와 홈페이지를 참고하세요.


레퍼런스:

https://github.com/ReactiveX/RxPY
https://jakubturek.com/functional-reactive-programming-in-python/