'스칼라 동시성'에 해당하는 글 2건


Promise

이전 블로그글에서 Future 에 대해서 먼저 읽고 오자. (안읽으셨다면..) 
스칼라의 Promise 는 Future 의 일반화라고 볼 수 있다. (스칼라의 Promise 다. 다른 언어 라이브러리들이 모두 Future, Promise 등에 대한 정의/구현이 조금씩 다를 수 있다. 요즘 처럼 동시성 이슈가 많은 시절에는 원할한 의사 소통을 위해 구분 지어야 할 것이다)  따라서 Future 에 대해서 학습했다면 쉽게 이해 가능하다.

Future 에서는 보통 위임 행동이 강결합되있었다. 즉 future.( 행동 )  
하지만 Promise 는 Promise 만 먼저 선언해두고 나중에 success 를 호출해서 완료된것을 알려준다.

즉 Future 는 아래와 같은 시나리오라면 
다른데서 하는 행위 - 그 행위에 대한 실제 결과 -  미리 받은 Future - 실제 결과를 이용한 행동

Promise 는 
다른데서 하는 행위가 빠졌고 - 실제 결과를 돌려주는 타이밍을 스스로 결정  - 실제 결과를 이용한 행동 
으로 볼 수 있다.

다음 예를보자.

Promise - 1

import scala.concurrent._
import ExecutionContext.Implicits.global

val p = Promise[String]

p.future foreach {
case text => log(s"Promise p succeeded with '$text'")
}

p success "kept"

- Promise 를 미리 만들어 두었다.
- Promise 가 완료되면 진행할 행동을 foreach 의 케이스 매칭으로 만들어 두었다.
- success 를 호출해서 완료 및 인자를 전달한다.

q failure new Exception("not kept")

q.future.failed foreach {
case t => log(s"Promise q failed with $t")
}

- p sucess 가 아니라 p failure 를 호출하여 실패 상황을 알려 줄 수도 있따.


Promise - 2

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.control.NonFatal

def myFuture[T](body: =>T): Future[T] = {
val p = Promise[T]

global.execute(new Runnable {
def run() = try {
val result = body
p success result
} catch {
case NonFatal(e) =>
p failure e
}
})

p.future
}

val future = myFuture {
"naaa" + "na" * 8 + " Katamari Damacy!"
}

future foreach {
case text => log(text)
}

- 먼저 Promise 객체를 행동과 연결되는게 없이 만든다.
- 어떤 행동을 하게 하고 바로 Future 를 리턴시켜준다.
- 어떤 행동 (registerOnCompleteCallback) 안에서 어떤 행동이 완료되었다면 success 라는 메소드를 실행시켜서 먼저 리턴한 Future 에게 complete 신호를 보낸다. 

아직도 헥깔리실까봐 말하자면 Future 는 자신의 쓰레드를 생성하고 , 행위를 강결합해서 실행했다면  Promise 는 전혀 그러하지 않다. 그냥 다른 쓰레드 (위에서는 ForkjoinPool) 에 스며들어가서 완료시점과 전달값을 세팅만 했을뿐이다.


취소할 수 있는 Promise

퓨처 계산을 중간에 멈추고 싶을 경우가 있다. 사용자가 취소버튼을 눌렀다던가 .. 뭐 그럴 경우 중간에 멈추게 하고 거기 까지의 값을 원할 수 도 있고 등등 ..이럴때는 취소를 위한 퓨처를 다른 퓨처와 함께 조합해서 사용하는 방법인데 이게 꽤 헥깔리다. 

소스를 보자.

def cancellable[T](b: Future[Unit] => T): (Promise[Unit], Future[T]) = {
val p = Promise[Unit]
val f = Future {
val r = b(p.future)
if (!p.tryFailure(new Exception))
throw new CancellationException
r
}
(p, f)
}

val (cancel, value) = cancellable { cancel =>
var i = 0
while (i < 5) {
if (cancel.isCompleted) throw new CancellationException
Thread.sleep(500)
log(s"$i: working")
i += 1
}
"resulting value"
}

Thread.sleep(1500)

cancel trySuccess ()

log("computation cancelled!")

솔직히 이 쯤되면 그냥 저레벨 쓰레드 사용해서 대충 짜고 싶은 마음이 들기도 한다. 세상 살면서 이런 코드를 얼마나 짜겠다고.. 이걸 쉽게 이해하고 체득하려고 노력까지 해야하나 싶다.

이 코드는 말로 설명하긴 어렵겠다. 각자 생존하자. -.-;; (아래 레퍼런스의 책을 천천히 읽으시라 권유하고 싶다).  

힌트 하나만 주자면 Promise p 가 취소용으로 사용됬다는 것이다. 즉 그 동안의 코드와 다르게 사용자 측에서 Promise 을 제어한다. trySuccess() 를 사용해서~~


Await

어느 순간에는 비동기의 연속보다는 무엇인 종료될때까지 기다리고 싶을 때도 있다. 그때 사용된다.

Await.result

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

val urlSpecSizeFuture = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").size }
val urlSpecSize = Await.result(urlSpecSizeFuture, 10.seconds)

log(s"url spec contains $urlSpecSize characters")

Await.result 에서 웹싸이트를 다 긁어 올때까지 기다린다.  10초만 기다린다.


Await.result

val startTime = System.nanoTime

val futures = for (_ <- 0 until 16) yield Future {
Thread.sleep(1000)
}

for (f <- futures) Await.ready(f, Duration.Inf)

val endTime = System.nanoTime

log(s"Total execution time of the program = ${(endTime - startTime) / 1000000} ms")
log(s"Note: there are ${Runtime.getRuntime.availableProcessors} CPUs on this machine")

모든 퓨터가 완료 될 때까지 무한정 기다린다.
Await.ready 는 리턴값이 없다.

16개의 퓨쳐가 실행완료 될 때까지 얼마나 걸리는지 재는 코드이다.
저게 시퀀셜하게 동작했다면 16초이상 걸렸겠지만 내 PC에서는 아래와 같은 결과가 나왔다.

main: Total execution time of the program = 4451 ms (대략 4초)

main: Note: there are 4 CPUs on this machine ( 4코어) 



레퍼런스

스칼라 동시성 프로그래밍 - 오현석 옮김



WRITTEN BY
[前草] 이승현 (wowlsh93@gmail.com)
스타코프 (데이터지능플랫폼pd) (관심분야: 에너지IoT, 시계열(NILM) 데이터, 폴리글랏 프로그래밍 )

트랙백  0 , 댓글이 없습니다.
secret




ExecutorContext 


스칼라도 JVM 상에 돌아가는 언어 (네이티브 스칼라도 개발중이긴 하다.)라 자바의 라이브러리를 사용 할 수 도 있으며 자신의 라이브러리가 있다면 주로 그것을 사용하는 편 이다.

쓰레드 생성 관련해서도 자바에 있는 Executor 와 ExecutorService 를 사용할수도 있겠지만 스칼라에서는 기능이 보강된 자신만의 것도 있다. 이름하여 ExecutorContext!!

이번 포스트에서는  ExecutorContext 등 스칼라의 저레벨  (쓰레드를 직접 사용하는것은 원시레벨) 동시성 도구들에 대해 알아보자. 

먼저 자바의 경우 

Executor 는 그냥 Runnable 타입을 내부 쓰레드풀을 이용해서 실행시켜주는거다.기본 Executor구현으로는  ThreadPoolExecutor (JDK 5), ForkJoinPool (JDK 7) 등이 있다.

ExecutorService  는 Executor의 서브인터페이스로써 라이프싸이클 관리 (셧다운등) + Future +  Callable 을 이용해서 리턴값 받기 정도가 추가되었다.

이제 부터 스칼라식의 코딩을 살펴보자. (이 포스트는 기초적인 내용은 대략 안다고 가정하고 코드만 살펴봄) 


서론 

스칼라에 있는 ExecutionContext 가 먼지 알아보자.

1.scala.concurrent 패키지에 있는 Executor 객체와 비슷한 기능을 제공하는 ExecutionContext 는 
주로 implicit 매개 변수로 사용된다. ExecutionContext는 두개의 추상 메소드 execute (Java Executor 메소드와 동일)와 reportFailure (Throwable 객체를 이용해서 일부 태스크가 예외를 throw 할 때마다 호출 됨)가 있다.

2. ExecutionContext에는 Java Executor 또는 ExecutorService (Java와 Scala 사이의 다리 역할을 함)에서 ExecutionContext 객체를 만들기 위해 몇 가지 메소드가 추가된  companion 객체가 있다.

3. ExecutionContext companion 객체는 내부적으로 ForkJoinPool 인스턴스를 사용하는 global이라는 기본 실행 컨텍스트를 포함한다.


코드로 말해요.


기본 쓰레드 생성

val hello = new Thread(new Runnable {
def run() {
println("hello world")
}
})

hello.start()

 ExecutorService 활용 

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

def run() {
try {
while (true) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute(new Handler(socket))
}
} finally {
pool.shutdown()
}
}
}

class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n").getBytes

def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}

(new NetworkService(2020, 2)).run

- 서버소켓을 생성하는 예제이다.

- NetworkService 클래스에서 클라이언트를 받아서 Pool 에 있는 쓰레드를 활용하여 멀티쓰레딩 해주고있다.

- 제공되는 고정 크기 Pool (Executors.newFixedThreadPool(poolSize)) 사용했기 때문에 기존 쓰레드의 재사용이 가능하지만 개별 쓰레드에서 굉장히 오래 걸리는 처리를 한다면 사용 가능한 쓰레드가 모잘라서 클라이언트는 화가 날 것이다. 

- 핸들러에서는 메세지를 받아서 종료하는 매우 단순한 작업을 하고 있다. 물론 Handler 에서도 while 을 돈다면 서버와 계속 통신할 수 있겠다.  이런 경우는 쓰레드풀을 사용하는 의미가 없어지긴 하겠지만


 ForkJoinPool 활용 

import scala.concurrent._
val executor = new java.util.concurrent.ForkJoinPool
executor.execute(new Runnable {
def run() = log("This task is run asynchronously.")
})

Java7 부터 등장한 ForkJoinPool 을 사용하고 있다. 이것에 관해 포스팅한 자료가 있으니 참고하도록 하자. ForkJoinPool 이란? ->  http://hamait.tistory.com/612


ExecutionContext.global 활용

import scala.concurrent._
val ectx = ExecutionContext.global
ectx.execute(new Runnable {
def run() = log("Running on the execution context.")
})

-  ForkJoinPool 을 사용하고 있는 global 


ExecutionContext 생성하기 

import scala.concurrent._
val ectx = ExecutionContext.fromExecutorService(new forkjoin.ForkJoinPool)
ectx.execute(new Runnable {
def run() = log("Running on the execution context again.")
})

- ForkJoinPool 을 이용한 ExecutorService 를 통해서 ExecutionContext 를 생성


ExecutionContext 활용 

import scala.concurrent._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})

for (i <- 0 until 32) execute {
Thread.sleep(2000)
log(s"Task $i completed.")
}
Thread.sleep(10000)

- execute  : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수 

- 0 ~ 32 까지, 즉 32번 execute 를 실행한다. 

- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다. 

def func(i : Int) = {
Thread.sleep(2000)
log(s"Task $i completed.")
}

for (i <- 0 until 32) execute {func(i)}

- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기



레퍼런스:

https://blog.knoldus.com/2016/07/25/java-executor-vs-scala-executioncontext/

Learning Concurrent Programming in Scala by Aleksandar Prokopec

https://twitter.github.io/scala_school/ko/concurrency.html  (Scala school)


WRITTEN BY
[前草] 이승현 (wowlsh93@gmail.com)
스타코프 (데이터지능플랫폼pd) (관심분야: 에너지IoT, 시계열(NILM) 데이터, 폴리글랏 프로그래밍 )

트랙백  0 , 댓글 하나 달렸습니다.


  1. ...자바하다가 스칼라로 넘어왔는데....이것저것 찾다가 ..들어왔네요


    좋은글 감사합니다!!!
secret