일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- 스칼라 강좌
- 스칼라 동시성
- Play2 로 웹 개발
- Actor
- CORDA
- 엔터프라이즈 블록체인
- Play2
- Adapter 패턴
- hyperledger fabric
- 파이썬 동시성
- akka 강좌
- 블록체인
- Akka
- 스위프트
- Hyperledger fabric gossip protocol
- 파이썬 데이터분석
- 파이썬
- Golang
- play 강좌
- 플레이프레임워크
- 스칼라
- 주키퍼
- 그라파나
- 안드로이드 웹뷰
- 하이퍼레저 패브릭
- play2 강좌
- 하이브리드앱
- 파이썬 강좌
- 이더리움
- 파이썬 머신러닝
- Today
- Total
HAMA 블로그
스칼라 강좌 (34) 동시성을 위한 Future 본문
1. Future
2. Promise
3. Awiat
4. async
5. Observable
6. 병렬
Future
스칼라에서의 Future 는 꽤 다양한 방식으로 사용 할 수있는데
먼저 스칼라에서의 Future 모양을 살펴보자.
trait Future[T]
: 퓨쳐값을 나타낸다. T 타입의 실제 우리가 리턴받기 원하는 객체를 포함한다.
def apply[T](b: =>T) (implicit e: ExecutionContext) : Future[T]
: 퓨처 계산을 나타낸다. 실제 계산을 수행하는 함수를 매개변수로 넣어주고 있다.
: 암시적으로 ExecutionContext 가 매개변수로 들어간다. 즉 쓰레드풀을 넣어주는것.
퓨쳐값 과 퓨쳐계산을 잘 구분해서 기억해두자.
1) 퓨쳐 실행
object FuturesComputation extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
Future {
log(s"the future is here")
}
log(s"the future is coming")
}
2개의 로그중 어떤것이 먼저 출력되는지는 비 결정적이다. (물론 아래 log 가 먼저 찍힐 가능성이 크겠지만) Future 를 통해서 자연스럽게 첫번째 로그는 자신의 쓰레드안에서 독립적으로 실행된다.
Future{...} 는 예상했다시피 Future.apply 메소드 호출을 생략되진 것이다. 스칼라에서 함수호출시 () 대신 {} 를 이용할 수 있다. 단 인자가 하나일 경우에만. {..} 의 해당내용은 apply 의 매개변수로 들어 갈 것이며 Future 값을 반환 할 것이다.
2) 퓨쳐 실행 그리고 Future 리턴
object FuturesDataType extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
val myFuture: Future[String] = Future {
val f = Source.fromFile("build.sbt")
try f.getLines.mkString("\n") finally f.close()
}
log(s"started reading build file asynchronously")
log(s"status: ${myFuture.isCompleted}")
Thread.sleep(250)
log(s"status: ${myFuture.isCompleted}")
log(s"status: ${myFuture.value}")
}
- 파일을 읽는 일을 Future 계산을 통해서 위임하고 바로 Future [String] 값을 리턴받는다.
- 리턴 받은 Future 에서 계산 완료가 되었는지 계속 확인 한 후에 (즉 계속 폴링) 실제 결과 값을 찍어준다.
- 이렇게 Future 를 사용하면 사실 비동기를 사용하는 의미가 줄어 든다.
예를들어 스타벅스에서 1분마다 카운터에 가서 계속 내 커피가 나왔는지 물어 보지 말고, 커피가 나왔으면 커피를 내 자리로 바로 배달해주도록 하는 것도 커피를 주문 할 때 알려주면 안될까?
자 그 역할을 하는 콜백함수를 Future 에 넣어보자.
3) 퓨쳐 와 콜백
object FuturesCallbacks extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
def getUrlSpec(): Future[Seq[String]] = Future {
val f = Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt")
try f.getLines.toList finally f.close()
}
val urlSpec: Future[Seq[String]] = getUrlSpec()
def find(lines: Seq[String], word: String) = lines.zipWithIndex collect {
case (line, n) if line.contains(word) => (n, line)
} mkString("\n")
urlSpec foreach {
lines => log(s"Found occurrences of 'telnet'\n${find(lines, "telnet")}\n")
}
urlSpec foreach {
lines => log(s"Found occurrences of 'password'\n${find(lines, "password")}\n")
}
log("callbacks installed, continuing with other work")
Thread.sleep(1000 * 10)}
- 웹페이지를 긁어 오는 Future 를 실행하고 퓨처값[리스트]을 리턴 받는다.
- foreach 메소드를 통해서 최종완료된 값을 처리해 준다. (콜백처리)
- 가져온 데이터에서 "telnet" , "password" 가 있는 라인만 추려서 출력해준다.
3) 퓨쳐 와 예외
object FuturesFailure extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
val urlSpec: Future[String] = Future {
Source.fromURL("http://www.w3.org/non-existent-url-spec.txt").mkString
//Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString
}
// urlSpec foreach {
// value => log(s"Found occurrences of 'telnet'\n${value}\n")
// }
urlSpec.failed foreach {
case t => log(s"exception occurred - $t")
}
Thread.sleep(1000*10)
}
- 보통 Future 계산에서 예외를 던지면 그에 대응하는 Future 값은 값으로 완료 될 수 없다.
- 스칼라 Future 는 성공적으로 완료되거나, 실패로 완료된다. 퓨처가 실패로 완료되는 경우를 퓨쳐가 실패 했다라고 말한다.
- foreach 메소드는 성공적으로 완료한 퓨처에서만 값을 받는 콜백만을 받는다.
- 따라서 실패 콜백을 지정하기 위해서는 다른 메소드가 필요하다. 그게 failed 라 한다.
- failed 메소드는 퓨처가 실패하면서 내놓는 예외를 포함하는 Future[Throwable] 객체를 반환한다.
object FuturesExceptions extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Try, Success, Failure}
val file = Future { Source.fromFile(".gitignore-SAMPLE").getLines.mkString("\n") }
file foreach {
text => log(text)
}
file.failed foreach {
case fnfe: java.io.FileNotFoundException => log(s"Cannot find file - $fnfe")
case t => log(s"Failed due to $t")
}
file onComplete {
case Success(text) => log(text)
case Failure(t) => log(s"Failed due to $t")
}
Thread.sleep(1000*10)
}
- 간결하게 onComplete 를 통해서도 성공과 실패를 처리 함께 할 수 있다.
- onComplete 함수안에는 Try[T] 가 패턴매칭으로 들어간다.
object FuturesTry extends App {
import scala.util._
val threadName: Try[String] = Try(Thread.currentThread.getName)
val someText: Try[String] = Try("Try objects are created synchronously")
val message: Try[String] = for {
tn <- threadName
st <- someText
} yield s"$st, t = $tn"
message match {
case Success(msg) => log(msg)
case Failure(error) => log(s"There should be no $error here.")
}
Thread.sleep(1000*10)
}
- Try[T] 타입은 Option[T] 와 비슷하지만 다른점은 실패시 정보를 추가 할 수 있다는 점이다. Option[T] 같은 경우 그냥 None 일 뿐이지 않는가.
- Try[T] 는 Future[T] 와 달리 동기적으로 다루어진다.
4) 퓨쳐 와 합성
map
콜백은 유용하지만 프로그램이 커지면 이를 사용한 프로그램 흐름에 대한 추론이 어려워 진다.
또한 콜백을 사용하면 특정 패턴의 비동기 프로그래밍을 사용할 수 없게 된다. 특히 여러 퓨처에 한꺼번에 한 콜백을 등록하는 것은 귀찮은 일이 된다.
val buildFile = Future { Source.fromFile("build.sbt").getLines }
val longestBuildLine = buildFile.map(lines => lines.maxBy(_.length))
longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}
map 은 기존 퓨처에 있는 값을 사용해서 새 퓨처를 만드는 메소드이다.
def map[S](f: T => S) (implicit e : ExecutionContext) : Future[S]
위의 예에서는 1. build.sbt 파일에서 각 라인들을 읽어서 2. 그 중에서 가장 긴 라인을 출력하는 2가지의 함수가 합성되었다.이걸 쓰레드를 직접만들어서 했을 때와 비교하면 얼마나 간단해 졌는지 실감 할 수 있을 것이다.
for complehension
val gitignoreFile = Future { Source.fromFile(".gitignore-SAMPLE").getLines }
val longestGitignoreLine = for (lines <- gitignoreFile) yield lines.maxBy(_.length)
longestBuildLine onComplete {
case Success(line) => log(s"the longest line is '$line'")
}
다음처럼 for comprehension 으로 처리 할 수 도 있다.
스칼라에서는 map 메소드가 있는 객체에 대해서는 for comprehension 을 허용한다.
async - await
아래 처럼 Future 를 async-await 구문을 사용하여, 명령형 언어처럼 순차적으로 쓸 수 있어서 가독성이 증가 된다.
import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async._ val gitignoreFile1: Future[Int] = ... val gitignoreFile2: Future[Int] = ... val asyncComputation = async { await(gitignoreFile1) + await(gitignoreFile2)
}
flatMap
먼저 스칼라 List 에서의 map 과 flatMap의 차이를 살펴보자.
scala> val fruits = Seq("apple", "banana", "orange") fruits: Seq[java.lang.String] = List(apple, banana, orange) scala> fruits.map(_.toUpperCase) res0: Seq[java.lang.String] = List(APPLE, BANANA, ORANGE) scala> fruits.flatMap(_.toUpperCase) res1: Seq[Char] = List(A, P, P, L, E, B, A, N, A, N, A, O, R, A, N, G, E)
map 은 받아드린 리스트의 하나의 값에 대해서 따로 실행하여 개별 List 를 만들었고
flatMap 은 따로 실행하여 큰 뭉치의 List 로 만들었다. 즉 하나의 큰 List 로 평평하게 펴서 나열했다는 의미.
또 다른 flatMap 의 특징을 알아보자.
def toInt(s: String): Option[Int] = { try { Some(Integer.parseInt(s.trim)) } catch { // catch Exception to catch null 's' case e: Exception => None } }
scala> val strings = Seq("1", "2", "foo", "3", "bar") strings: Seq[java.lang.String] = List(1, 2, foo, 3, bar) scala> strings.map(toInt) res0: Seq[Option[Int]] = List(Some(1), Some(2), None, Some(3), None) scala> strings.flatMap(toInt) res1: Seq[Int] = List(1, 2, 3) scala> strings.flatMap(toInt).sum res2: Int = 6
이제 Future 에서 어떻게 flatMap을 정의하였는지 살펴보자.
val netiquette = Future { Source.fromURL("http://www.ietf.org/rfc/rfc1855.txt").mkString }
val urlSpec = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString }
val answer = netiquette.flatMap { nettext =>
urlSpec.map { urltext =>
"First, read this: " + nettext + ". Now, try this: " + urltext
}
}
answer foreach {
case contents => log(contents)
}
FlatMap 을 사용하였다. 이것은 Map과는 다르게 현재 퓨처가 완료되고 그 결과값에 f 를 적용해서 생기는 새로운 퓨처가 완료되어야만 완료된다. 즉 아래와 같다.
def map[S](f: T => S) (implicit e : ExecutionContext) : Future[S]
def flatMap[S](f: T => Future[S]) (implicit e : ExecutionContext) : Future[S]
위의 코드는 결국 nettext 라는 값을 A퓨처의 결과로받고, urltext 라는 값 또한 B라는 퓨처의 결과로 받아서 하나의 처리를 거친 후에 answer 라는 새로운 Future[S] 를 반환한다는것을 말해주고있다.
object FuturesFlatMap extends App {
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
val netiquette = Future { Source.fromURL("http://www.ietf.org/rfc/rfc1855.txt").mkString }
val urlSpec = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").mkString }
val answer = for {
nettext <- netiquette
urltext <- urlSpec
} yield {
"First of all, read this: " + nettext + " Once you're done, try this: " + urltext
}
answer foreach {
case contents => log(contents)
}
Thread.sleep(1000*10)
}
보기 간단하게 for comprehesion 으로 바꾸었다.
for comprehension 보다 더 이해하기 쉬운 방법으로는 async, awit 가 있는데 여기서는 생략한다.
다음과 같은 Data Flow 로 그려볼 수 있다. Reactive 파라다임에서 말하는
"모든 지점에서 블럭 되지 않게 하자. 자연스럽게 이벤트가 흘러다니도록 하자." 이 되겠다.
5) 이후에 알아야 할 것들
기존 콜백기반의API와 퓨처사이에 다리를 놓고 잘 조합하기 위해서는 퓨처계산을 원하는 대로 정의할수있는 Promise를 알아야한다. 또한 퓨처들의 종료를 기다렸다가 무언가를 처리해야 하는 경우에 (블록이 가끔 필요하다) Await.ready 와 Await.result 등도 알아야한다.
위의 모든것들을 편하게 사용하게 하는 스칼라 비동기 라이브러리가 있다. 바로 async 와 await 인데 이것을 사용하면 조금은 더 편하게 Future 를 활용할 수 있다.
마지막으로 결과를 한번만 처리 할 수 있는 퓨처말고 동일한 계산으로 부터 여러 번 서로 다른 이벤트를 받아야 하는 경우 ( 파일 다운로드 상태 추적등) 에는 이벤트 스트림에 대해서 알아야한다. 이러한 이벤트 스트림을 Observable[T] 라는 타입으로 표현했는데 이에 관한 내용도 알아야한다. Akka Streaming 이라는 도구도 있다.
6) 한꺼번에 이해하기
아래 카카오에서 발행한 글을 통해 다시 복습해서 이해해보면 좋을 거 같다.
7) 비동기식 퓨처 연산 정리
설명 |
예제 |
설명 |
failbackTo |
nextFtr(1) failbackTo nextFtr(2) |
두번째 퓨쳐를 첫번째 연결하고 새로운 종합적인 퓨처를 반환함. 첫 번째 퓨처가 성공적이지 않다면 두 번째 퓨처가 호출됨 |
flatMap |
nextFtr(1).flatMap(int => nextFtr()) |
두번째 퓨처를 첫 번째에 연결하고 새로운 종합적인 퓨처를 반환함. 첫 번째가 성공적이라면 그 반환 값이 두 번째를 호출하는데 사용됨 |
map |
nextFtr(1) map (_ * 2) |
주어진 함수를 퓨처에 연결하고 새로운 종합적인 퓨처를 반환함. 퓨처가 성공적이라면 그 반환 값이 해당 함수를 호출할 때 사용됨 |
onComplete |
nextFtr() onComplete |
퓨처의 작업이 완료된 후 주어진 함수가 값 또는 예외를 포함한 Try를 이용하여 호출됨 |
onFailure |
* 2.12 부터 사장됨 |
|
onSuccess |
* 2.12 부터 사장됨 |
|
Future.sequence |
concurrent.Future sequence List(nextFtre(1),nextFtr(5)) |
주어진 시퀀스에서 퓨처를 벙행으로 실행하여 새로운 퓨처를 반환함. 시퀀스 내의 모든 퓨처가 성공하면 이들의 반환값의 리스트가 반환됨. 그렇지 않으면 그 시퀀스내에서 처음으로 발생한 예외가 반환됨 |
* 러닝스칼라 (제이슨스와츠,제이펍 발생) 에서 발췌
레퍼런스:
2.러닝 스칼라
'Scala' 카테고리의 다른 글
스칼라 강좌 (36) {} 익숙해지기, {} 는 어디서 사용되는가? (0) | 2017.02.17 |
---|---|
스칼라 강좌 (35) lambda 익숙해지기 (0) | 2017.02.17 |
스칼라 강좌 (32) - 가변 인자 처리 ( * 과 _* ) (0) | 2016.12.15 |
스칼라 강좌 (32) -타입기초/ 타입별칭 /추상 타입/ 타입 경계 / 변성 (0) | 2016.12.07 |
스칼라 강좌 (31) - 스칼라에서 사용되는 심볼들 (0) | 2016.12.07 |