일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Akka
- Golang
- hyperledger fabric
- 하이퍼레저 패브릭
- play2 강좌
- 그라파나
- 파이썬 머신러닝
- 스칼라 강좌
- Play2 로 웹 개발
- 스위프트
- 이더리움
- CORDA
- 엔터프라이즈 블록체인
- 파이썬 강좌
- Play2
- 스칼라
- 파이썬
- 파이썬 데이터분석
- akka 강좌
- Hyperledger fabric gossip protocol
- 파이썬 동시성
- play 강좌
- 블록체인
- 플레이프레임워크
- 스칼라 동시성
- 하이브리드앱
- Actor
- 주키퍼
- Adapter 패턴
- 안드로이드 웹뷰
- Today
- Total
HAMA 블로그
스칼라 강좌 (39) 동시성을 위한 Observable 본문
* 이 글은 일단 발행은 하는데 오류가 있을 가능성도 있으며 수정&발전 될 것입니다.
동시성을 위한 스칼라 Observable
스칼라의 Observable 을 배우기 전에 여러가지 것들에 대해서 편하게 읽어보자.
Play 의 Iteratee & Enumerator
Play 에서 Iteratee / Enumerator / Enumeratee을 간단하게 말하면 :
이름 | 설명 |
---|---|
Iteratee [E, A] | Iteratee [E, A]는 함수형 프로그래밍에서 iteration 컨셉의 일반화. E 입력,A 출력 (소비자역할) |
Enumerator [E] | 컬렉션을 일반화 한 것으로 형태 E를 열거한다. 무한 열거 (Streaming) 할 수도있다. (생산자역할) |
Enumeratee [E, A] | 거의 사용하지 않기 때문에 지금은 생각하지 좋다. |
과 같다.
먼저 Iteratee 에 대해서 알아보자. Iteratee 는 보다시피 Iterator 와 먼가 관련이 있어 보이는데 자바에서 Iterator 가 어떻게 활용되는지 살펴보면
val l = List(1, 234, 455, 987)
var total = 0 // will contain the final total
var it = l.iterator
while( it.hasNext ) {
total += it.next
}
total
=> resXXX : Int = 1677
짧게 정리하면 (1,234,455,987).iterating( .... total += it.next ....) 이렇게 되는데
앞에 데이터가 있고 그것을 순회하면서 어떤 행동을 하게 된다.
즉 데이터 + 순회 + 행동 으로 이루어져있는데 데이터가 정해져있다. 강결합이다.
여기서 이것을 느슨하게 만들어 줄 방법에 대해서 생각해보면 iterator 를 일반화 하는것에 미치게 되는데 앞에 데이터가 무엇이 오건간에 순회하면서 행동을 하게 하자는 것이다. 즉 데이터는 주입받자는 것이다. 제어역전!!
iteratee = 순회(iterating) + 행동(react)
Enumerator = 데이터
Enumerator 는 정해진 콜렉션의 데이터가 아니라 비동기적으로 데이터를 생산하는 것으로 또 일반화된다.
자바의 Observable
자바의 Observable 에 대한 정말 쉽고 자세히 설명하는 토비님의 동영상 을 먼저 참고하자
토비의 봄 스프링 Reactive programming
에릭마이어의 Duality
위의 Play 에서 생산자 (Enumerator ) 와 소비자 (Iteratee)가 있듯이, 다른 언어,라이브러리에서도 이와 비슷한 것들이 다양한 이름으로 만들어지고 있다. 범람하고 있다는 표현이 더 어울리겠다.
데이터를 보내주는이 <---> 데이터를 받는이
이렇게 상반되는 개념을 지칭하는 개념을 "duality" 라고도 부르는 모양이다.
더 정확한 정보는 동영상도 참고하시고..
Gof 의 옵저버 패턴
이렇게 이벤트를 주고 받는 모습은 기존에 Gof 패턴에서 Observer 패턴이 비슷한 형태를 보여줬고, 예를들어 이 패턴을 설명해보면 어떤 매니져 객체가 있고 , 이 객체에 어떤 이벤트를 받길 원하는 옵저버들이 매니져 객체에 등록한다.
매니져에서 어떤 이벤트를 감지하면 , 자신에게 등록된 옵저버들에게 이벤트를 notify 해주는게 골자이다.
GUI 프로그래밍에서 특히 당연하게 사용되고 있다.
어떤 데이터를 다루는 다양한 View 가 있을때, 어떤 하나의 View를 통해 데이터에 변화를 사용자가 주면, 나머지 View에게도 그 변화를 전파하기 위한 설계에 사용되는 것이다.
이미지에서 Observer 는 알림을 받는 주체이다.
Subject 는 알림을 주는 매니져이다.
Observer 는 자신을 Subject 에게 attach 를 통해 알려주고
Subject 는 어떤 이벤트가 생겼을때 Observer 들에게 update 를 해준다.
POSA2 의 생산자-소비자 패턴 (멀티쓰레드 패턴중 하나)
멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있다. 멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉 "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있다.
이것도 역시 이벤트(데이터) 를 주는 놈이 있고 받는 놈이 있는데
보다시피 매우 단순하다. Thread 1이 생산자 Thread 2가 소비자가 된다.
Scala 에서 Observable 사용하기
이제 스칼라에서 Observable 를 사용하는 방식에 대해서 알아보자.
주거니 받거니 (Polling 이 아니라 Push 를 통해) 하는 기본적인 매커니즘은 같다. 다만 기능이 추가되었고 비동기로 사용가능하도록 확장 되었다.
강결합된 데이터를 이미 가지고 있는 생산자 (Observable)
object ObservablesItems extends App {
import rx.lang.scala._
val o = Observable.items("Pascal", "Java", "Scala")
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))
Thread.sleep(1000)
}
코드를 보면 Observable 은 어떤 데이터(이벤트) 를 가지고 있다. 생산자 역할을 한다.
소비자는 ?? 그렇다 name => log(s"learned the $name language") 이 함수리터럴이 담당한다.
(옵저버 패턴으로 비교하면, 생산자: Subject , 소비자: Observer 이며, 따라서 위의 subscribe 는
옵저버패턴에서는 addObserver 나 addListener 메소드와 매칭된다. )
생산자(Observable) 은 자신이 보낼(Push) 할 이벤트를 처리할 친구들을 subscribe 메소드로 모집하고 있다.
* 위의 코드를 실행하려면 아래 rxjava 를 가져와야한다.
libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.19.1"
정해진 시간 이후에 알림을 주는 생산자 (Observable)
val o = Observable.timer(5.second)
o.subscribe(_ => log(s"Timeout!"))
o.subscribe(_ => log(s"Another timeout!"))
5초 이후에 로그를 찍는다.
에러가 발생했음을 알려주는 생산자 (Observable)
val o = Observable.items(1, 2) ++ Observable.error(new RuntimeException) ++ Observable.items(3, 4)
o.subscribe(
x => log(s"number $x"),
t => log(s"an error occurred: $t")
)
subscribe 는 2개의 매개변수로 받는다. 두번째는 예러 발생시 핸들링할 함수이다.
에러(예외) 이 후의 데이터 처리는 하지 않는다.
지금까지는 subscribe 메소드에
def subscribe(onNext: T => Unit): Subscription = {
asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
}
onNext 에 해당하는 함수 리터럴을 넣었지만
이제 Observer 객체를 넣어 보도록하자.
Observer (소비자) 에게 데이터를 건네주는 생산자 (Observable)
val classics = List("Il buono", "Big", "Die Hard")
val o = Observable.from(classics)
o.subscribe(new Observer[String] {
override def onNext(m: String) = log(s"Movies Watchlist - $m")
override def onError(e: Throwable) = log(s"Ooops - $e!")
override def onCompleted() = log(s"No more movies.")
})
생산자(Observable) 에 강결합된 리스트를 넣어주었다.
소비자에 Observer 객체를 만들어서 넣어준다.
onNext : 생산자에서 주는 데이터(이벤트) 를 처리하는 메소드
onError : 생산자에서 발생한 예외를 처리하는 메소드
onCompleted : 생산자에서 이제 모든 데이터를 처리하였다고 알려주는 메소드
Observable 과 Future
val f = Future {
Thread.sleep(500)
"Back to the Future(s)"
}
val o = Observable.create[String] { obs =>
f foreach {
case s =>
obs.onNext(s)
obs.onCompleted()
}
f.failed foreach {
case t => obs.onError(t)
}
Subscription()
}
o.subscribe(log _)
- Future 를 하나 만들었다. 3초 있다가 문자열 하나를 리턴해주는 행동을 하는 ~
- Observable 에는 기존처럼 컬렉션의 강결합된 데이터가 엮여있는게 아니라 , Future 가 실행되고 그 리턴값들이 source 데이터로 활용된다.
- 그 데이터 (Future 가 리턴한 값) 를 소비자에게 전달한다.
- 소비자는 log _ 즉 로그를 찍는 함수이다
val o = Observable.from(Future {
Thread.sleep(500)
"Back to the Future(s)"
})
o.subscribe(log _)
다음처럼 Future 로 부터 직접 생산될 수도 있다.
Observable 과 Combinator
val roles = Observable.items("The Good", "The Bad", "The Ugly")
val names = Observable.items("Clint Eastwood", "Lee Van Cleef", "Eli Wallach")
val zipped = names.zip(roles).map { case (name, role) => s"$name - $role" }
zipped.subscribe(log _)
함수형 개발의 특징 답게 두개의 생산자로 부터 데이터를 결합하여 소비자에게 전달할 수도 있다.
main: Clint Eastwood - The Good
main: Lee Van Cleef - The Bad
main: Eli Wallach - The Ugly
이렇게 출력된다.
* zip 은 2개의 컬렉션에서 하나씩 가져와서 튜플을 만든다. zipWithIndex 는 하나를 1,2 등 숫자로 매핑
* map 은 컬렉션을 변화시켜서 새로운 컬렉션을 만든다. 그 자신을 변형시키진 않는다.
Observable 과 Subscription
import rx.lang.scala._
import org.apache.commons.io.monitor._
def modifiedFiles(directory: String): Observable[String] = {
Observable.create { observer =>
val fileMonitor = new FileAlterationMonitor(1000)
val fileObs = new FileAlterationObserver(directory)
val fileLis = new FileAlterationListenerAdaptor {
override def onFileChange(file: java.io.File) {
observer.onNext(file.getName)
}
}
fileObs.addListener(fileLis)
fileMonitor.addObserver(fileObs)
fileMonitor.start()
Subscription { fileMonitor.stop() }
}
}
log(s"starting to monitor files")
val subscription = modifiedFiles(".").subscribe(filename => log(s"$filename modified!"))
log(s"please modify and save a file")
Thread.sleep(10000)
subscription.unsubscribe()
log(s"monitoring done")
- apache.commons 는 스칼라 사용자에게도 보물창고이다.
- monitor 는 파일의 변화를 감지해서 우리의 생산자에게 알려준다.
- 우리의 생산자는 파일의 이름을 자연스럽게 (논블럭) 소비자에게 보내준다.
레퍼런스:
Learning Concurrent Programming in Scala by Aleksandar Prokopec
'Scala' 카테고리의 다른 글
스칼라 강좌 (41) Try 예외처리 (0) | 2017.02.27 |
---|---|
스칼라 강좌 (40) 동시성을 위한 Promise 와 Await (0) | 2017.02.21 |
스칼라 강좌 (38) 동시성을 위한 Collections (0) | 2017.02.21 |
스칼라 강좌 (37) 동시성을 위한 ExecutorContext (1) | 2017.02.21 |
스칼라 강좌 (36) {} 익숙해지기, {} 는 어디서 사용되는가? (0) | 2017.02.17 |