일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- Adapter 패턴
- 이더리움
- akka 강좌
- 그라파나
- 파이썬 동시성
- 하이퍼레저 패브릭
- Play2 로 웹 개발
- 플레이프레임워크
- 스위프트
- Golang
- play 강좌
- 스칼라
- 스칼라 동시성
- 파이썬 강좌
- 주키퍼
- play2 강좌
- 스칼라 강좌
- 파이썬
- 엔터프라이즈 블록체인
- 안드로이드 웹뷰
- CORDA
- 파이썬 머신러닝
- 하이브리드앱
- Akka
- Hyperledger fabric gossip protocol
- hyperledger fabric
- Play2
- 파이썬 데이터분석
- 블록체인
- Actor
- Today
- Total
HAMA 블로그
스칼라 강좌 (38) 동시성을 위한 Collections 본문
동시성을 위한 스칼라 Collections
스칼라도 자바5에서 등장한 자바동시성 도구들 처럼 (을 사용한) 쓰레드 안전한 콜렉션들이 있다. 대표적으로 ConcurrentHashMap. LinkedBlockingQueue 등...concurrent.TrieMap 같이 스칼라 전용으로 추가된 것들도 있다.
대략적인 것은 안다고 가정하고 스칼라 코드를 살펴보자.
* 이런 컬렉션들은 여러 쓰레드들에 의해 공유된다. 공유되지 않는 방법으로 동시성을 처리하는 경우도 있는데 이런 경우 공유 되지 않기 때문에 락을 걸 필요가 없어진다. (액터 등)
코드로 말해요.
LinkedBlockingQueue
object CollectionsIterators extends App {
import java.util.concurrent._
val queue = new LinkedBlockingQueue[String]
for (i <- 1 to 5500) queue.offer(i.toString)
execute {
val it = queue.iterator
while (it.hasNext) log(it.next())
}
for (i <- 1 to 5500) queue.poll()
}
- 생산자 소비자 패턴에서 유용하게 사용될 수 있는 쓰레드 안전 큐이다.
ConcurrentHashMap
object CollectionsConcurrentMap extends App {
import java.util.concurrent.ConcurrentHashMap
import scala.collection.convert.decorateAsScala._
def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})
val emails = new ConcurrentHashMap[String, List[String]]().asScala
execute {
emails("James Gosling") = List("james@javalove.com")
log(s"emails = $emails")
}
execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@tetris.com"))
log(s"emails = $emails")
}
execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@welltris.com"))
log(s"emails = $emails")
}
}
- execute : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수
- 0 ~ 32 까지, 즉 32번 execute 를 실행한다.
- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다.
def func() = {
emails("James Gosling") = List("james@javalove.com") log(s"emails = $emails")
}
execute {func(i)}
- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기
- val emails = new ConcurrentHashMap[String, List[String]]().asScala : 자바의 것을 스칼라에서 활용
- 여러 개의 쓰레드에서 동시에 HashMap에 리스트를 추가해도 쓰레드 안전하다.
- putIfAbsent : 만약 해당 키에 대한 요소가 없다면 추가하라.
다른 예
object CollectionsConcurrentMapBulk extends App {
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap
val names = new ConcurrentHashMap[String, Int]().asScala
names("Johnny") = 0
names("Jane") = 0
names("Jack") = 0
execute {
for (n <- 0 until 10) names(s"John $n") = n
}
execute {
for (n <- names) log(s"name: $n")
}
}
이것의 결과는 아래와 같다. 어라 요소가 많이 빠졌네?
scala-execution-context-global-12: name: (Johnny,0)
scala-execution-context-global-12: name: (Jack,0)
scala-execution-context-global-12: name: (John 4,4)
scala-execution-context-global-12: name: (Jane,0)
scala-execution-context-global-12: name: (John 3,3)
scala-execution-context-global-12: name: (John 2,2)
scala-execution-context-global-12: name: (John 1,1)
즉 아래의 execute 에서 맵의 요소를 가져오는 타이밍에 첫번째 execute 에 0~10까지 모든 숫자가 들어가지 않았기 때문이다. 결국 쓰레드들 끼리 맵에 번갈아가면서 접근 한다는 것.
concurrent.TrieMap
아래 경우는 어떻게 될까?
object CollectionsTrieMapBulk extends App {
import scala.collection._
def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})
val names = new concurrent.TrieMap[String, Int]
names("Janice") = 0
names("Jackie") = 0
names("Jill") = 0
execute {
for (n <- 10 until 100) names(s"John $n") = n
}
execute {
log("snapshot time!")
for (n <- names.map(_._1).toSeq.sorted) log(s"name: $n")
}
Thread.sleep(1000* 10)
}
결과는 모든 요소들이 다 출력된다.
이유는 ? TrieMap 경우는 다른 쓰레드의 액션을 방해 하지 않는다. 따라서 성능에 문제가 생길 소지도 있다.
적절한 곳에 사용해야한다. ( 파일 관련 처리에서 파일을 다 찾은 후에 어떤 액션을 하게 하는 경우)
CounterDownLatch
여러 쓰레드들을 실행시켜두고 모두 종료되기만을 기다릴 때 사용.
분산 시스템에서도 Zookeeper 등을 활용하여 이러한 전역 동기락은 자주 사용된다.
val startLatch = new CountDownLatch(4)
class Printer(val message: String, val iterations: Int) extends Runnable {
def run(): Unit = {
for (i <- 0 until iterations)
println(message)
startLatch.countDown()
}
}
val iterations = 1
val threads = List(
new Thread(new Printer("the first message", iterations)),
new Thread(new Printer("the second message", iterations)),
new Thread(new Printer("another message", iterations)))
threads.foreach(_.start())
startLatch.await()
log ("end")
"end" 는 과연 출력 될 까요?
답은 안된다. startLatch.await() 는 startLatch.countDown() 이 4번 호출 될 때 까지 기다린다.
레퍼런스
Learning Concurrent Programming in Scala by Aleksandar Prokopec
'Scala' 카테고리의 다른 글
스칼라 강좌 (40) 동시성을 위한 Promise 와 Await (0) | 2017.02.21 |
---|---|
스칼라 강좌 (39) 동시성을 위한 Observable (0) | 2017.02.21 |
스칼라 강좌 (37) 동시성을 위한 ExecutorContext (1) | 2017.02.21 |
스칼라 강좌 (36) {} 익숙해지기, {} 는 어디서 사용되는가? (0) | 2017.02.17 |
스칼라 강좌 (35) lambda 익숙해지기 (0) | 2017.02.17 |