관리 메뉴

HAMA 블로그

스칼라 강좌 (37) 동시성을 위한 ExecutorContext 본문

Scala

스칼라 강좌 (37) 동시성을 위한 ExecutorContext

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 21. 11:05




ExecutorContext 


스칼라도 JVM 상에 돌아가는 언어 (네이티브 스칼라도 개발중이긴 하다.)라 자바의 라이브러리를 사용 할 수 도 있으며 자신의 라이브러리가 있다면 주로 그것을 사용하는 편 이다.

쓰레드 생성 관련해서도 자바에 있는 Executor 와 ExecutorService 를 사용할수도 있겠지만 스칼라에서는 기능이 보강된 자신만의 것도 있다. 이름하여 ExecutorContext!!

이번 포스트에서는  ExecutorContext 등 스칼라의 저레벨  (쓰레드를 직접 사용하는것은 원시레벨) 동시성 도구들에 대해 알아보자. 

먼저 자바의 경우 

Executor 는 그냥 Runnable 타입을 내부 쓰레드풀을 이용해서 실행시켜주는거다.기본 Executor구현으로는  ThreadPoolExecutor (JDK 5), ForkJoinPool (JDK 7) 등이 있다.

ExecutorService  는 Executor의 서브인터페이스로써 라이프싸이클 관리 (셧다운등) + Future +  Callable 을 이용해서 리턴값 받기 정도가 추가되었다.

이제 부터 스칼라식의 코딩을 살펴보자. (이 포스트는 기초적인 내용은 대략 안다고 가정하고 코드만 살펴봄) 


서론 

스칼라에 있는 ExecutionContext 가 먼지 알아보자.

1.scala.concurrent 패키지에 있는 Executor 객체와 비슷한 기능을 제공하는 ExecutionContext 는 
주로 implicit 매개 변수로 사용된다. ExecutionContext는 두개의 추상 메소드 execute (Java Executor 메소드와 동일)와 reportFailure (Throwable 객체를 이용해서 일부 태스크가 예외를 throw 할 때마다 호출 됨)가 있다.

2. ExecutionContext에는 Java Executor 또는 ExecutorService (Java와 Scala 사이의 다리 역할을 함)에서 ExecutionContext 객체를 만들기 위해 몇 가지 메소드가 추가된  companion 객체가 있다.

3. ExecutionContext companion 객체는 내부적으로 ForkJoinPool 인스턴스를 사용하는 global이라는 기본 실행 컨텍스트를 포함한다.


코드로 말해요.


기본 쓰레드 생성

val hello = new Thread(new Runnable {
def run() {
println("hello world")
}
})

hello.start()

 ExecutorService 활용 

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

def run() {
try {
while (true) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute(new Handler(socket))
}
} finally {
pool.shutdown()
}
}
}

class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n").getBytes

def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}

(new NetworkService(2020, 2)).run

- 서버소켓을 생성하는 예제이다.

- NetworkService 클래스에서 클라이언트를 받아서 Pool 에 있는 쓰레드를 활용하여 멀티쓰레딩 해주고있다.

- 제공되는 고정 크기 Pool (Executors.newFixedThreadPool(poolSize)) 사용했기 때문에 기존 쓰레드의 재사용이 가능하지만 개별 쓰레드에서 굉장히 오래 걸리는 처리를 한다면 사용 가능한 쓰레드가 모잘라서 클라이언트는 화가 날 것이다. 

- 핸들러에서는 메세지를 받아서 종료하는 매우 단순한 작업을 하고 있다. 물론 Handler 에서도 while 을 돈다면 서버와 계속 통신할 수 있겠다.  이런 경우는 쓰레드풀을 사용하는 의미가 없어지긴 하겠지만


 ForkJoinPool 활용 

import scala.concurrent._
val executor = new java.util.concurrent.ForkJoinPool
executor.execute(new Runnable {
def run() = log("This task is run asynchronously.")
})

Java7 부터 등장한 ForkJoinPool 을 사용하고 있다. 이것에 관해 포스팅한 자료가 있으니 참고하도록 하자. ForkJoinPool 이란? ->  http://hamait.tistory.com/612


ExecutionContext.global 활용

import scala.concurrent._
val ectx = ExecutionContext.global
ectx.execute(new Runnable {
def run() = log("Running on the execution context.")
})

-  ForkJoinPool 을 사용하고 있는 global 


ExecutionContext 생성하기 

import scala.concurrent._
val ectx = ExecutionContext.fromExecutorService(new forkjoin.ForkJoinPool)
ectx.execute(new Runnable {
def run() = log("Running on the execution context again.")
})

- ForkJoinPool 을 이용한 ExecutorService 를 통해서 ExecutionContext 를 생성


ExecutionContext 활용 

import scala.concurrent._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})

for (i <- 0 until 32) execute {
Thread.sleep(2000)
log(s"Task $i completed.")
}
Thread.sleep(10000)

- execute  : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수 

- 0 ~ 32 까지, 즉 32번 execute 를 실행한다. 

- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다. 

def func(i : Int) = {
Thread.sleep(2000)
log(s"Task $i completed.")
}

for (i <- 0 until 32) execute {func(i)}

- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기



레퍼런스:

https://blog.knoldus.com/2016/07/25/java-executor-vs-scala-executioncontext/

Learning Concurrent Programming in Scala by Aleksandar Prokopec

https://twitter.github.io/scala_school/ko/concurrency.html  (Scala school)

Comments