텐서플로우는 범용적인 기술로써 딥러닝 뿐 만 아니라 다양한 곳에 사용 될 수 있습니다. 즉 머신러닝 라이브러리를 사용하지 않고 밑바닥에서 무엇인가 만들수 있는 여지가 많다는 뜻이겠지요. 예전에 CUDA 로 Ray-Tracing을 통해 볼륨랜더링을 만들어 본 경험이 있어, 혹시 TensorFlow 로도 할 수 있을 듯 하여 찾아 봤는데 관련 블로그가 있어서 번역 해 보았습니다. (오역이 있을 수 있으니, 진지하게 접근하는 분이라면 원문을 참고) 

[번역] https://medium.com/@awildtaber/building-a-rendering-engine-in-tensorflow-262438b2e062


텐서플로우로 랜더링 엔진 만들기 


이 포스트에서는 텐서플로우를 이용해 만든 랜더링 엔진에 대해  파이썬 코드 샘플 과 함께 알아 볼 것 이다. 아이디어와 코드의 많은 부분은 Íñigo Quílez, Matt Keeter, and the HyperFunproject 에게서 영감을 얻었다. 탱큐~


지오메트릭 표현 : 암시적 surface

(역주: 보통 surface 는 외부면만 있는것, Solid 는 내부가 채워져 있는 것을 지칭하는데 이 문서에선 그냥 합쳐서 말하는 듯) 
 


암시적 surface 는입력점이 물체(geometry)의 내부 또는 외부에 있는지 여부를 나타내는 부호를 반환하는 3차원 공간에서 함수 (f라고 함)를 사용하여 기하학(geometry)을 표현한 것이다. 우리는 음수 기호가 "외부"를 의미하고 양수 기호가 "내부"를 의미하는 규칙을 사용 할 것이며, f (x, y, z) = 0 인 각 입력점 (x, y, z)이 표면에 정확히 놓여 있고, 그 결과 표면은 f 의 솔루션 셋으로 암시적으로 정의되므로 그것들은 "암시적" surface 이라고 불리운다.

일반적으로 f에 대한 다른 가정은 없다. 즉, 반지름이 1 인 원에 대해서는 절차적으로 (프랙탈과 같이) , 대수적으로 (f = sqrt (x ^ 2 + y ^ 2 + z ^ 2) -1) ), 또는 심지어 확률론적으로 정의 될 수 있다.대수 표현으로 정의된 암시적 surface 을 생각해보면, 기본 도형은 구체, 토리(tori), 평면, 타원체 및 원통형이 있을 것이며, 이것들은 모두 간단한 대수 표현식을 가지고 있는데. 예를 들어, plane (x, y, z) = x * a + y * b + z * c + m이 될 것이다. 이것을 파이썬에서 구현하기 위해 closure를 사용하여 값 a, b, c, m을 설정해보자.

def plane(a, b, c, m):
  def expression(x, y, z):
    return x*a + y*b + z*c + m
  return expression

부울 연산과 좌표 변환을 통해 더 복잡한 지오메트리를 작성 해 보자. 부울 연산을 정의하는 방법은 다음과 같다.

# 합쳐서 확장하기 (합집합) 

def union(f, g):
  def unioned(x, y, z):
    return max(f(x, y, z), g(x, y, z))
  return unioned

# 두 평면의 겹치는 부분만 (교집합)

def intersection(f, g):
  def intersected(x, y, z):
    return min(f(x, y, z), g(x, y, z))
  return intersected 

def negation(f):
  def negated(x, y, z):
    return -f(x, y, z)
  return negated

좌표 변환을 이해하기 위해 머리를 좀 굴려야 할 필요가 있긴 하지만, 일반적으로 함수에 대한 입력 좌표를 왜곡(distort) 한다. 예를 들어, 가장 간단한 변환은 다음과 같다.

# 이동 

def translate(a,b,c):
  def translated(x,y,z):
    return (x-a, y-b, z-c)
  return translated

만약 우리가 함수를 이동(translate) 하기를 원한다면 변환(transformation) 함수 f 를 구성하여 f(*translate(a,b,c)(x, y, z))를 작성하고 나면, 이 새로운 이동 함수의 솔루션 셋(solution set)은 (a,b,c) 에 의해 이동(translated) 된다. 예를들어 f (0, 0, 0) = 0이면, (0,0,0)은 f의 표면에 있으므로,  f(*translate(1,0,0)(1,0,0)) => f(1-1, 0-0, 0-0) => f(0,0,0) = 0 처럼 된다면 결과 솔루션 셋은 (1,0,0) 만큼 높아진다.  

이 개념은 원뿔 좌표 변환 및 톱니파 사용을 통한 도메인 반복의 영역으로 상당히 멀리 떨어져있을 수 있다 (암시적 surface 로 그릴 수있는 crazy 한 것들에 대한 Johann Korndörfer’s presentation 참조). 여기 암시적 표면의 매력에 대한 첫 번째 증거를 볼 수 있는데 간단한 대수 방정식으로 엄청나게 복잡한 지오메트리를 간결하게 나타낼 수 있다. 다른 패치가 어디에서 만나고 금지되어 있는지 추적해야 하므로 미세 구조를 갖는 기하학의 경우 이러한 상황에서 경계 표현 ( CAD의 산업 표준 표현) 이 서로 다르기 때문에 복잡한 패치 토폴로지를 사용하여 모델링 형상을 모델링 할 때 특히 유용하다. 

나는 암시적인 surface 를 공간 복잡성에 대한 거래 시간 복잡성으로 생각한다. 메모리에서 모든 토폴로지 스티칭(stitching) 정보를 유지할 필요는 없지만 비용을 지불하기 위해 (잠재적으로) 거대한 대수 방정식을 풀어야 하며 프리미티브를 사용하여 흥미로운 것을 정의하기 위해 큰 숫자의 연산 트리가 필요하다. 결국 스크린에 무엇인가를 렌더링하려면 수십만 포인트의 함수 f를 샘플링 해야 한다.

이 지점에서 아키텍처 과제가 있는데, 우리는 대수 표현을 가능한 한 많이 재사용하고 대량의 입력 데이터 배열에 대해 효율적인 수치 구현을 계산하고 결국 계산을 병렬화하는 방식으로 대수 표현을 작성 해야 하는 운명에 쳐해있다.

이제 우리를 도와 줄 해결사가 등장 할 시간이다.  "텐서플로우" 


텐서플로우: 선언적 그래프 계산 


Tensorflow는 파이썬에서 계산 그래프를 정의하고, C ++ 또는 CUDA에서 그것을 실행하기 위한 프레임 워크이다. 주요 적용 지점은 신경망을 정의하는 것이지만 훨씬 더 일반적인 사용이 가능하게 설계되었습니다. (역주: CUDA, OpenCL , AMP, 텐서폴로우 모두 각각의 장,단이 있지만 내 경우는 요즘 데이터 분석에 촛점이 맞춰져 있기 때문에  많은 정보가 존재하는 텐서플로우에 집중하는게 나을 거 같다. C++을 사랑 하므로, AMP 도 놓치긴 싫지만...) 

                   

       (숫자가 높을 수록 좋음( FPS)  AMP 가 좀 떨어지긴 하지만 이런 벤치마크는 상황에 따라 천차만별임)  



우리가 찾아야 할 일반적인 기능에는 일반적인 하위 표현식 제거, 자동 차별화 및 CUDA 컴파일과 같은 매우 귀중한 기능이 있다. Tensorflow의 주요 데이터 구조는 Tensor 이며, 계산 결과를 나타냅니다. 여러면에서 numpy.ndarray와 유사하도록 설계되었으므로, ndarrays 을 해보셨다면 많은 부분 직감적으로 적용 할 수 있을 것이다. 우리가 두 개의 텐서 a와 b를 가지고 있다면, 산술 연산이나 복잡한 함수를 적용하여 더 많은 계산을 생성 할 수 있게 된다. 예를 들어 a + b는 두 개의 텐서를 더한 결과를 나타내는 새로운 Tensor를 반환 한다.


                                                           제공:  An Introduction to TensorFlow




여기에 계획이 하나 있다. 우리는 텐서 (Tensors)를 입력으로 사용 한 후 함수를 평가하여 계산 그래프를 생성 할 것이다.그런 다음 Tensorflow의 일반적인 하위 표현식 제거를 통해 가능한 한 많은 계산을 자동으로 재사용하고 자동 차별화를 통해 함수의 정확한 derivatives 을 가져올 수 있게 할 것이다 (이는 렌더링에  꽤나 유용 할 것이다). 암시적 surface 를 렌더링하는 두 가지 일반적인 방법으로는 polygonization과 ray-tracing이 있다. 


두 가지 중 더 단순한 방법인 polygonization 부터 시작해 본다.


폴리곤화 (Polygonization)


일을 간단하게하기 위해 우리는 고전적인 Marching Cubes algorithm을 사용하여 주어진 함수 값의 체적 그리드가 주어진 삼각형을 생성하고. 체적 그리드를 생성하기 위해 서는 x, y, z라고하는 세 개의 다른 텐서를 정의 할 것이다. 이 텐서는 함수에 공급할 수있는 좌표를 나타내며, 이 텐서들을 결합하여 하나의 텐서를 나타 낼 것이다. 우리는 Tensorflow의 Variable 클래스를 사용하여 좌표 텐서를 초기화 할 것이지만 데이터를 입력하기 위해 런타임 feed_dict 옵션을 사용할 수도 있다.


min_bounds = [-1,-1,-1] # the geometric coordinate bounds
max_bounds = [1,1,1] 
output_shape = [200,200,200]

resolutions = list(map(lambda x: x*1j, output_shape))

space_grid = np.mgrid[min_bounds[0]:max_bounds[0]:resolutions[0],min_bounds[1]:max_bounds[1]:resolutions[1],min_bounds[2]:max_bounds[2]:resolutions[2]]
space_grid = space_grid.astype(np.float32)

x = tf.Variable(space_grid[0,:,:,:], trainable=False, name="X-Coordinates")
y = tf.Variable(space_grid[1,:,:,:], trainable=False, name="Y-Coordinates")
z = tf.Variable(space_grid[2,:,:,:], trainable=False, name="Z-Coordinates")

draw_op = function(x,y,z)

session = tf.Session()
session.run(tf.initialize_all_variables())
volumetric_grid = tf.session.run(draw_op)

marching_cubes(volumetric_grid) # => list of faces and vertices for rendering


이 렌더링 방법은 디버깅하기 좋으며 빠르고 간단하지만 지오메트리에 거친(Coarse) 간격의 격자로 캡처 할 수 없는 feature를 가진 경우의 이슈는 있다. Raytracing은 일반적으로 폴리곤화 보다는 느리지만 근본적으로 무한한 해상도를 가진  진정 아름다운 장면을 렌더링 할 수 있다.



광선 추적법 (Raytracing) 


raytracing을 사용하려면 크기가 입력 점에서 가장 가까운 점까지의 유클리드 거리보다 항상 작아야 한다는 함수 f 에 대한 추가 가정이 필요하다. 즉 우리는 Signed Distance Functions 로 한정 할 것이다.

근본적으로 raytracing은 수식 f (x, y, z)를 하나의 변수 t 인 광선 길이로 표현하는 것으로 구성되는데 우리는 t에 대해 f (* (ray * t)) = 0을 반복적으로 계산하고 그로부터 우리는 표면의 어디에 있는지를 알게 된다. 화면의 각 픽셀에 대해 이 작업을 수행하여 이미지를 출력하게 되며 각 반복마다 t를 얼마나 많이 증가 시킬지를 적용하는데 있어서 signed 거리 속성이 필요하다. 광선 추적 방정식을 풀기 위해 bisection, 뉴턴 방정식 또는 심지어 Tensorflow의 빌트인 최적화 방안을 사용할 수도 있지만, 어느 것도 정확하고 좋은 성능을 보여주지는 못했다.


def normalize_vector(vector):
  return vector / tf.sqrt(tf.reduce_sum(tf.square(vector), reduction_indices=0))

def vector_fill(shape, vector):
  return tf.pack([
    tf.fill(shape, vector[0]),
    tf.fill(shape, vector[1]),
    tf.fill(shape, vector[2]),
  ])

resolution = (1920, 1080)
aspect_ratio = resolution[0]/resolution[1]
min_bounds, max_bounds = (-aspect_ratio, -1), (aspect_ratio, 1)
resolutions = list(map(lambda x: x*1j, resolution))
image_plane_coords = np.mgrid[min_bounds[0]:max_bounds[0]:resolutions[0],min_bounds[1]:max_bounds[1]:resolutions[1]]

# Find the center of the image plane

camera_position = tf.constant([-2, 0, 0])
lookAt = (0, 0, 0)
camera = camera_position - np.array(lookAt)
camera_direction = normalize_vector(camera)
focal_length = 1
eye = camera + focal_length * camera_direction

# Coerce into correct shape

image_plane_center = vector_fill(resolution, camera_position)

# Convert u,v parameters to x,y,z coordinates for the image plane

v_unit = [0, 0, -1]
u_unit = tf.cross(camera_direction, v_unit)
image_plane = image_plane_center + image_plane_coords[0] * vector_fill(resolution, u_unit) + image_plane_coords[1] * vector_fill(resolution, v_unit)

# Populate the image plane with initial unit ray vectors

initial_vectors = image_plane - vector_fill(resolution, eye)
ray_vectors = normalize_vector(initial_vectors)

t = tf.Variable(tf.zeros_initializer(resolution, dtype=tf.float32), name="ScalingFactor")
space = (ray_vectors * t) + image_plane

# Name TF ops for better graph visualization

x = tf.squeeze(tf.slice(space, [0,0,0], [1,-1,-1]), squeeze_dims=[0], name="X-Coordinates")
y = tf.squeeze(tf.slice(space, [1,0,0], [1,-1,-1]), squeeze_dims=[0], name="Y-Coordinates")
z = tf.squeeze(tf.slice(space, [2,0,0], [1,-1,-1]), squeeze_dims=[0], name="Z-Coordinates")

evaluated_function = function(x,y,z)

# Iteration operation

epsilon = 0.0001
distance = tf.abs(evaluated_function)
distance_step = t - (tf.sign(evaluated_function) * tf.maximum(distance, epsilon))
ray_step = t.assign(distance_step)


각 픽셀의 광선 길이를 얻은 후에는 Tensorflow의 자동 차등화( automatic differentiation) 기능을 사용하여 함수 f의 정확한 수치 미분을 계산하여 surface 를 가져올 수 있습니다. 표면으로 이동함에 따라 f에 의해 생성되는 스칼라 필드가 항상 증가하므로 df가 항상 지오메트리의 내부를 가리키게 되고 표면의 법선이 된다.


light = {"position": np.array([0, 1, 1]), "color": np.array([255, 255, 255])}
gradient = tf.pack(tf.gradients(evaluated_functional, [x,y,z]))
normal_vector = normalize_vector(gradient)
incidence = normal_vector - vector_fill(resolution, light["position"])
normalized_incidence = normalize_vector(incidence)
incidence_angle = tf.reduce_sum(normalized_incidence * normal_vector, reduction_indices=0)

# Split the color into three channels

light_intensity = vector_fill(resolution, light['color']) * incidence_angle

# Add ambient light

ambient_color = [119, 139, 165]
with_ambient = light_intensity * 0.5 + vector_fill(resolution, ambient_color) * 0.5
lighted = with_ambient

# Mask out pixels not on the surface

epsilon = 0.0001
bitmask = tf.less_equal(distance, epsilon)
masked = lighted * tf.to_float(bitmask)
sky_color = [70, 130, 180]
background = vector_fill(resolution, sky_color) * tf.to_float(tf.logical_not(bitmask))
image_data = tf.cast(masked + background, tf.uint8)

image = tf.transpose(image_data)
render = tf.image.encode_jpeg(image)

마지막으로, 렌더링하기에 충분할 정도로 반복 단계를 실행한다!  모든 픽셀이 이미 수렴되었거나 배경의 일부인 경우 루프를 종료하는 수렴 테스트를 추가 할 수 있다.


session = tf.Session()
session.run(tf.initialize_all_variables())
step = 0
while step < 50:
  session.run(ray_step)
  step += 1

session.run(render) # <= returns jpeg data you can write to disk

다양한 프레임을 통해 애니메이트된 최종 제품 :


                                    


아마존 GPU 인스턴스에서 실행할 때 1080p의 프레임을 렌더링하는 데 약 1분이 걸렸다. GPU 셰이더를 통한 레이트레이싱과 비교 하긴 어렵지만 오류역전파를 하려는 경우 Tensorflow에 이미지 텐서를 가질 수 있게 되었다.





여기까지 마치구요. 원문에는 텐서플로우 디버깅 및 심볼릭 계산 트리가 후속 글로 있음을 알려드립니다.


참고:  CUDA Ray -Tracing 



저작자 표시 비영리 동일 조건 변경 허락
신고
Posted by [前草] 이승현 (wowlsh93@gmail.com)



     RxPY 소개 및 시작하기 

* RxPY 소개글을 써보았습니다. 앞부분의 개념 설명 부분은 건너 뛰고 "RxPY 시작하기" 부터 보는것도 좋을 거 같습니다. 먼저 코딩을 해보는게 정신건강에...도움이 될지도..


Rx? RxPY? FRP?  

RxPY란 Reactive Programming을 하기위한 개념인 Rx(Reactive Extensions)의 파이썬 라이브러리입니다. Rx (반응형 확장) 는 FRP (함수형 반응형 프로그래밍) 개념을 가지고 있는 시스템으로써 (완전한 FRP 는 아니며, 사용자의 편의를 위한 장치가 많은 반면 표시적 의미론이 부족하여 합성성이 떨어진다는 의견도 있습니다.)  마이크로소프트가 닷넷용으로 개발한 라이브러리인 Rx.NET으로부터 시작되어서 자바판인 RxJava, 자바스크립트판인 RxJS, RxSwift,RxGo,RxScala 도 있으며, 기타 언어로 확장 구현 되였습니다. 즉 Rx 는 특정 언어에 얽매여 있지 않기 때문에 하나만 익히면 어떤 언어 환경에서든 쉽게 사용 할 수 있을 것 입니다. 

주요 개념을 간단히 살펴보면 Rx 는 주로 Observable 이라는 인터페이스를 중심으로 한 비동기 API 를 제공하는데 Observable 은 상태변경에 해당되는 이벤트를 발생/전파시키는 스트림으로써 onNext - 값, onError - 오류, onComplete - 스트림의 끝 이라는 세가지 이벤트를 지원합니다. (순수 FRP 에서는 나머지 2개는 불필요하다고 보며, 오직 한가지 이벤트 타입인 값만 지원) Rx 는 비동기 프로그래밍의 어려움도 완화 시켜 주지만 (덕분에 비동기 프로그래밍의 헬인 자바스크립트에 RxJS 라는 이름으로 빠르게 지원됨) Observer 패턴을 대체하기도 합니다.

상태변경에 따른 상호작용에 많이 사용되는 고전적인 Observer 패턴은 여러가지 문제점들이 많이 있는데, 그 문제점들은 FRP 나 Rx 를 통해 완화됩니다. 제가 얼마전에 쓴 글인데 참고를:  굿바이~ Observer 패턴 

더 자세한 정보는 아래 다양한 레퍼런스를 참고하세요.

창시자(?) 인 에릭마이어의 동영상

Rx 와 FRP 의 비교도 참고하시죠

https://github.com/ReactiveX/reactivex.github.io/issues/130
http://lambda-the-ultimate.org/node/4982

한글 블로그입니다. 왜 마소에서 Rx 를 만들었을까에 대한 근원을 찾아가 보는 소개 글입니다.

http://huns.me/development/2051

Rx 는 FRP 의 부분확장팩 라이브러리라고도 볼수 있는데, FRP 를 쉽게 풀어 놓은 책이 얼마전에 번역됨.

함수형반응형프로그래밍

Rx자체에 대한 책도 있습니다.

Reactive Extensions in Action

RxJava를 활용한 리액티브 프로그래밍

Rx 의 근간이 되는 Functor, Monad 에 대한 설명 
https://gist.github.com/jooyunghan/e14f426839454063d98454581b204452

마지막으로 제가 작성한 스칼라에서의 Rx

스칼라 강좌 (39) 동시성을 위한 Observable



Rx vs FRP 의 간단한 의미

위에서는 좀 생각나는 데로 막 쓴 거 같은데요. 아래에는 핵심만 간추려 보겠습니다.

Rx

Rx 의 주요한 화두는 비동기 이벤트(데이터) 처리 라고 생각합니다. 즉 비동기를 어떻게 하면 잘 처리 할 수 있을 까라는 고심에서 나온 라이브러리이며, 비동기를 처리함에 있어서 함수적 처리를 통해 잘 구성할 수 있도록 해주는게 목표입니다. (본질은 조금 다른 거 같지만 실용적으로는 말이죠.) 
그 과정에서 데이터를 비동기로 보내주는 쪽을 Observable 이라고 명시하며, 받아서 처리하는 쪽을 Observer 라고 명시합니다. 데이터 스트림을 가지고 어떻게 합성(zip등) 하고 동기화하고 반응 하게 할 것이냐도 스케쥴러통해 선택 할 수 있으며 나름 구체적인 지침이 있는 라이브러리라고 보면 될 거 같습니다.

FRP

Rx 가 구체적인 라이브러리인 반면, FRP 는 개념에 더 가깝다고 볼 수 있습니다.  Functional reactive programming (FRP) 은 함수적 빌딩블럭(map.reduce,filter,fold) 를 이용하여 리액티브 프로그래밍 (비동기 데이터흐름 프로그래밍) 을 위한 파라다임인데요. 주로 GUI, 로보틱스,뮤직,IoT 센서데이터처리 등 명시적으로 시간을 모델링하는 프로그램을 단순화 시키기 위해 사용됩니다.  -위키백과-

코날엘리엇이라는 FRP 를 발명한 사람의 말에 따르면 FRP 의 가장 중요한 포인트는 표시적/시간연속적 입니다. 

표시적
은 각각의 타입과 구성요소의 의미를 정확히 지정해주는 엄밀하고,단순하며, 구현과는 무관한 합성 가능한 의미론을 기초로 해야한다는 것 입니다. 시스템에 대한 정확한 명세를 제공하며, 모든 경우의 모든 구성요소에 대해 합성성이라는 중요한 특성이 성립함을 증명해주는것이 랍니다. 그냥 "합성성" 이라는 단어만 머리에 넣어 두세요. 

시간연속적은 각자 공부해보는것으로 합시다. (저도 마찬가지..) 

왜 이런 방법론들이 나오게 되는것인지에 대한 개발자로써 쉽게 와닿을 수 있는 예시를 하나 보겠습니다.

{
  a();
  b():
  c();

   ..

}

절차지향에 익숙한 우리는 위의 함수들이 순서대로 발생할 것이라는 것을 염두해 두코 코딩을 하게 됩니다.
즉 c() 함수는 a() 함수와 b() 함수가 무엇인가 처리를 한 후에 그것을 가지고 처리한다는 거죠. 물론 순서가 중요하지 않을 때 도 있습니다.

이때 나중에 참여한 개발자가 저것을 다 파악하기란 힘들 것입니다. (물론 소스가 복잡해 진 다면 말이죠) 
또한 옵저버패턴이 사용된다면 그 순서란 더 감춰지기 마련이니 파악하기 힘들어 질 것입니다. 

의존 관계 와 순서는 이렇게 우리가 짜왔던 절차지향,객체지향에서는 혼동을 주게 되는데요. 
최근에 코어를 더 적극적으로 활용해야하는 시대에 와서 멀티쓰레드/비동기라는 동시성을 다루는 부분에 있어서 더더욱 순서와 의존관계를 파악하기 힘들어 졌습니다.

FRP 와 Rx 의 반응형이라는 말에 뒤에는 이런 순서&의존관계를 명확하게 인지 시키준다는 의미도 포함되어 있습니다. 앞으로 FRP 와 Rx 를 공부하는데 있어서 이 점을 머리속에 넣어 두면 좋을 거 같습니다.


RxPY 정의 

파이썬에서 LINQ 스타일의 쿼리 연산과 Observable 콜렉션들을 사용하여 비동기,이벤트-기반 프로그램을 구성하기 위한 라이브러리입니다.  참고로 O'Reilly 출판사는 Reactive Python for Data Science 동강을 만들었으며 아래는 소개 글을 퍼왔습니다 O'Reilly Safari

액티브 프로그래밍은 데이터 모델링의 미래를 말하고 있습니다. 리액티브를 사용하면 정적 데이터를 간결하게 처리하고 분석 할 수있을 뿐 아니라 실시간의 무한 피드(PUSH)에 의한 데이터를 효과적으로 처리 할 수 있습니다. Reactive Extensions (Rx)는 2009 년에 처음으로 탄생 하였으며,12 개 주요 언어 및 플랫폼에 이식되었습니다. 이 과정에서는 Python 데이터 분석 워크 플로우라는 문제에 대해 가벼운 Rx 파이썬 라이브러리인 RxPy를 사용하는 방법을 배우게 됩니다.

  • 데이터 과학에서 반응형 프로그래밍의 이점에 대한 상세한 고찰 
  • 푸시 기반 (push-based) vs 풀 기반 (pull-based) 이터레이션을 사용하여 문제를 "reactive way"으로 해결하는 방법.
  • 리액티브 프로그래밍이 왜 강력하고 간단하며 탄력적인 코드 모델을 생성하는지에 대한 이해 
  • 클러스터 컴퓨팅 하드웨어를 사용 할 수 없을 때 동시성을 위해 RxPy 활용 방법을 배웁니다.
  • RxPy 사용법을 익히고 모든 데이터 과학 작업을 위한 더 강력한 Python 코드 작성


RxPY 시작하기  

솔직히 저런 개념같은 거 다 집어 치우고, 그냥 냅다 코딩하는게 더 나을거 같습니다. 쉽게 가져다가 만들어 놓은 라이브러리를 그 개념을 완전히 이해해야 한다면 응용개발자들 입장에선 손해니까요~ 시간과 기회는 기다려 주지 않습니다. 각자의 본질적인 서비스에 주력하시고, 저런 이쁘니는 가져다가 사용하는데 주력합시다.


RxPY 설치하기


import rx 해서 사용하면 됩니다. - 끝 -

에러나는데요?

쏘오뤼~~

pip install rx

설치해주셔용~ ^^


RxPY Hello World

from rx import Observable, Observer


def push_hello_world(observer):
observer.on_next("hello")
observer.on_next("world")
#observer.on_error("error")
observer.on_completed()


class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))

def on_completed(self):
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))


if __name__ == '__main__':
source = Observable.create(push_hello_world)
source.subscribe(PrintObserver())

머리속으로 옵저버패턴을 생각하고 봐주세요. 거의 똑같습니다.
Observable 은 데이터를 가지고 있으며, 데이터를 Observer 에게 전파(Push) 시킵니다. 
데이터를 
종류에 따라서 next, completed,, error 로 Push 받은 Observer는 그것을 출력해 줍니다. 

"별거 없네~ 뭐 그냥 옵저버 패턴이구만" 이라고 생각하고 그냥 사용하세요. 좋은건 일단 쓰고 보는겁니다.
쓰다보면서 조금 다르네, 먼가 더 좋다~~ 라고 느끼면 되는겁니다.

아래는 옵저버를 하나 더 추가 해 봤습니다.

from rx import Observable, Observer


def push_hello_world(observer):
observer.on_next("hello")
observer.on_next("world")
#observer.on_error("error")
observer.on_completed()


class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))

def on_completed(self):
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))

class ListObserver(Observer):

def __init__(self):
self.my_list = []
def on_next(self, value):
self.my_list.append(value)

def on_completed(self):
print(self.my_list)
print("Done!")

def on_error(self, error):
print("Error Occurred: {0}".format(error))

if __name__ == '__main__':
source = Observable.create(push_hello_world)
source.subscribe(PrintObserver())
source.subscribe(ListObserver())


정리하면 

  • Observable 은 데이터를 생산하는 컴포넌트이며, 시간이 흐름에 따라서 여러 데이터 (혹은 이벤트, 상태)을 바깥으로 내보내고 (Push,Emit) 있습니다.  Observer 가 구독(subscribe) 하는 순간 부터 데이터를 내보냅니다.

  • Observer 는 Observable 에서 내보내는 3가지의 데이터(이벤트) 구분하여 처리 합니다. 

 - on_next :  일반적인 데이터(이벤트)를 처리합니다.
 - on_completed : 모든 데이터를 다 내보내었을 경우에 대해 처리합니다. 
 - on_error : Observable 에서 어떤, 예외가 생겼을 때 처리해 줍니다.

맛뵈기를 보았구요,  이제 RxPY 홈페이지에 있는 진짜베기 시작하기 문서를 보시죠.


진짜 RxPY 시작하기 

Rx 는 이벤트의 흐름을 다루는것에 관한 것입니다. Rx 를 통해 당신은 :

  • 다루기 원하는 데이터가 무엇인지 말하세요. (Observable) 
  • 어떻게 다루고 싶나요? (A composition of operators)
  • 결과(데이터) 를 가지고 하고 싶은것은 무엇입니까? (Observer)

Rx를 사용하여, 이벤트가 도착하는 시점에 이벤트를 통해 원하는 것을 설명 하는 것은 매우 중요합니다. 그것은 연산에 대한 선언적 구성으로써, Observer 에 도착할 때 이벤트를 처리하게 될 것입니다. 즉 아무 일도 일어나지 않으면 아무 것도 처리하지 않습니다.

1. Rx 모듈 임포트 


import rx
from rx import Observable, Observer


2. 시퀀트 제네레이팅 

이벤트를 발생시키는 방법중에 꽤 간단한 from_iterable 을 사용합니다.

class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(10))
d = xs.subscribe(MyObserver())
Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed

subscribe 메소드에는 Observer 객체가 들어가지만, print 도 들어 갈 수 있으며 익명 Observer 로써 on_next 에 해당하는 역할을 하게 됩니다.

xs = Observable.from_(range(10))
d = xs.subscribe(print)
0
1
2
3
4
5
6
7
8
9

3. 시퀀스 필터링 (Filtering)

filter 메소드의 매개변수로 람다식을 사용하여 홀수만 Observer에게 전파하고 있습니다.
xs = Observable.from_(range(10))
d = xs.filter(
        lambda x: x % 2
    ).subscribe(print)
1
3
5
7
9

4. 시퀀스 변형 (Transforming)

map메소드의 매개변수로 람다식을 사용하여 데이터에 2배를 하여 Observer에게 전파하고 있습니다.
xs = Observable.from_(range(10))
d = xs.map(
        lambda x: x * 2
    ).subscribe(print)
0
2
4
6
8
10
12
14
16
18

map메소드의 두번째 파라미터로 인덱스를 넘겨 줄 수 도 있습니다.

xs = Observable.from_(range(10, 20, 2))
d = xs.map(
        lambda x, i: "%s: %s" % (i, x * 2)
    ).subscribe(print)

0: 20
1: 24
2: 28
3: 32
4: 36

5. 병합  (Merge)

2개의 Observable에서 부터 흘러 나오는 데이터를 모두 처리합니다. 순서는 보장 못합니다.
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)
a
1
b
2
c
3
d
4
e
5

6. Rx의 시공간 (SpaceTime)

위의 모든 예에서 모든 이벤트는 동일한 순간에 발생하며 이벤트는 순서에 따라 분리됩니다. 위의 병합 작업 결과에 다음과 같은 몇 가지 유효한 결과가 있을 수 있으므로 많은 신규 사용자들이  Rx에 혼동에 빠지곤 합니다.

a1b2c3d4e5
1a2b3c4d5e
ab12cd34e5
abcde12345

Rx가 해주는 유일한 보장은 1이 2보다 앞에 나오며, 1은 ys 데이터 중간에 어디서나 나올 수 있다는 것입니다. 그것은 어떤 이벤트가 먼저 가야 할지를 결정하기 위해 스케줄러의 정렬 안정성을 향상시킵니다. 실시간 데이터 스트림의 경우 실제 시간으로 이벤트가 분리되므로 문제가 되지 않습니다. 예상 한 결과를 얻으려면 Rx로 재생할 때 이벤트 사이에 시간을 추가하는 것이 좋습니다.

7. Marbles and Marble 다이어그램


이전 섹션에서 언급되었듯 Rx 및 RxPY로 데이터를 전파 할 때 시간을 추가하는 것이 좋습니다. 가장 좋은 방법은 Marble 다이어그램을 가지고 놀 수 있는 Marble 테스트 모듈을 사용하는 것인데 Marble 모듈은 Observable에 두 가지 새로운 확장 메소드를 추가하는데 각각은 from_marbles () 및 to_marbles ()입니다.

예제:

  1. res = rx.Observable.from_marbles("1-2-3-|")
  2. res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)

문자열은 특별한 문자들로 구성됩니다:

    - = Timespan of 100 ms (100ms 간격) 
    x = on_error()
    | = on_completed()
모든 문자는 문자열에서 발견되는 순간에 on_next () 이벤트로 처리됩니다. 다중 문자 값을 나타내야 할 경우 "1- (42) -3"과 같이 대괄호로 그룹화 할 수 있습니다.

제대로 해보죠.
from rx.testing import marbles

xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()

'a-b-c-|'

이제 Marble 문자열에 x를 삽입하여 짝수 스트림에 오류를 추가하여 보겠습니다.

xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()

'11-22-33-4x'

8. Subject and Stream

observable stream 을 만드는 간단한 방법은 Subject 을 사용하는 것입니다. 참고로 GOF 의 Design Patterns 책의 Observer 패턴에는 Observable 부분을 Subject 라고 부릅니다만 어쨌든 여기서 subject 는 Observable 처럼 subscribe 를 제공하며, on_next 를 통해 손쉽게 전파할 데이터를 입력 받을 수 있습니다.

from rx.subjects import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))

stream.on_next(42)

d.dispose()
stream.on_next(43)

Got: 42


9. Multicasting

Observable 에 대한 각 구독자들은 종종 별도의 방출 스트림을 수신 받게 됩니다. 예를 들어, Observable에 두 명의 구독자가 있고, 세 개의 임의의 정수가 방출 될 때, 각 구독자들은 서로 다른 숫자를 갖게 됩니다. 아~~뭔 설명이 더 헤깔리게 하네요. 개발자는 코드죠. 그냥 코드를 보면 쉽게 이해 됩니다.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)) three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i)))

OUTPUT:

Subscriber 1 Received: 79262
Subscriber 1 Received: 20892
Subscriber 1 Received: 69197
Subscriber 2 Received: 66574
Subscriber 2 Received: 41177
Subscriber 2 Received: 47445

Observable 체인의 특정 지점에 모든 구독자에게 동일한 값을 Push 하기 위해 publish()를 호출하여 ConnectableObservable을 반환 할 수 있습니다. 그런 다음 구독자를 설정(subscribe) 하고 connect ()를 호출 하면 됩니다. 역시 소스를 보시죠.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)).publish() three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i))) three_random_ints.connect()

OUTPUT:

Subscriber 1 Received: 90994
Subscriber 2 Received: 90994
Subscriber 1 Received: 91213
Subscriber 2 Received: 91213
Subscriber 1 Received: 42335
Subscriber 2 Received: 42335

connect ()를 호출하기 전에 모든 Observers를 설정해야합니다.

mutural을 구현하는 또 다른 방법은 ConnectableObservable에서 auto_connect () 연산자를 사용하는 것입니다.
매개변수로 넣은 값에 따라서 구독자가 subscribe 되면 데이터(이벤트)를 내보내기 시작합니다.

from rx import Observable from random import randint three_emissions = Observable.range(1, 3) three_random_ints = three_emissions.map(lambda i: randint(1, 100000)).publish().auto_connect(2) three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i))) three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i))) # second subscriber triggers firing


10. Combining Observables

Observable.merge (), Observable.concat (), Observable.zip () 및 Observable.combine_latest ()와 같은 팩터 리를 사용하여 서로 다른 Observables를 함께 작성할 수 있으며  Observables가 다른 스레드 (subscribe_on () 및 observe_on () 연산자 사용)에서 작업하는 경우에도 안전하게 결합됩니다.

다음 예에서는 Observable.zip ()을 사용하여  5 개의 문자열과 다른 Observable 에서 나오는 값을 튜플로 zip 하여 처리 할 수 있게 됩니다.  두개의 Observable 중 작은 개수의 데이터 스트림만큼 zip 됩니다.

from rx import Observable letters = Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) intervals = Observable.interval(1000) Observable.zip(letters, intervals, lambda s, i: (s, i)).subscribe(lambda t: print(t)) input("Press any key to quit\n")

OUTPUT:

('Alpha', 0)
('Beta', 1)
('Gamma', 2)
('Delta', 3)
('Epsilon', 4)

11. Concurrency

동시성 위해 RxPY 는 subscribe_on ()observe_on () 두 연산자를 사용하며 두 작업 모두 작업을 수행하기 위해 각 구독에 대한 스레드를 제공하는 스케줄러가 필요합니다 (아래 스케줄러 섹션 참조). ThreadPoolScheduler는 재사용 가능한 작업자 스레드 풀을 만드는 좋은 선택이 될 수 있습니다.

파이썬의 GIL은 다중 스레드가 동일한 코드 행을 동시에 액세스 할 수 없으므로 동시성 성능을 저하시킬 수 있습니다. NumPy와 같은 라이브러리는 GIL을 해제 할 때 병렬 집약적인 계산을 위해 이를 완화 할 수 있습니다. RxPy 또한 스레드 오버랩을 어느 정도 최소화 할 수 있습니다. 동시성으로 애플리케이션을 테스트하고 성능이 향상되는지 확인하십시오.

subscribe_on ()은 사용 할 스케쥴러 체인의 시작 부분에서 Observable 소스를 가르킵니다. 이 연산자를 넣는 위치는 중요하지 않습니다만 observe_on ()은 Observablechain의 해당 시점에 다른 스케줄러로 전환하여 한 스레드에서 다른 스레드로 효과적으로 이동시킵니다. Observable.interval () 및 delay ()와 같은 일부 Observable 팩토리 및 연산자에는 이미 기본 스케줄러가 있으므로 사용자가 지정한 subscribe_on ()을 무시합니다 (일반적으로 스케줄러를 인수로 전달할 수 있음).

아래에서는 observe_on ()뿐만 아니라 subscribe_on ()을 순차적으로 사용하는 대신 세 가지 다른 프로세스를 동시에 실행하는 모습을 보여줍니다.

import multiprocessing
import random
import time
from threading import current_thread

from rx import Observable
from rx.concurrency import ThreadPoolScheduler


def intense_calculation(value):
    # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
    time.sleep(random.randint(5, 20) * .1)
    return value


# calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

# Create Process 1
Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
               on_error=lambda e: print(e),
               on_completed=lambda: print("PROCESS 1 done!"))

# Create Process 2
Observable.range(1, 10) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
               on_error=lambda e: print(e), on_completed=lambda: print("PROCESS 2 done!"))

# Create Process 3, which is infinite
Observable.interval(1000) \
    .map(lambda i: i * 100) \
    .observe_on(pool_scheduler) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe(on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
               on_error=lambda e: print(e))

input("Press any key to exit\n")

OUTPUT:

Press any key to exit
PROCESS 1: Thread-1 Alpha
PROCESS 2: Thread-2 1
PROCESS 3: Thread-4 0
PROCESS 2: Thread-2 2
PROCESS 1: Thread-1 Beta
PROCESS 3: Thread-7 100
PROCESS 3: Thread-7 200
PROCESS 2: Thread-2 3
PROCESS 1: Thread-1 Gamma
PROCESS 1: Thread-1 Delta
PROCESS 2: Thread-2 4
PROCESS 3: Thread-7 300
...

12. Schedulers

RxPY에서는 비동기적으로 실행되도록 선택하거나 스레드를 사용하여 작업 및 시간 초과를 예약하도록 결정할 수도 있습니다.  좋아하는 Python 프레임 워크에서 RxPY를보다 쉽게 사용할 수 있도록 Python 관련 메인 루프 스케줄러가 많이 있습니다.

  • ThreadPoolScheduler to create a fixed sized pool of Schedulers.
  • NewThreadScheduler to create a new thread for each subscription
  • AsyncIOScheduler for use withAsyncIO. (requires Python 3.4 ortrollius, a port of asyncio compatible with Python 2.6-3.5).
  • EventLetEventScheduler for use with Eventlet.
  • IOLoopScheduler for use withTornado IOLoop. See theautocomplete and konamicode examples for how to use RxPY with your Tornado application.
  • GEventScheduler for use with GEvent. (Python 2.7 only).
  • TwistedScheduler for use with Twisted.
  • TkinterScheduler for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.
  • PyGameScheduler for use with PyGame. See thechess example for how to use RxPY with your PyGame application.
  • QtScheduler for use withPyQt4,PyQt5, andPySide. See thetimeflies example for how to use RxPY with your Qt application.
  • GtkScheduler for use withPython GTK+ 3. See thetimeflies example for how to use RxPY with your GTK+ application.
  • WxScheduler for use with wxPython. See thetimeflies example for how to use RxPY with your wx application.


이상 시작하기 부분은 모두 끝마쳤습니다.~ 더 자세한 내용은 소스와 홈페이지를 참고하세요.


레퍼런스:

https://github.com/ReactiveX/RxPY
https://jakubturek.com/functional-reactive-programming-in-python/


저작자 표시 비영리 동일 조건 변경 허락
신고
Posted by [前草] 이승현 (wowlsh93@gmail.com)


문제 공유

우리는 오랫 동안 상호작용 되는 많은 부분에 있어서 옵저버패턴을 당연하듯 활용해 왔지만,

옵저버(관찰자, 소비자, 리스너) 패턴을 사용하다보면 경험 많은 개발자라면 누구나 "아 이거 먼가 깨름칙 한데" 라는 경험을 해보았을 것이다. 나 같은 평범한 개발자의 경우 그런 깨름칙한 냄새를 맡고서도, "내가 모자라서 그렇지 뭐" 자책을 하거나,  "여기서 어떻게 더 잘 고칠수 있지? 옵저버패턴은 Gof 패턴 중 하나이며 훌륭한것이니 더 나은것은 없을 거야" 라고 이른 만족을 하거나, "그냥 잘 굴러가는 거 같아 보이니, 냅두자", "나는 코드를 잘 이해하고 있어, 다른 신참이나 이해 부족한 개발자 네 탓" 이 라고 기술 부채를 남기며 자기 최면을 건다든지 할 것이다. 

하지만 역시 구루님들은 달랐다. 옵저버패턴의 문제점을 요목조목 따져가며 샅샅히 지적을 했으며, 그것에 대한 해결책까지 나왔다.  따라서 이젠 거인의 어깨에 올라 타서 그것에 관한 이해하고, 잘 만들어진 라이브러리를 사용하면 된다. 그 전반적인 내용에 대해서 설명 해 볼 예정이지만 모든 것을 쉽게 풀어서 적을 수는 없기에,  중간중간 이해 하기 힘든 구멍들은 각자 수고스럽지만 찾아보거나, 깊은 사색을 해야 할 것이다. 


옵저버 패턴

어떤 상태를 관리하는 Subject 객체(상태머신,생산자,Observable)가 있고 , 이 객체에서 일어나는 상태 변화에 따라서 해당 이벤트를 받길 원하는 옵저버(관찰자,소비자)들은 매니져 객체에 자신을 등록 한다.

그 후에, Subject 객체에서 어떤 상태가 변경 되었을 때 , 자신에게 등록된 옵저버들에게 이벤트를 notify 해주는게 옵저버 패턴의 주요 골자이며, Subject 객체은 관찰자들에 대해서 유연하게 커플링 되어 있게 된다. 즉 매니저 객체가 소비자들에 대해서 정확한 정보를 알 필요가 없이, 소비자들이 알아서 Subject 객체가 선언한 인터페이스를 따라주고, 등록해주면 되므로, 매니저 객체는 독립적인 컴포넌트가 될 수 있는 여지가 생기는데 옵저버패턴은 그런 유연성이라는 장점을 내세우는 패턴이라고 할 수 있다.

자~ 그럼 옵저버패턴은 어디서 사용 될 까?

글쓴이 본인은 주로 그래픽스/편집기 솔루션을 개발한 경험이 많기 때문에 관련하여 설명 해본다. (이러한 곳에서 정말 많이 사용된다) 


각종 도형의 리스트가 나열 되어 있는 리스트 박스가 있다. 우리는 그 리스트에 추가,삭제를 할 수 있는데, 도형 하나를 추가하면 (상태를 변경하면) 그 소식을 리스트의 데이터와 밀접한 관계를 가지는 다른 컴포넌트(객체) 들에게도 연락을 해줘야 한다. 만약 삼각형3을 추가해줬다면 아래와 같은 변경이 전파되어야 한다.

- 마우스 커서는 삼각형3에 해당하는 커서모양으로 변경
- 상태바에는 삼각형3가 추가되었다고 글씨가 추가
- 뷰화면에는 삼각형3에 해당하는 도형이 그려짐.

GUI 부분에서는 이것 말고도 매우 다양하게 사용되는데 다른 예는 마우스 버튼이 Subject 가 되며, 버튼이 클릭되는 상태변경 (좌클릭,우클릭,더블클릭등) 에 따라서 옵저버들이 고지 받아서 행동(선 그리기)하게 될 것이다. 

웹이나 통신 개발에서의 예를 들면 아래 처럼 구성이 될 것이다.

MVC 패턴에서, M (model) 은 subject 역할을 맡고, V(view) 는 Observer 역할을 맡는 것도 유추 할 수 있다. 


옵저버 패턴의 특징 

디커플링 : 상태머신이 자신이 호출해 줘야 할 객체들을 모두 강하게 내부 변수로 가지고 있게 되면, 상태머신 자체를 다른 곳에서 재활용하기 힘들게 된다.



제어역전 : 옵저버 패턴은 전형적인 제어 역전 구조로 (사실 제어역전이란 말은 모호하기 때문에 그냥 DI 로 사용하는게 나음) 옵저버들이 상태머신을 계속 polling 하거나, 상태머신을 호출하는 것이아니라,  상태머신에 자신의 레퍼런스를 넘겨서 (addObserver , addListener , subscribe ) 상태머신에 의해 콜백되게 만든다.



옵저버 패턴의 문제 

1. 예측 불가능한 순서

Subject (상태머신) 자체내에서는 옵저버들을 보통 리스트를 순회하며 Notify 를 해주게 되지만, 옵저버들의 추가,삭제가 자유로이 이루어지고 있고, 개별 옵저버 입장에서는 자신이 호출되는 순서에 대해서 알 수가 없다. 따라서 상태변경에 따른 행위를 할 때 , 자신의 순서 앞에서 어떤 다른 옵저버가 어떤 행위를 했는지를 미리 알 수가 없게 된다. 제어권을 Subject 로 제어역전을 시켜 준 결과 이렇게 되버리는데, 이것을 해결 하기 위해서는 옵저버들 전체를 일종의 트랜잭션으로 감싸서 트랙잭션이 끝날 때 어떤 행위를 하게 끔 하는 수 밖에 없지만, 코드복잡도가 상당히 올라가게 마련이다. 

2. 첫번째 이벤트 소실

Subject(상태머신) 에 DI 를 해주는 시점이, Subject 에서 첫번째 이벤트(상태변경)가 발생하는 시점 보다 늦게 될 수가 있다. 예를들어 클라이언트가 접속되었다는 이벤트를 통지 받지 못한다면, 클라이언트와 상호통신하는 옵저버는 무용지물이 될 것이다.

3. 지저분한 상태

Subject(상태머신) 은 위에서 보았다시피, 계속 변경되기 때문에 그로 인한 부수효과(side-effects) 가 발생하기 마련이다. 상태가 1~2개라면 모르겠으나, 상태가 만약 5개이상을 가지고 있다고 하자. 그 상태를 변경하는 이벤트들의 종류가 10가지 라고하면, 50개의 조합이 생겨난다. 그런 조합에 의해 옵저버들이 호출 되었는데, 먼가가 작동을 안하는 버그가 생겼을 때 , 현재 상태가 올바른 상태인지에 대한 디버깅이 힘들어지기 마련이다. 

4. 캡슐화 문제

옵저버패턴은 캡슐화를 종종 깨버리는데,  상태머신의 변경에 따라서 a 옵저버가 mylist 라는 변수를 초기화 시키고, b 옵저버는 mylist 라는 변수를 사용하게 되는 경우가 많이 있다.

5. 스레드 안전 문제

가장 곤혹스러운 문제이다. 일단 Subject (상태머신) 내에서도 락이나 경쟁관계를 해소해야하지만, 그것 보다 더 큰 문제는 각각의 옵저버들과 그 옵저버들이 호출하는 함수체인 속에서 어떤 락을 잡고 있는지 알 수 없게 되는 경향이 있다. 즉 A 옵저버가 a 락을 잡고 b 락을 잡으려고 하지만, B 옵저버가 이미 b락을 잡고 있다면 (a락을 잡아야 b락을 풀어준다면) 터지는거다. 역시 상태를 가지고있는 모든 OOP 프로그래밍에서의 쓰레드 사용은 큰 문제거리가 될 수 밖에 없을 것이다. 더군다나 옵저버패턴처럼 제어권이 역전 된 상태이면 더더욱~

6. 콜백 누수

가장 옵저버(리스너)를 등록 시킨 후에, 쓸모가 없어 졌을때 removeObserver, removeListener, dipose 등을 호출하는 것을 잊어 버렸다고 하자. 앞으로 상태가 변경 될 때 마다 쓸때없는 CPU 사이클만 날려버릴 것이다. 
옵저버 패턴이 생산자와 소비자의 제어 관계를 자연스럽게 역전 시켜서 생산자가 소비자에게 의존 하지 못하게 만드는 것이지만, 생산자의 실수를 소비자는 알 수가 없게 된다. 이상적이라면 이 관계를 다시 역전 시켜야 한다. 

7. 의도치 않은 재귀 

실제 현업에서 복잡한 솔루션을 짤 때, 이 문제도 쓰레드 문제와 같이 가장 크게 다가오곤 한다. 예를들어 커맨드 패턴에서 execute 가 발생해서 -> 상태머신(S) 의 상태를 변경하면 -> 옵저버 A가 호출되고 -> 옵저버 B가 호출되고 -> ... -> 옵저버A 는 어떤 다른 함수를 호출하고 그 함수는 -> 무엇인가를 하고 -> 여기서 끝나야 하지만 ->  마지막 함수는 다시 상태머신(S) 를 변경한다. 

바보라고 말 할 수 있겠지만, 정말 복잡한 수백만 라인의 코드에서는 종종 일어나는 일이다. 자신이 코드의 모든 곳을 속속들이 알지 못하는 신참일 경우, 일 부분에 대해서만 작업을 하게 되는데 자신의 작업이 가져 올 여파까지는 미리 알 수가 없는 상황인 경우가 발생한다.

8. 기타등등

Composability, SOC , Scalabilty, Abstraction, Resource management, Semantic distance 등이 있으며. 아래 레퍼런스 중 첫번째 마틴오더스키의 논문에 짧은 코멘트가 적혀져 있다.


자! 이 모든 문제를 해결 해야 할 때가 왔다. FRP(Functional Reactive Programming) 이 그것을 해준다.

다음 편에서는 아래의 주제로 이것에 대한 해결 방안들을 알아 보자.



함수적 반응형 프로그래밍 (FRP) 

먼저 리액티브라는 단어에는 다양한 의미가 있지만 가장 개발자로써 와닿을 수 있는 예시를 하나 보자.

{

  a();

  b():

  c();

   ..

}

절차지향에 익숙한 우리는 위의 함수들이 순서대로 발생할 것이라는 것을 염두해 두코 코딩을 하게 된다.

즉 c() 함수는 a() 함수와 b() 함수가 무엇인가 처리를 한 후에 그것을 가지고 처리한다는 건데, 물론 순서가 중요하지 않을 때 도 있다. 이때 나중에 참여한 개발자가 저것을 다 파악하기란 힘들 것이다. (물론 소스가 복잡해 진 다면 말이죠) 또한 옵저버패턴이 사용된다면 그 순서란 더 감춰지기 마련이니 파악하기 힘들어 질 것이다.

의존 관계 와 순서는 이렇게 우리가 짜왔던 절차지향,객체지향에서는 혼동을 주게 되는데 최근에 코어를 더 적극적으로 활용해야하는 시대에 와서 멀티쓰레드라는 동시성을 다루는 부분에 있어서 더더욱 순서와 의존관계를 파악하기 힘들어 졌다.

FRP 와 Rx 의 반응형이라는 말에 뒤에는 이런 순서&의존관계를 명확하게 인지 시키준다는 의미도 포함되어 있으며 앞으로 FRP 와 Rx 를 공부하는데 있어서 이 점을 머리속에 넣어 두면 좋을 거 같다.


p.s

Reactive 도 이해 못하겠는데, Proactive 가 조만간 나올거 같은 느낌적인 느낌이라....무섭다..


레퍼런스:

https://infoscience.epfl.ch/record/148043/files/DeprecatingObserversTR2010.pdf
Functional Reactive Programming 
https://www.scala-lang.org/
https://github.com/ReactiveX/RxJava
https://github.com/SodiumFRP/sodium
https://github.com/ReactiveX/RxPY

저작자 표시 비영리 동일 조건 변경 허락
신고
Posted by [前草] 이승현 (wowlsh93@gmail.com)