관리 메뉴

HAMA 블로그

스칼라 강좌 (34) 동시성을 위한 Future 본문

Scala

스칼라 강좌 (34) 동시성을 위한 Future

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 16. 18:14


1. Future
2. Promise
3. Awiat
4. async
5. Observable
6. 병렬 

Future

스칼라에서의 Future 는 꽤 다양한 방식으로 사용 할 수있는데 

먼저 스칼라에서의 Future 모양을 살펴보자.

trait Future[T]   

: 퓨쳐값을 나타낸다. T 타입의 실제 우리가 리턴받기 원하는 객체를 포함한다. 

def apply[T](b: =>T) (implicit e: ExecutionContext) : Future[T] 

: 퓨처 계산을 나타낸다. 실제 계산을 수행하는 함수를 매개변수로 넣어주고 있다.

: 암시적으로 ExecutionContext 가 매개변수로 들어간다. 즉 쓰레드풀을 넣어주는것. 

퓨쳐값 과 퓨쳐계산을 잘 구분해서 기억해두자.

1) 퓨쳐 실행 

object FuturesComputation extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global

Future {
log(s"the future is here")
}

log(s"the future is coming")
}

2개의 로그중 어떤것이 먼저 출력되는지는 비 결정적이다. (물론 아래 log 가 먼저 찍힐 가능성이 크겠지만) Future 를 통해서 자연스럽게 첫번째 로그는 자신의 쓰레드안에서 독립적으로 실행된다. 

Future{...} 는 예상했다시피 Future.apply 메소드 호출을 생략되진 것이다. 스칼라에서 함수호출시 () 대신 {} 를 이용할 수 있다. 단 인자가 하나일 경우에만.  {..} 의 해당내용은 apply 의 매개변수로 들어 갈 것이며 Future 값을 반환 할 것이다. 

2) 퓨쳐 실행 그리고 Future 리턴 


object FuturesDataType extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source

val myFuture: Future[String] = Future {
val f = Source.fromFile("build.sbt")
try f.getLines.mkString("\n") finally f.close()
}

log(s"started reading build file asynchronously")
log(s"status: ${myFuture.isCompleted}")
Thread.sleep(250)
log(s"status: ${myFuture.isCompleted}")
log(s"status: ${myFuture.value}")

}

- 파일을 읽는 일을 Future 계산을 통해서 위임하고 바로 Future [String] 값을 리턴받는다.
- 리턴 받은 Future 에서 계산 완료가 되었는지 계속 확인 한 후에 (즉 계속 폴링) 실제 결과 값을 찍어준다.
- 이렇게 Future 를 사용하면 사실 비동기를 사용하는 의미가 줄어 든다.

예를들어 스타벅스에서 1분마다 카운터에 가서 계속 내 커피가 나왔는지 물어 보지 말고, 커피가 나왔으면 커피를 내 자리로 바로 배달해주도록 하는 것도 커피를 주문 할 때 알려주면 안될까?

자 그 역할을 하는 콜백함수를 Future 에 넣어보자.

3) 퓨쳐 와 콜백 

object FuturesCallbacks extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source

def getUrlSpec(): Future[Seq[String]] = Future {
val f = Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt")
try f.getLines.toList finally f.close()
}

val urlSpec: Future[Seq[String]] = getUrlSpec()

def find(lines: Seq[String], word: String) = lines.zipWithIndex collect {
case (line, n) if line.contains(word) => (n, line)
} mkString("\n")

urlSpec foreach {
lines => log(s"Found occurrences of 'telnet'\n${find(lines, "telnet")}\n")
}

urlSpec foreach {
lines => log(s"Found occurrences of 'password'\n${find(lines, "password")}\n")
}

log("callbacks installed, continuing with other work")
Thread.sleep(1000 * 10)

}

- 웹페이지를 긁어 오는 Future 를 실행하고 퓨처값[리스트]을 리턴 받는다.
-  foreach 메소드를 통해서 최종완료된 값을 처리해 준다. (콜백처리) 
- 가져온 데이터에서 "telnet" , "password" 가 있는 라인만 추려서 출력해준다.

3) 퓨쳐 와 예외 


object FuturesFailure extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source

val urlSpec: Future[String] = Future {
Source.fromURL("http://www.w3.org/non-existent-url-spec.txt").mkString
//Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString

}

// urlSpec foreach {
// value => log(s"Found occurrences of 'telnet'\n${value}\n")
// }

urlSpec.failed foreach {
case t => log(s"exception occurred - $t")
}

Thread.sleep(1000*10)
}

- 보통 Future 계산에서 예외를 던지면 그에 대응하는 Future 값은 값으로 완료 될 수 없다.
- 스칼라 Future 는 성공적으로 완료되거나, 실패로 완료된다. 퓨처가 실패로 완료되는 경우를 퓨쳐가 실패 했다라고 말한다. 
- foreach 메소드는 성공적으로 완료한 퓨처에서만 값을 받는 콜백만을 받는다. 
- 따라서 실패 콜백을 지정하기 위해서는 다른 메소드가 필요하다. 그게 failed 라 한다. 
- failed 메소드는 퓨처가 실패하면서 내놓는 예외를 포함하는 Future[Throwable] 객체를 반환한다. 


object FuturesExceptions extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Try, Success, Failure}


val file = Future { Source.fromFile(".gitignore-SAMPLE").getLines.mkString("\n") }

file foreach {
text => log(text)
}

file.failed foreach {
case fnfe: java.io.FileNotFoundException => log(s"Cannot find file - $fnfe")
case t => log(s"Failed due to $t")
}


file onComplete {
case Success(text) => log(text)
case Failure(t) => log(s"Failed due to $t")
}

Thread.sleep(1000*10)
}

- 간결하게 onComplete 를 통해서도 성공과 실패를 처리 함께 할 수 있다.
- onComplete 함수안에는 Try[T] 가 패턴매칭으로 들어간다.


object FuturesTry extends App {
import scala.util._

val threadName: Try[String] = Try(Thread.currentThread.getName)
val someText: Try[String] = Try("Try objects are created synchronously")
val message: Try[String] = for {
tn <- threadName
st <- someText
} yield s"$st, t = $tn"

message match {
case Success(msg) => log(msg)
case Failure(error) => log(s"There should be no $error here.")
}

Thread.sleep(1000*10)
}

- Try[T] 타입은 Option[T] 와 비슷하지만 다른점은 실패시 정보를 추가 할 수 있다는 점이다. Option[T] 같은 경우 그냥 None 일 뿐이지 않는가.
- Try[T] 는 Future[T] 와 달리 동기적으로 다루어진다.

4) 퓨쳐 와 합성

map

콜백은 유용하지만 프로그램이 커지면 이를 사용한 프로그램 흐름에 대한 추론이 어려워 진다.
또한 콜백을 사용하면 특정 패턴의 비동기 프로그래밍을 사용할 수 없게 된다. 특히 여러 퓨처에 한꺼번에 한 콜백을 등록하는 것은 귀찮은 일이 된다.


val buildFile = Future { Source.fromFile("build.sbt").getLines }

val longestBuildLine = buildFile.map(lines => lines.maxBy(_.length))

longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}

map 은 기존 퓨처에 있는 값을 사용해서 새 퓨처를 만드는 메소드이다. 
def map[S](f: T => S) (implicit e : ExecutionContext) : Future[S] 

위의 예에서는 1. build.sbt 파일에서 각 라인들을 읽어서 2. 그 중에서 가장 긴 라인을 출력하는 2가지의 함수가 합성되었다.이걸 쓰레드를 직접만들어서 했을 때와 비교하면 얼마나 간단해 졌는지 실감 할 수 있을 것이다.

for complehension

val gitignoreFile = Future { Source.fromFile(".gitignore-SAMPLE").getLines }

val longestGitignoreLine = for (lines <- gitignoreFile) yield lines.maxBy(_.length)

longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}

다음처럼 for comprehension 으로 처리 할 수 도 있다.
스칼라에서는 map 메소드가 있는 객체에 대해서는 for comprehension 을 허용한다.

async - await

아래 처럼 Future 를 async-await 구문을 사용하여, 명령형 언어처럼 순차적으로 쓸 수 있어서 가독성이 증가 된다.

import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async._ val gitignoreFile1: Future[Int] = ... val gitignoreFile2: Future[Int] = ... val asyncComputation = async { await(gitignoreFile1) + await(gitignoreFile2)

}

flatMap

먼저 스칼라 List 에서의 map 과 flatMap의 차이를 살펴보자.

scala> val fruits = Seq("apple", "banana", "orange")
fruits: Seq[java.lang.String] = List(apple, banana, orange)

scala> fruits.map(_.toUpperCase)
res0: Seq[java.lang.String] = List(APPLE, BANANA, ORANGE)

scala> fruits.flatMap(_.toUpperCase)
res1: Seq[Char] = List(A, P, P, L, E, B, A, N, A, N, A, O, R, A, N, G, E)

map 은 받아드린 리스트의 하나의 값에 대해서 따로 실행하여 개별 List 를 만들었고
flatMap 은 따로 실행하여 큰 뭉치의 List 로 만들었다. 즉 하나의 큰 List 로 평평하게 펴서 나열했다는 의미. 

또 다른 flatMap 의 특징을 알아보자.

def toInt(s: String): Option[Int] = {
    try {
        Some(Integer.parseInt(s.trim))
    } catch {
        // catch Exception to catch null 's'
        case e: Exception => None
    }
}

scala> val strings = Seq("1", "2", "foo", "3", "bar")
strings: Seq[java.lang.String] = List(1, 2, foo, 3, bar)

scala> strings.map(toInt)
res0: Seq[Option[Int]] = List(Some(1), Some(2), None, Some(3), None)

scala> strings.flatMap(toInt)
res1: Seq[Int] = List(1, 2, 3)

scala> strings.flatMap(toInt).sum
res2: Int = 6
 None 은 무시하고 Some 타입만, 타입을 벗겨서 처리하고있다.

이제 Future 에서 어떻게 flatMap을 정의하였는지 살펴보자. 

val netiquette = Future { Source.fromURL("http://www.ietf.org/rfc/rfc1855.txt").mkString }
val urlSpec = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString }
val answer = netiquette.flatMap { nettext =>
urlSpec.map { urltext =>
"First, read this: " + nettext + ". Now, try this: " + urltext
}
}

answer foreach {
case contents => log(contents)
}

FlatMap 을 사용하였다. 이것은 Map과는 다르게 현재 퓨처가 완료되고 그 결과값에 f 를 적용해서 생기는 새로운 퓨처가 완료되어야만 완료된다. 즉 아래와 같다.

def map[S](f: T => S) (implicit e : ExecutionContext) : Future[S] 

def flatMap[S](f: T => Future[S]) (implicit e : ExecutionContext) : Future[S] 

위의 코드는 결국 nettext 라는 값을 A퓨처의 결과로받고, urltext 라는 값 또한 B라는 퓨처의 결과로 받아서     하나의 처리를 거친 후에 answer 라는 새로운  Future[S] 를 반환한다는것을 말해주고있다. 

object FuturesFlatMap extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source

val netiquette = Future { Source.fromURL("http://www.ietf.org/rfc/rfc1855.txt").mkString }
val urlSpec = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString }
val answer = for {
nettext <- netiquette
urltext <- urlSpec
} yield {
"First of all, read this: " + nettext + " Once you're done, try this: " + urltext
}

answer foreach {
case contents => log(contents)
}

Thread.sleep(1000*10)
}

보기 간단하게 for comprehesion 으로 바꾸었다.

for comprehension 보다 더 이해하기 쉬운 방법으로는 async, awit 가 있는데 여기서는 생략한다.

다음과 같은 Data Flow 로 그려볼 수 있다.  Reactive 파라다임에서 말하는  
"모든 지점에서 블럭 되지 않게 하자. 자연스럽게 이벤트가 흘러다니도록 하자." 
이 되겠다.

5) 이후에 알아야 할 것들

기존 콜백기반의API와 퓨처사이에 다리를 놓고 잘 조합하기 위해서는 퓨처계산을 원하는 대로 정의할수있는 Promise를 알아야한다. 또한 퓨처들의 종료를  기다렸다가 무언가를 처리해야 하는 경우에 (블록이 가끔 필요하다) Await.readyAwait.result 등도 알아야한다.

위의 모든것들을 편하게 사용하게 하는 스칼라 비동기 라이브러리가 있다. 바로 asyncawait 인데 이것을 사용하면 조금은 더 편하게 Future 를 활용할 수 있다. 

마지막으로 결과를 한번만 처리 할 수 있는 퓨처말고 동일한 계산으로 부터 여러 번 서로 다른 이벤트를 받아야 하는 경우 ( 파일 다운로드 상태 추적등) 에는 이벤트 스트림에 대해서 알아야한다. 이러한 이벤트 스트림을 Observable[T] 라는 타입으로 표현했는데 이에 관한 내용도 알아야한다. Akka Streaming 이라는 도구도 있다.

6) 한꺼번에 이해하기 

아래 카카오에서 발행한 글을 통해 다시 복습해서 이해해보면 좋을 거 같다.

카카오톡 - 스칼라 Future 를 통한 모나드 이해 

7) 비동기식 퓨처 연산 정리 

  설명

 예제 

 설명 

 failbackTo 

 nextFtr(1) failbackTo nextFtr(2) 

 두번째 퓨쳐를 첫번째 연결하고 새로운 종합적인 퓨처를 반환함. 첫 번째 퓨처가 성공적이지 않다면 두 번째 퓨처가 호출됨 

 flatMap 

 nextFtr(1).flatMap(int => nextFtr()) 

 두번째 퓨처를 첫 번째에 연결하고 새로운 종합적인 퓨처를 반환함. 첫 번째가 성공적이라면 그 반환 값이 두 번째를 호출하는데 사용됨 

 map 

 nextFtr(1) map (_ * 2) 

 주어진 함수를 퓨처에 연결하고 새로운 종합적인 퓨처를 반환함. 퓨처가 성공적이라면 그 반환 값이 해당 함수를 호출할 때 사용됨 

 onComplete 

 nextFtr() onComplete
  { _ getOrElse 0 }  

 퓨처의 작업이 완료된 후 주어진 함수가 값 또는 예외를 포함한 Try를 이용하여 호출됨 

 onFailure 

 * 2.12 부터 사장됨 

 

 onSuccess  

 * 2.12 부터 사장됨 

 

 Future.sequence

 concurrent.Future sequence List(nextFtre(1),nextFtr(5))  

 주어진 시퀀스에서 퓨처를 벙행으로 실행하여 새로운 퓨처를 반환함. 시퀀스 내의 모든 퓨처가 성공하면 이들의 반환값의 리스트가 반환됨. 그렇지 않으면 그 시퀀스내에서 처음으로 발생한 예외가 반환됨  

* 러닝스칼라 (제이슨스와츠,제이펍 발생) 에서 발췌


레퍼런스:

1.스칼라 동시성 프로그래밍

2.러닝 스칼라 

Comments