관리 메뉴

HAMA 블로그

스칼라 강좌 (40) 동시성을 위한 Promise 와 Await 본문

Scala

스칼라 강좌 (40) 동시성을 위한 Promise 와 Await

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 21. 17:45


Promise

이전 블로그글에서 Future 에 대해서 먼저 읽고 오자. (안읽으셨다면..) 
스칼라의 Promise 는 Future 의 일반화라고 볼 수 있다. (스칼라의 Promise 다. 다른 언어 라이브러리들이 모두 Future, Promise 등에 대한 정의/구현이 조금씩 다를 수 있다. 요즘 처럼 동시성 이슈가 많은 시절에는 원할한 의사 소통을 위해 구분 지어야 할 것이다)  따라서 Future 에 대해서 학습했다면 쉽게 이해 가능하다. (여기서 일반화라는 말이 너무추상적이라 헤깔릴수 있을 텐데 좀 참아보자.)

Future 에서는 보통 위임 행동이 강결합 되있었다. 즉 future.( 행동 )  
하지만 Promise 는 Promise 만 먼저 선언해두고 나중에 success 를 호출해서 완료된것을 알려준다.
(자바의 CompletableFuture라는 작명이 오히려 더 잘 설명해 주는거 같다. Promise는 완료 시점을 스스로 알려주는 Future라고 볼 수 있다.) 


다음 예를보자.

Promise - 1

import scala.concurrent._
import ExecutionContext.Implicits.global

val p = Promise[String]

p.future foreach {
case text => log(s"Promise p succeeded with '$text'")
}

p success "kept"

- Promise 를 미리 만들어 두었다.
- Promise 가 완료되면 진행할 행동을 foreach 의 케이스 매칭으로 만들어 두었다.
- success 를 호출해서 완료 및 인자를 전달한다.

q failure new Exception("not kept")

q.future.failed foreach {
case t => log(s"Promise q failed with $t")
}

- p sucess 가 아니라 p failure 를 호출하여 실패 상황을 알려 줄 수도 있따.


Promise - 2

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.control.NonFatal

def myFuture[T](body: =>T): Future[T] = {
val p = Promise[T]

global.execute(new Runnable {
def run() = try {
val result = body
p success result
} catch {
case NonFatal(e) =>
p failure e
}
})

p.future
}

val future = myFuture {
"naaa" + "na" * 8 + " Katamari Damacy!"
}

future foreach {
case text => log(text)
}

- 먼저 Promise 객체를 행동과 연결되는게 없이 만든다.
- 어떤 행동을 하게 하고 바로 Future 를 리턴시켜준다.
- 어떤 행동 (registerOnCompleteCallback) 안에서 어떤 행동이 완료되었다면 success 라는 메소드를 실행시켜서 먼저 리턴한 Future 에게 complete 신호를 보낸다. 

아직도 헥깔리실까봐 말하자면 Future 는 자신의 쓰레드를 생성하고 , 행위를 강결합해서 실행했다면  Promise 는 전혀 그러하지 않다. 그냥 다른 쓰레드 (위에서는 ForkjoinPool) 에 스며들어가서 완료시점과 전달값을 세팅만 했을뿐이다.


취소할 수 있는 Promise

퓨처 계산을 중간에 멈추고 싶을 경우가 있다. 사용자가 취소버튼을 눌렀다던가 .. 뭐 그럴 경우 중간에 멈추게 하고 거기 까지의 값을 원할 수 도 있고 등등 ..이럴때는 취소를 위한 퓨처를 다른 퓨처와 함께 조합해서 사용하는 방법인데 이게 꽤 헥깔리다. 

소스를 보자.

def cancellable[T](b: Future[Unit] => T): (Promise[Unit], Future[T]) = {
val p = Promise[Unit]
val f = Future {
val r = b(p.future)
if (!p.tryFailure(new Exception))
throw new CancellationException
r
}
(p, f)
}

val (cancel, value) = cancellable { cancel =>
var i = 0
while (i < 5) {
if (cancel.isCompleted) throw new CancellationException
Thread.sleep(500)
log(s"$i: working")
i += 1
}
"resulting value"
}

Thread.sleep(1500)

cancel trySuccess ()

log("computation cancelled!")

솔직히 이 쯤되면 그냥 저레벨 쓰레드 사용해서 대충 짜고 싶은 마음이 들기도 한다. 세상 살면서 이런 코드를 얼마나 짜겠다고.. 이걸 쉽게 이해하고 체득하려고 노력까지 해야하나 싶다.

이 코드는 말로 설명하긴 어렵겠다. 각자 생존하자. -.-;; (아래 레퍼런스의 책을 천천히 읽으시라 권유하고 싶다).  

힌트 하나만 주자면 Promise p 가 취소용으로 사용됬다는 것이다. 즉 그 동안의 코드와 다르게 사용자 측에서 Promise 을 제어한다. trySuccess() 를 사용해서~~


Await

어느 순간에는 비동기의 연속보다는 무엇인 종료될때까지 기다리고 싶을 때도 있다. 그때 사용된다.

Await.result

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

val urlSpecSizeFuture = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").size }
val urlSpecSize = Await.result(urlSpecSizeFuture, 10.seconds)

log(s"url spec contains $urlSpecSize characters")

Await.result 에서 웹싸이트를 다 긁어 올때까지 기다린다.  10초만 기다린다.


Await.result

val startTime = System.nanoTime

val futures = for (_ <- 0 until 16) yield Future {
Thread.sleep(1000)
}

for (f <- futures) Await.ready(f, Duration.Inf)

val endTime = System.nanoTime

log(s"Total execution time of the program = ${(endTime - startTime) / 1000000} ms")
log(s"Note: there are ${Runtime.getRuntime.availableProcessors} CPUs on this machine")

모든 퓨터가 완료 될 때까지 무한정 기다린다.
Await.ready 는 리턴값이 없다.

16개의 퓨쳐가 실행완료 될 때까지 얼마나 걸리는지 재는 코드이다.
저게 시퀀셜하게 동작했다면 16초이상 걸렸겠지만 내 PC에서는 아래와 같은 결과가 나왔다.

main: Total execution time of the program = 4451 ms (대략 4초)

main: Note: there are 4 CPUs on this machine ( 4코어) 



레퍼런스

스칼라 동시성 프로그래밍 - 오현석 옮김


Comments