관리 메뉴

HAMA 블로그

스칼라 강좌 (39) 동시성을 위한 Observable 본문

Scala

스칼라 강좌 (39) 동시성을 위한 Observable

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 21. 16:08


* 이 글은 일단 발행은 하는데 오류가 있을 가능성도 있으며 수정&발전 될 것입니다. 


동시성을 위한 스칼라 Observable 

스칼라의 Observable 을 배우기 전에 여러가지 것들에 대해서 편하게 읽어보자.

Play 의 Iteratee & Enumerator 

Play 에서 Iteratee / Enumerator / Enumeratee을 간단하게 말하면 :

이름설명
Iteratee [E, A]

 Iteratee [E, A]는 함수형 프로그래밍에서 iteration 컨셉의 일반화. E 입력,A 출력 (소비자역할)

Enumerator [E]컬렉션을 일반화 한 것으로 형태 E를 열거한다. 무한 열거 (Streaming) 할 수도있다. (생산자역할)
Enumeratee [E, A]거의 사용하지 않기 때문에 지금은 생각하지 좋다.

과 같다.

먼저 Iteratee 에 대해서 알아보자. Iteratee 는 보다시피 Iterator 와 먼가 관련이 있어 보이는데 자바에서 Iterator 가 어떻게 활용되는지 살펴보면 

val l = List(1, 234, 455, 987)

var total = 0 // will contain the final total
var it = l.iterator
while( it.hasNext ) {
total += it.next
}

total
=> resXXX : Int = 1677

짧게 정리하면  (1,234,455,987).iterating( .... total += it.next ....)  이렇게 되는데 
앞에 데이터가 있고 그것을 순회하면서 어떤 행동 하게 된다. 

즉 데이터 + 순회 + 행동 으로 이루어져있는데  데이터가 정해져있다. 강결합이다. 

여기서 이것을 느슨하게 만들어 줄 방법에 대해서 생각해보면 iterator 를 일반화 하는것에 미치게 되는데 앞에 데이터가 무엇이 오건간에 순회하면서 행동을 하게 하자는 것이다. 즉 데이터는 주입받자는 것이다. 제어역전!! 

iteratee = 순회(iterating) + 행동(react) 
Enumerator  = 데이터 

Enumerator 는 정해진 콜렉션의 데이터가 아니라 비동기적으로 데이터를 생산하는 것으로 또 일반화된다.  


자바의 Observable

자바의 Observable 에 대한 정말 쉽고 자세히 설명하는 토비님의 동영상 을 먼저 참고하자
토비의 봄 스프링 Reactive programming


에릭마이어의 Duality 

위의 Play 에서 생산자 (Enumerator ) 와 소비자 (Iteratee)가 있듯이, 다른 언어,라이브러리에서도 이와 비슷한 것들이 다양한 이름으로 만들어지고 있다. 범람하고 있다는 표현이 더 어울리겠다.

데이터를 보내주는이 <---> 데이터를 받는이 

이렇게 상반되는 개념을 지칭하는 개념을 "duality"  라고도 부르는 모양이다.
더 정확한 정보는 동영상도 참고하시고..

에릭마이어가 설명하는 Duality 


Gof 의 옵저버 패턴 

이렇게 이벤트를 주고 받는 모습은 기존에 Gof 패턴에서 Observer 패턴이 비슷한 형태를 보여줬고, 예를들어 이 패턴을 설명해보면 어떤 매니져 객체가 있고 , 이 객체에 어떤 이벤트를 받길 원하는 옵저버들이 매니져 객체에 등록한다. 
매니져에서 어떤 이벤트를 감지하면 , 자신에게 등록된 옵저버들에게 이벤트를 notify 해주는게 골자이다.

GUI 프로그래밍에서 특히 당연하게 사용되고 있다. 
어떤 데이터를 다루는 다양한 View 가 있을때, 어떤 하나의 View를 통해 데이터에 변화를 사용자가 주면, 나머지 View에게도 그 변화를 전파하기 위한 설계에 사용되는 것이다. 

이미지에서 Observer 는 알림을 받는 주체이다.
Subject 는 알림을 주는 매니져이다.
Observer 는 자신을 Subject 에게 attach 를 통해 알려주고
Subject 는 어떤 이벤트가 생겼을때 Observer 들에게 update 를 해준다. 


POSA2 의 생산자-소비자 패턴 (멀티쓰레드 패턴중 하나) 

멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있다. 멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉  "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있다.

이것도 역시 이벤트(데이터) 를 주는 놈이 있고 받는 놈이 있는데 


보다시피 매우 단순하다. Thread 1이 생산자 Thread 2가 소비자가 된다.


Scala 에서 Observable 사용하기 

이제 스칼라에서  Observable 를 사용하는 방식에 대해서 알아보자.
주거니 받거니 (Polling 이 아니라 Push 를 통해)  하는 기본적인 매커니즘은 같다. 다만 기능이 추가되었고 비동기로 사용가능하도록 확장 되었다. 


강결합된 데이터를 이미 가지고 있는 생산자 (Observable)


object ObservablesItems extends App {
import rx.lang.scala._

val o = Observable.items("Pascal", "Java", "Scala")
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))

Thread.sleep(1000)
}

코드를 보면 Observable 은 어떤 데이터(이벤트) 를 가지고 있다. 생산자 역할을 한다.
소비자는 ?? 그렇다 name => log(s"learned the $name language")  이 함수리터럴이 담당한다.

(옵저버 패턴으로 비교하면, 생산자: Subject , 소비자: Observer 이며, 따라서 위의 subscribe 는 
 옵저버패턴에서는 addObserver 나 addListener 메소드와 매칭된다.

생산자(Observable) 은 자신이 보낼(Push) 할 이벤트를 처리할 친구들을 subscribe 메소드로 모집하고 있다.

* 위의 코드를 실행하려면 아래 rxjava 를 가져와야한다. 

libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.19.1"


정해진 시간 이후에 알림을 주는 생산자 (Observable)

val o = Observable.timer(5.second)
o.subscribe(_ => log(s"Timeout!"))
o.subscribe(_ => log(s"Another timeout!"))

5초 이후에 로그를 찍는다.


에러가 발생했음을 알려주는 생산자 (Observable)

val o = Observable.items(1, 2) ++ Observable.error(new RuntimeException) ++ Observable.items(3, 4)
o.subscribe(
x => log(s"number $x"),
t => log(s"an error occurred: $t")
)

subscribe 는 2개의  매개변수로 받는다. 두번째는 예러 발생시 핸들링할 함수이다.
에러(예외) 이 후의 데이터 처리는 하지 않는다.

지금까지는 subscribe 메소드에 

def subscribe(onNext: T => Unit): Subscription = {
asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
}

onNext 에 해당하는 함수 리터럴을 넣었지만

이제 Observer 객체를 넣어 보도록하자.


Observer (소비자) 에게 데이터를 건네주는 생산자 (Observable)

val classics = List("Il buono", "Big", "Die Hard")
val o = Observable.from(classics)

o.subscribe(new Observer[String] {
override def onNext(m: String) = log(s"Movies Watchlist - $m")
override def onError(e: Throwable) = log(s"Ooops - $e!")
override def onCompleted() = log(s"No more movies.")
})

생산자(Observable) 에 강결합된 리스트를 넣어주었다.
소비자에 Observer 객체를 만들어서 넣어준다.

onNext : 생산자에서 주는 데이터(이벤트) 를 처리하는 메소드
onError : 생산자에서 발생한 예외를 처리하는 메소드
onCompleted : 생산자에서 이제 모든 데이터를 처리하였다고 알려주는 메소드 


Observable 과 Future 

val f = Future {
Thread.sleep(500)
"Back to the Future(s)"
}

val o = Observable.create[String] { obs =>
f foreach {
case s =>
obs.onNext(s)
obs.onCompleted()
}
f.failed foreach {
case t => obs.onError(t)
}
Subscription()
}

o.subscribe(log _)

- Future 를 하나 만들었다. 3초 있다가 문자열 하나를 리턴해주는 행동을 하는 ~
- Observable 에는 기존처럼 컬렉션의  강결합된 데이터가 엮여있는게 아니라 , Future 가 실행되고 그 리턴값들이 source 데이터로 활용된다.
- 그 데이터 (Future 가 리턴한 값) 를 소비자에게 전달한다. 
- 소비자는 log _   즉 로그를 찍는 함수이다 

val o = Observable.from(Future {
Thread.sleep(500)
"Back to the Future(s)"
})

o.subscribe(log _)

다음처럼 Future 로 부터 직접 생산될 수도 있다.


Observable 과 Combinator

val roles = Observable.items("The Good", "The Bad", "The Ugly")
val names = Observable.items("Clint Eastwood", "Lee Van Cleef", "Eli Wallach")
val zipped = names.zip(roles).map { case (name, role) => s"$name - $role" }

zipped.subscribe(log _)

함수형 개발의 특징 답게 두개의 생산자로 부터 데이터를 결합하여 소비자에게 전달할 수도 있다.

main: Clint Eastwood - The Good
main: Lee Van Cleef - The Bad
main: Eli Wallach - The Ugly

이렇게 출력된다.

* zip 은 2개의 컬렉션에서 하나씩 가져와서 튜플을 만든다. zipWithIndex 는 하나를 1,2 등 숫자로 매핑 
* map 은 컬렉션을 변화시켜서 새로운 컬렉션을 만든다. 그 자신을 변형시키진 않는다. 


Observable 과 Subscription

import rx.lang.scala._
import org.apache.commons.io.monitor._

def modifiedFiles(directory: String): Observable[String] = {
Observable.create { observer =>
val fileMonitor = new FileAlterationMonitor(1000)
val fileObs = new FileAlterationObserver(directory)
val fileLis = new FileAlterationListenerAdaptor {
override def onFileChange(file: java.io.File) {
observer.onNext(file.getName)
}
}
fileObs.addListener(fileLis)
fileMonitor.addObserver(fileObs)
fileMonitor.start()

Subscription { fileMonitor.stop() }
}
}

log(s"starting to monitor files")
val subscription = modifiedFiles(".").subscribe(filename => log(s"$filename modified!"))
log(s"please modify and save a file")

Thread.sleep(10000)

subscription.unsubscribe()
log(s"monitoring done")

- apache.commons 는 스칼라 사용자에게도 보물창고이다. 
- monitor 는 파일의 변화를 감지해서 우리의 생산자에게 알려준다.
- 우리의 생산자는 파일의 이름을 자연스럽게 (논블럭) 소비자에게 보내준다. 



레퍼런스:

Learning Concurrent Programming in Scala by Aleksandar Prokopec


Comments