관리 메뉴

HAMA 블로그

스칼라 강좌 (38) 동시성을 위한 Collections 본문

Scala

스칼라 강좌 (38) 동시성을 위한 Collections

[하마] 이승현 (wowlsh93@gmail.com) 2017. 2. 21. 11:43


동시성을 위한 스칼라 Collections 


스칼라도 자바5에서 등장한 자바동시성 도구들 처럼 (을 사용한) 쓰레드 안전한 콜렉션들이 있다.  대표적으로 ConcurrentHashMap. LinkedBlockingQueue 등...concurrent.TrieMap 같이 스칼라 전용으로 추가된 것들도 있다. 

대략적인 것은 안다고 가정하고 스칼라 코드를 살펴보자.

* 이런 컬렉션들은 여러 쓰레드들에 의해 공유된다. 공유되지 않는 방법으로 동시성을 처리하는 경우도 있는데 이런 경우 공유 되지 않기 때문에 락을 걸 필요가 없어진다. (액터 등)


코드로 말해요.

LinkedBlockingQueue

object CollectionsIterators extends App {
import java.util.concurrent._

val queue = new LinkedBlockingQueue[String]
for (i <- 1 to 5500) queue.offer(i.toString)
execute {
val it = queue.iterator
while (it.hasNext) log(it.next())
}
for (i <- 1 to 5500) queue.poll()
}

- 생산자 소비자 패턴에서 유용하게 사용될 수 있는 쓰레드 안전 큐이다.


ConcurrentHashMap


object CollectionsConcurrentMap extends App {
import java.util.concurrent.ConcurrentHashMap
import scala.collection.convert.decorateAsScala._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})
val emails = new ConcurrentHashMap[String, List[String]]().asScala

execute {
emails("James Gosling") = List("james@javalove.com")
log(s"emails = $emails")
}

execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@tetris.com"))
log(s"emails = $emails")
}

execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@welltris.com"))
log(s"emails = $emails")
}

}

- execute  : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수 

- 0 ~ 32 까지, 즉 32번 execute 를 실행한다. 

- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다. 

def func() = {
   
emails("James Gosling") = List("james@javalove.com") log(s"emails = $emails")
}

execute {func(i)}

- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기

val emails = new ConcurrentHashMap[String, List[String]]().asScala : 자바의 것을 스칼라에서 활용 

- 여러 개의 쓰레드에서 동시에 HashMap에 리스트를 추가해도 쓰레드 안전하다. 

- putIfAbsent : 만약 해당 키에 대한 요소가 없다면 추가하라. 


다른 예 


object CollectionsConcurrentMapBulk extends App {
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap

val names = new ConcurrentHashMap[String, Int]().asScala
names("Johnny") = 0
names("Jane") = 0
names("Jack") = 0

execute {
for (n <- 0 until 10) names(s"John $n") = n
}

execute {
for (n <- names) log(s"name: $n")
}

}

이것의 결과는 아래와 같다. 어라 요소가 많이 빠졌네? 

scala-execution-context-global-12: name: (Johnny,0)

scala-execution-context-global-12: name: (Jack,0)

scala-execution-context-global-12: name: (John 4,4)

scala-execution-context-global-12: name: (Jane,0)

scala-execution-context-global-12: name: (John 3,3)

scala-execution-context-global-12: name: (John 2,2)

scala-execution-context-global-12: name: (John 1,1)

즉 아래의 execute 에서 맵의 요소를 가져오는 타이밍에 첫번째 execute 에 0~10까지 모든 숫자가 들어가지 않았기 때문이다. 결국 쓰레드들 끼리 맵에 번갈아가면서 접근 한다는 것. 


concurrent.TrieMap

아래 경우는 어떻게 될까?


object CollectionsTrieMapBulk extends App {
import scala.collection._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})

val names = new concurrent.TrieMap[String, Int]
names("Janice") = 0
names("Jackie") = 0
names("Jill") = 0

execute {
for (n <- 10 until 100) names(s"John $n") = n
}

execute {
log("snapshot time!")
for (n <- names.map(_._1).toSeq.sorted) log(s"name: $n")
}
Thread.sleep(1000* 10)

}

결과는 모든 요소들이 다 출력된다. 

이유는 ? TrieMap 경우는 다른 쓰레드의 액션을 방해 하지 않는다. 따라서 성능에 문제가 생길 소지도 있다.

적절한 곳에 사용해야한다. ( 파일 관련 처리에서 파일을 다 찾은 후에 어떤 액션을 하게 하는 경우) 


CounterDownLatch

여러 쓰레드들을 실행시켜두고 모두 종료되기만을 기다릴 때 사용. 
분산 시스템에서도 Zookeeper 등을 활용하여 이러한 전역 동기락은 자주 사용된다. 

val startLatch = new CountDownLatch(4)

class Printer(val message: String, val iterations: Int) extends Runnable {
def run(): Unit = {

for (i <- 0 until iterations)
println(message)

startLatch.countDown()
}
}

val iterations = 1

val threads = List(
new Thread(new Printer("the first message", iterations)),
new Thread(new Printer("the second message", iterations)),
new Thread(new Printer("another message", iterations)))

threads.foreach(_.start())
startLatch.await()

log ("end")

"end" 는 과연 출력 될 까요?

답은 안된다. startLatch.await() 는 startLatch.countDown() 이 4번 호출 될 때 까지 기다린다.



레퍼런스

Learning Concurrent Programming in Scala by Aleksandar Prokopec

Comments