관리 메뉴

HAMA 블로그

Reactive 프로그래밍 - Hello world 본문

소프트웨어 사색

Reactive 프로그래밍 - Hello world

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 19. 11:47


이 글은 웹 개발 파라다임의 거대한 변화 - "Reactive" 에 이어지는 코딩 위주의 글입니다.
스칼라 언어와 Play 프레임워크를 통해서 진행되니 관련 지식이 있으면  이해하기 편할 것입니다.
다만 학습을 위한 글로서는 내용이 많이 생략되어 있으며 추상층이 높아서 한번에 이해하기 원래 어려우니  자책 할 필요는 없습니다. 저도 체득하려면 멀었음을 많이 느끼고 있습니다. 완전한 이해를 하려면 여기저기 찾아다니면서 의문점을 해결해야하는 수고가 동반되며 그런면에서 Hello world 라는 제목은 좀 안어울리긴 합니다..@@  그냥 대략 어떤것인지 맛만 본다고 생각하시고 제대로된 학습을 위해서는 나중에 기회가 되면 오프라인 모임등을 통해서 함께 하였으면 하는 마음을 전합니다. 


서론

자 위와 같은 프로그램을 코딩해야한다고 치자. 메인쓰레드가 쭈욱 나가고 있으며 2개의 동작이 분기되어 나중에는 2개 동작의 결과가 합쳐져서 새로운 결과를 표시해야하는 것이다.

먼저 떠오르는 것은 쓰레드 하나를 생성해서 일 시키고.. 또 하나 생성해서 일 시키고... 좋아!!
근데 저 두개의 결과를 메인쓰레드 블럭없이 어떻게합치지??  고민의 시간이다..아주 다양한 방법이 있을 수 있겠다. 
이런데 쏟아야 하는 에너지를 아주 쉬운(편리한) 방법으로 해결해보자. 


먼저 알아 두어야 할 것들 


Future

* Future 자체에 대한 포스트는 요기에 있으니 참고를 => http://hamait.tistory.com/748 

먼저 스칼라에서 비동기 방식으로 Helloworld 를 찍어보자. 스칼라에서의 Future 는 꽤 다양한 방식으로 사용 할 수있는데 먼저 스칼라에서의 Future 모양은 다음과 같다.

trait Future[T]   

퓨쳐값을 나타낸다. T 타입의 실제 우리가 리턴받기 원하는 객체를 포함한다. 

defy apply[T](b: =>T) (implicit e: ExecutionContext) : Future[T] 

퓨처 계산을 나타낸다. 실제 계산을 수행하는 함수를 매개변수로 넣어주고 있다.

: 암시적으로 ExecutionContext 를 맥변수로 넣어준다. 즉 쓰레드풀을 넣어주는것. 

퓨쳐값 과 퓨쳐계산을 잘 구분해서 기억해두자.

예를 보면 이해가 더 잘 될거 같다. ㄱㄱ~

object FuturesDataType extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global

val myFuture: Future[String] = Future {
Thread.sleep(1000*1)
"HelloWorld"
} log(s"status: ${myFuture.isCompleted}")

Thread.sleep(3000)
log(s"status: ${myFuture.value}")
}

그 어디에도 쓰레드를 직접 실행하는 모습을 볼 수 없다. 하지만 위의 코드는 메인쓰레드와 서브쓰레드로 나뉘어져있다. 어디에서 서브쓰레드가 실행되는 것일까? 그렇다. Future {  .. }  안의 코딩이 내부 쓰레드풀에서 실행되어진다. 1초 쉬었다가 "HelloWorld" 문자열을 리턴해주는 행동을 한다. 퓨쳐계산 한다고 한다.

참고로 Future { .. } 는 Future.apply 메소드 호출을 생략해서 쓴 것이다. (이런 문법적 유도리는 Syntatic sugar 라고 불린다)

그럼 myFuture 는 무엇인가? 퓨쳐값이다. 바로 리턴받게 되는데 myFuture 에는 실제 리턴값 즉 "HelloWorld" 는 들어 있지 않다. 대략 1초 후에 real data 가 담길 것이기 때문에 메인쓰레드에서 3초를 쉬었다. 그 후에 real data 를 가져와서 log 로 출력해주고 있다.

순서를 다시 정리해보면 

- "HelloWorld" 를 만드는 일을 Future 계산을 통해서 위임하고 바로 Future [String] 값을 리턴받는다.
- 리턴 받은 Future 에서 계산 완료가 되었는지 계속 확인 한 후에 (즉 계속 폴링) 실제 결과 값을 찍어준다. 
- 이렇게 Future 를 사용하면 사실 비동기를 사용하는 의미가 줄어 든다.

예를들어 스타벅스에서 1분마다 카운터에 가서 계속 내 커피가 나왔는지 물어 보지 말고, 커피가 나왔으면 커피를 내 자리로 바로 배달해주도록 하는 것 (콜백받는)은 어떻게 할까? 

자 그 역할을 하는 콜백함수 Future 에 넣어보자.

def getHello(): Future[String] = Future {
Thread.sleep(1000*1)
"HelloWorld"
}

val myHello: Future[String] = getHello()

myHello foreach {
word => log(s"result: ${word}")
}

myHello 라는 Future 값을 가지고 real data 가 결정되었는지 물어보는 코드가 없다.
다만 실제 데이터가 결정되었을때 바로 이용하는 콜백함수 
foreach { .... }   가 생겼다.
저 함수는 메인쓰레드를 블럭시키지 않고 또 다른 쓰레드에서 실행될 것이다.

마지막으로 함수 합성에 대해서 알아보자.

비동기 함수합성 과 map

콜백은 유용하지만 프로그램이 커지면 이를 사용한 프로그램 흐름에 대한 추론이 어려워 진다. 
또한 콜백을 사용하면 특정 패턴의 비동기 프로그래밍을 사용할 수 없게 된다. 특히 여러 퓨처에 한꺼번에 한 콜백을 등록하는 것은 귀찮은 일이 된다.

그래서 이러한 비동기 작업의 흐름을 "결정론적" 으로 자연스럽게 순서를 부여해서 해결 해주는 방식이 map 을 이용하는 것인데.. 이렇게 하면 첫번째 비동기 작업이 끝난 후에 다음 비동기 작업을 자연스럽게 연계해 줄 수 있게 된다. 

* map 은 기존 퓨처에 있는 값을 사용해서 새 퓨처를 만드는 메소드이다. 
def map[S](f: T => S) (implicit e : ExecutionContext) : Future[S] 

아래 예를 보자.


val buildFile = Future { Source.fromFile("build.sbt").getLines }

val longestBuildLine = buildFile.map(lines => lines.maxBy(_.length))

longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}


위의 예에서는 1. build.sbt 파일에서 각 라인들을 읽어서 2. 그 중에서 가장 긴 라인을 출력하는 2가지의 함수가 합성되었다.이걸 쓰레드를 직접만들어서 했을 때와 비교하면 얼마나 간단해 졌는지 실감 할 수 있을 것이다.

val gitignoreFile = Future { Source.fromFile(".gitignore-SAMPLE").getLines }

val longestGitignoreLine = for (lines <- gitignoreFile) yield lines.maxBy(_.length)

longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}

다음처럼 for comprehension 으로 처리 할 수 도 있다.
스칼라에서는 map 메소드가 있는 객체에 대해서는 for comprehension 을 허용한다.

* 아래 카카오에서 발행한 글을 통해 다시 복습해서 이해해보면 좋을 거 같다.
카카오톡 - 스칼라 Future 를 통한 모나드 이해 


Promise 

스칼라의 Promise 는 (스칼라의 Promise 다. 다른 언어 라이브러리들에서 Future, Promise 등에 대한 정의가 조금씩 다를 수 있다. 요즘 처럼 동시성 이슈가 많은 시절에는 원할한 의사 소통을 위해 구분 지어야 할 것이다)  는 Future 의 일반화라고 볼 수 있다.

위의 Future 에서는 항상 행동이 강결합되있었다.

future.( 행동 )  

하지만 Promise 는 Promise 만 먼저 선언해두고 나중에 해당 Promise 에 어떤 행위를 해서 결과를 받게 한다.

즉 Future 는 아래와 같은 시나리오라면 
다른데서 하는 행위 - 그 행위에 대한 실제 결과 -  미리 받은 Future - 실제 결과를 이용한 행동

Promise 는 
다른데서 하는 행위가 빠졌고 - 실제 결과를 돌려주는 타이밍을 스스로 결정  - 실제 결과를 이용한 행동 
으로 볼 수 있다.

다음 예를 보면 이해가 쉽게 갈 것이다.

import scala.concurrent._
import ExecutionContext.Implicits.global

val p = Promise[String]

p.future foreach {
case text => log(s"Promise p succeeded with '$text'")
}

p success "kept"

- Promise 를 미리 만들어 두었다.
- Promise 가 완료되면 진행할 행동을 foreach 의 케이스 매칭으로 만들어 두었다.
- success 를 호출해서 완료 및 인자를 전달한다.

다른 예로는 

def myFuture[T](body: =>T): Future[T] = {
val p = Promise[T]

global.execute(new Runnable {
def run() = try {
val result = body
p success result
} catch {
case NonFatal(e) =>
p failure e
}
})

p.future
}

val future = myFuture {
"naaa" + "na" * 8 + " Katamari Damacy!"
}

future foreach {
case text => log(text)
}

- 먼저 Promise 객체를 행동과 연결되는게 없이 만든다.
- 어떤 행동을 하게 하고 바로 Future 를 리턴시켜준다.
- 어떤 행동 (registerOnCompleteCallback) 안에서 어떤 행동이 완료되었다면 success 라는 메소드를 실행시켜서 먼저 리턴한 Future 에게 complete 신호를 보낸다. 

마지막으로 아직도 헥깔리실까봐 말하자면 Future 는 자신의 쓰레드를 생성하고 , 행위를 강결합해서 실행했다면  Promise 는 전혀 그러하지 않다. 그냥 다른 쓰레드 (위에서는 ForkjoinPool) 에 스며들어가서 완료시점과 전달값을 세팅만 했을뿐이다.

Observable

동시성 도구인 Observable 을 배우기 전에 선두 학습이 필요한데 옵저버패턴과 생산자-소비자 패턴이다.

Gof 의 옵저버 패턴 

Reactive 프로그래밍이 데이터 흐름을 자연스럽게 이어지게 하자는 것이라고 말했었다. 이벤트를 주고 받는 모습은 기존에 Gof 패턴에서 Observer 패턴이 비슷한 형태를 보여줬고, 예를들어 이 패턴을 설명해보면 어떤 매니져 객체가 있고 , 이 객체에 어떤 이벤트를 받길 원하는 옵저버들이 매니져 객체에 등록한다. 
매니져에서 어떤 이벤트를 감지하면 , 자신에게 등록된 옵저버들에게 이벤트를 notify 해주는게 골자이다.

GUI 프로그래밍에서 특히 당연하게 사용되고 있다. 
어떤 데이터를 다루는 다양한 View 가 있을때, 어떤 하나의 View를 통해 데이터에 변화를 사용자가 주면, 나머지 View에게도 그 변화를 전파하기 위한 설계에 사용되는 것이다. 


POSA2 의 생산자-소비자 패턴 (멀티쓰레드 패턴중 하나) 

멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있다. 멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉  "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있다.

이것도 역시 이벤트(데이터) 를 주는 놈이 있고 받는 놈이 있는데 


스칼라 (rxJava) 의 Observable
주거니 받거니 (Polling 이 아니라 Push 를 통해)  하는 기본적인 매커니즘은 같다. 다만 기능이 추가되었고 비동기로 사용가능하도록 확장 되었다. 

강결합된 데이터를 이미 가지고 있는 생산자 (Observable)


object ObservablesItems extends App {
import rx.lang.scala._

val o = Observable.items("Pascal", "Java", "Scala")
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))

Thread.sleep(1000)
}

코드를 보면 Observable 은 어떤 데이터(이벤트) 를 가지고 있다. 생산자 역할을 한다.
소비자는 ?? 그렇다 name => log(s"learned the $name language")  이 함수리터럴이 담당한다.

생산자(Observable) 은 자신이 보낼(Push) 할 이벤트를 처리할 친구들을 subscribe 메소드로 모집하고 있다.


Iteratee & Enumerator 

Play 를 이 Reactive 웹프로그래밍을 하기 위한 최적의 툴로 play 라 손꼽힌다. (개인적으로 스프링은 조잡한거 같다.) 따라서 play 에서 제공하는 도우미들에 대해서 알아두면 아래 진행될 "Hello world" 를 이해하기 편할 것같아 간단히 소개하고자 한다.  아래 레퍼런스를 참고로 짥게 정리하였으니 참고 하길 바란다.

레퍼런스: http://mandubian.com/2012/08/27/understanding-play2-iteratees-for-normal-humans/

개념 

Play 에서 Iteratee / Enumerator / Enumeratee을 간단하게 말하면 :

이름설명
Iteratee [E, A]

 Iteratee [E, A]는 함수형 프로그래밍에서 iteration 컨셉의 일반화. E 입력,A 출력 (소비자역할)

Enumerator [E]컬렉션을 일반화 한 것으로 형태 E를 열거한다. 무한 열거 (Streaming) 할 수도있다. (생산자역할)
Enumeratee [E, A]거의 사용하지 않기 때문에 지금은 생각하지 좋다.

과 같다.

먼저 Iteratee 에 대해서 알아보자. Iteratee 는 보다시피 Iterator 와 먼가 관련이 있어 보이는데 자바에서 Iterator 가 어떻게 활용되는지 살펴보면 

val l = List(1, 234, 455, 987)

var total = 0 // will contain the final total
var it = l.iterator
while( it.hasNext ) {
total += it.next
}

total
=> resXXX : Int = 1677


짧게 정리하면  (1,234,455,987).iterating( .... total += it.next ....)  이렇게 되는데 
앞에 데이터가 있고 그것을 순회하면서 어떤 행동 하게 된다. 

데이터 + 순회 + 행동 으로 이루어져있는데  데이터가 정해져있다. 강결합이다. 

여기서 이것을 느슨하게 만들어 줄 방법에 대해서 생각해보면 iterator 를 일반화 하는것에 미치게 되는데 앞에 데이터가 무엇이 오건간에 순회하면서 행동을 하게 하자는 것이다. 즉 데이터는 주입받자는 것이다. 제어역전!! 

iteratee = 순회(iterating) + 행동(react)
Enumerator  = 데이터 

Enumerator 는 정해진 콜렉션의 데이터가 아니라 비동기적으로 데이터를 생산하는 것으로 또 일반화된다.  
다음 예제를 통해서 냄새를 맡아보자.

Enumerator 예제)

// 고정된 데이터로 부터 Enumerator 가 만들어진다. val stringEnumerator: Enumerator[String] = Enumerate("alpha", "beta", "gamma")

val integerEnumerator: Enumerator[Int] = Enumerate(123, 456, 789)
// 파일로 부터 Enumerator 가 만들어진다.

val fileEnumerator: Enumerator[Array[Byte]] = Enumerator.fromFile("myfile.txt")

// 어떤 콜백함수로부터 생성된 데이터로 부터 Enumerator가 만들어진다.
val dateGenerator: Enumerator[String] = Enumerator.generateM(
play.api.libs.concurrent.Promise.timeout(
Some("current time %s".format((new java.util.Date()))),
500
)
)

Iteratee 예제)

// 데이터들의 합을 만드는 (행동) 을 정의했다.
val iterator = Iteratee.fold(0){ (total, elt) => total + elt }
// 데이터는 요기 Enumerator 를 사용하고 val e = Enumerator(1, 234, 455, 987)

// apply 메소드는 결과는 받지 않는다.
val totalIteratee: Promise[Iteratee[Int, Int]] = enumerator apply iterator

// run 메소드는 결과를 result1 에 담는다. 물론 비동기로 받는다.
val total: Promise[Int] = enumerator run iterator

앞으로 이런 표시들이 나올 텐데  &>|>>|>>> ><>.  각각  throughapplyapplyOn or compose. 의 약자이다. 자세히 알고 싶으면  관련 문서를 찾아서 공부하길 바란다.


"Hello world" in reactive Web 

외부에서 내부로 많은 데이터들이 흘러다니는 환경이 마련되어진 요즘 그런 데이터 흐름을 유연하게 처리 할 아키텍처가 필요해졌다. 그 중 트위터로부터 데이터가 흘러들어오면 자연스럽게 사용자에게 서비스해주는것을 목표로 한다. Play 를 이용해서 웹서비스를 하기 때문에 지식을 미리가지고 있으면 좋겠지만 그렇지 개념을 이해하는데는 무리 없을 것이다. 

트위터로 부터 흘러들어오는 데이터를 웹소켓을 이용해서 바로 사용자에게 전달해주는 방식이다.

결과 부터 보면  "박근혜" 라는 낱말로 매칭을 해서 실시간 트윗들이 흘러 들어오게 하고 있다.

사용자 <----- 웹소켓 ---->   Play Reactive 서버 < ------ Rest API -----> 트위터  
이런 자연스러운 흐름이 생긴 것이다.

실제 Reactive 코드를 살펴보자.

View

<div id="tweets"></div>

<script type="text/javascript">

function appendTweet(text) {
var tweet = document.createElement("p");
var message = document.createTextNode(text);
tweet.appendChild(message);
document.getElementById("tweets").appendChild(tweet);
}

function connect(attempt) {
var connectionAttempt = attempt;
var tweetSocket = new WebSocket("@routes.Application.tweets().webSocketURL()");
tweetSocket.onmessage = function (event) {
console.log(event);
var data = JSON.parse(event.data);
appendTweet(data.text);
};
tweetSocket.onopen = function() {
connectionAttempt = 1;
tweetSocket.send("subscribe");
};
tweetSocket.onclose = function() {
if (connectionAttempt <= 3) {
appendTweet("WARNING: server lost" + connectionAttempt);
setTimeout(function() {
connect(connectionAttempt + 1);
}, 5000);
} else {
alert("server was lost. Please try again later");
}
};
}

connect(1);
</script>

- WebSocket 객체를 생성해서 서버측과 연결하고 있다.
- onmessage 콜백을 통해서 데이터를 서버로 부터 받으면  div 에 append 해당 내용을 해주고 있다. 

Controller

class Application extends Controller {

def index = Action { implicit request =>
Ok(views.html.index("Tweets"))
}

def tweets = WebSocket.acceptWithActor[String, JsValue] { request => out =>
TwitterStreamer.props(out)
}

def replicateFeed = Action { implicit request =>
Ok.feed(TwitterStreamer.subscribeNode)
}

}

WebSockets 를 TwitterStreamer 액터를 이용해서 사용한다. out 은 외부 즉 클라이언트의 연결통로 역할을 하는 액터의 레퍼런스이다. A 액터라고 치자.  TwitterStreamer 액터는 그 A액터에 메세지를 보내는역할.

- [String, JsValue] 는 [In, Out] 인데, 웹소켓은 클라이언트에서 내용(String)을 받고,  JSON 객체(JsValue)로 만들어서 클라이언트에게 전달(Out) 한다는 의미이다.

- request => out => TwitterStreamer.props(out) 은 좀 헥깔릴수 있겠는데 스칼라의 람다식이다.
  request 라는 매개변수를 받고 out=> TwitterStreamer.props(out) 라는 함수를 리턴하는 람다식이다.
  최종적으로  TwitterStreamer.props(out) 라는 액터객체가 만들어 질 것이다.
  여기서 props 는 Akka 에서 Actor 를 만드는 팩토리 메소드 쯤으로 보면 된다.

Play 는 2가지 다른 방식으로 웹소켓을 제공합니다. => 사용법 바로가기

 1. 액터 기반의 Akka Streams 사용 
 2. iteratees 사용

def acceptWithActor[In, Out](f: RequestHeader => HandlerProps)(implicit in: FrameFormatter[In],
out: FrameFormatter[Out], app: Application, outMessageType: ClassTag[Out]): WebSocket[In, Out] = {
tryAcceptWithActor { req =>
Future.successful(Right((actorRef) => f(req)(actorRef)))
}
}

 - acceptWithActor 메소드의 내부는 이렇게 생겼다. 어질어질...-.-;;


Twitter Actor 

class TwitterStreamer(out: ActorRef) extends Actor {
def receive = {
case "subscribe" =>
Logger.info("Received subscription from a client")
TwitterStreamer.subscribe(out)
}

override def postStop() {
Logger.info("Client unsubscribing from stream")
TwitterStreamer.unsubscribe(out)
}
}

object TwitterStreamer {

private var broadcastEnumerator: Option[Enumerator[JsObject]] = None
private var broadcaster: Option[Broadcaster] = None
private val subscribers = new ArrayBuffer[ActorRef]()

def props(out: ActorRef) = Props(new TwitterStreamer(out))

def subscribe(out: ActorRef): Unit = {

if (broadcastEnumerator.isEmpty) {
init()
}

def twitterClient: Iteratee[JsObject, Unit] = Cont {
case in@Input.EOF => Done(None)
case in@Input.El(o) =>
if (subscribers.contains(out)) {
out ! o
twitterClient
} else {
Done(None)
}
case in@Input.Empty =>
twitterClient
}

broadcastEnumerator.foreach { enumerator =>
enumerator run twitterClient
}
subscribers += out
}

def unsubscribe(subscriber: ActorRef): Unit = {
val index = subscribers.indexWhere(_ == subscriber)
if (index > 0) {
subscribers.remove(index)
Logger.info("Unsubscribed client from stream")
}
}

def subscribeNode: Enumerator[JsObject] = {
if (broadcastEnumerator.isEmpty) {
TwitterStreamer.init()
}

broadcastEnumerator.getOrElse {
Enumerator.empty[JsObject]
}
}

def init(): Unit = {

credentials.map { case (consumerKey, requestToken) =>

val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]

val jsonStream: Enumerator[JsObject] = enumerator &>
Encoding.decode() &>
Enumeratee.grouped(JsonIteratees.jsSimpleObject)

val (e, b) = Concurrent.broadcast(jsonStream)

broadcastEnumerator = Some(e)
broadcaster = Some(b)

val maybeMasterNodeUrl = Option(System.getProperty("masterNodeUrl"))
val url = maybeMasterNodeUrl.getOrElse {
"https://stream.twitter.com/1.1/statuses/filter.json"
}

WS
.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withQueryString("track" -> "박근혜")
.get { response =>
Logger.info("Status: " + response.status)
iteratee
}.map { _ =>
Logger.info("Twitter stream closed")
}

} getOrElse {
Logger.error("Twitter credentials are not configured")
}

}

}

The Play WS API 를 이용해서 외부 서비스에 비동기 http 로 접속한다. 

- withQueryString("track" -> "박근혜") 로 연관단어를 통해 트윗을 검색한다.

- 접속후에 수신한 메세지를 Enumerator 를 통해  iteratee 에 넘긴다.

- iteratee 는 브로드 캐스팅하며 연결된 모든 웹소켓 객체에 데이터를 뿌려준다.

- 액터를 사용하여 쓰레드 안전성을 높혔다. 많은 클라이언트들을 대응하려면 이렇게 하면 좋다.
  즉 subscribe (사용자들의 목록)  같은 멤버변수는 쓰레드 안전이다. 

  &> 는  through의 약자이다. 필터링을 한다는 뜻이다.


다음 포스팅에는 Play기반 Reactive Web applications 을 좀 더 분해해서 설명하는 시간을 가질 예정입니다. 스프링의 경우 아래 토비의스프링으로 유명하신 이일민님의 동강을 참고하세요. 

* 스프링으로 하는 Reactive Streams - 이일민 



레퍼런스:

reactive web applications 

Comments