스칼라 스쿨 타입 고급주제 (한글번역) 참고. 

https://twitter.github.io/scala_school/ko/advanced-types.html


먼가 좀 조잡하긴 한데....

나중에 스칼라에 대한 도가 좀 쌓이면, 풀어서 다시 정리하는 글을 올릴것이다.



         F-bounded polymorphism / recursive types

                         번역:http://blog.originate.com/blog/2014/02/27/types-inside-types-in-scala/  


지금까지 이런 타입 시그니처 본 적이 있어?   (역주: 난 얼마전에 봤다...당황 많이 했어 ;;) 

1
trait T[U <: T[U]]

처음 이런 모습을 보았을때,  "머야 이 퐝당한 코드" 같은 생각이 드는건 어쩌면 당연해. 그리고 이것에 대해 알아 보기 위해 구글링등을 하기 시작 했을테고, 결국 이 블로그를 찾아 왔을 지도? 그렇다면 잘 찾아 왔네 친구~ ^^

좋아,  저 헤괴망측한것은 도대체 뭘까? 보통 우린  trait T[U]  이렇게 써 왔잖아? 
(역주:  T[U <: T[U]] 라니? U 타입이긴 한데 T[U] 의 하위 타입이라고? U 가 두군데 저렇게 쓰여도 됨? 미리 답을 얘기하면 
여기서 U 타입은 T[U] 를 상속받은 타입만으로 강제 되는 거야. )   

알고보면  매우 간단하니깐 겁먹지 말고 , 간단한 예를 통해서 이해해 보자고. 

자 출발!

우리에게 매우 익숙한 CRUD 에 대한 코드를 짜 볼거야. 아래와 같이 case class 를 이용해서 간단하게

1
2
3
4
5
6
7
8
9
10
11
12
13
14

case class Apple(name: String, price: Double) {
  def create(entityData: String): Apple
  def read(id: String): Option[Apple]
  def update(f: Apple => Apple): Apple
  def delete(id: String): Unit
}

case class Bird(name: String, birthday: DateTime) {
  def create(entityData: String): Bird
  def read(id: String): Option[Bird]
  def update(f: Bird => Bird): Bird
  def delete(id: String): Unit
}

사과랑 새라는 두개의 클래스가 있는데, 잘 보면  두 클래스가 하는 역할이 너무 비슷하잖아? 중복되는 느낌도 들고~ 앞으로 비슷한 클래스를 작성 할때 저 코드를 또 타이핑 해야할 것을 생각하면 한숨이 나오지..그렇다면 리팩토링의 화신인 우리가 해야할 것은 무엇일까?

그래 인터페이스로 끌어 올리는거야. 스칼라에서는 trait 라는 쿨한 녀석이 있잖아?

1
2
3
4
5
6
7
8
9
10
trait CrudEntity {
  def create(entityData: String): CrudEntity
  def read(id: String): Option[CrudEntity]
  def update(f: CrudEntity => CrudEntity): CrudEntity
  def delete(id: String): Unit
}

case class Apple(name: String, age: Int) extends CrudEntity

case class Bird(name: String, hobby: String) extends CrudEntity

앗..망할 ㅋㅋ  trait 의 메소드 시그니처 보면 좀 문제가 있네. 우리가 원한것은 Apple 이면 Apple 에 대해 작업 하길 원하는데 이 코드라면  그냥 CrudEntity 를 리턴하잖아;;

CrudEntity 트레잇에 좀 더 명확한 타입을 추가해보자..

1
2
3
4
5
6
7
8
9
10
trait CrudEntity_2[E] {
  def create(entityData: String): E
  def read(id: String): Option[E]
  def update(f: E => E): E
  def delete(id: String): Unit
}

case class Apple(name: String, age: Int) extends CrudEntity_2[Apple]

case class Bird(name: String, hobby: String) extends CrudEntity_2[Bird]

좋아. 우리가 생각한건 이런거잖아. ( 대부분 여기 까지 작업하고 종료를 하겠지 ?) 
하지만 좀 만 더 생각해보면  살짝 문제가 있어.  무엇일까?

문제점은 바로 누군가 CrudEntity_2 를 이렇게 정의하면 생겨..

1
case class Orange(name: String, bankAccount: Double) extends CrudEntity_2[FloobyDust]

웁스~ 위에 코드에선 CrudEntity_2[E] 의 E타입에 제약이 없어서, 아무거나 넣어도 컴파일러가 불평불만을 안하잖아? 우리가 강력한 타입시스템을 사용하는 이유가 조금 의미 없어졌네.. Orange 만 넣도록 해야하는데 말이야.

그래 무조건 CrudEntity_2[E] 에서 E 에는 CrudEntity_2 를 상속한 녀석만 들어가도록 제약을 걸고 싶지?
짜잔~~ 이때 등장하는것이 우리가 처음 퐝당해 했던 저 코드야~! 아래를 보라고.

1
2
3
4
5
6
7
8
9
10
trait CrudEntity_3[E <: CrudEntity_3[E]] {
  def create(entityData: String): E
  def read(id: String): Option[E]
  def update(f: E => E): E
  def delete(id: String): Unit
}

case class Apple(name: String, age: Int) extends CrudEntity_3[Apple]

case class Bird(name: String, hobby: String) extends CrudEntity_3[Bird]

아주 훌륭해 졌어. 이제 E 타입은 무조건 해당 subtype 만 들어 갈 수 있어. 응? 해당 subtype?? 이라고?? 그럼 아래처럼도 되겠네?  이건 우리가 원한게 아니잖아?

1
case class Orange(name: String, age: Int) extends CrudEntity_3[Apple]

그럼 이건 어떻게 해결 해야 할까?  이 경우에 우리는 self type 을 써서 해결 할 수 있어.

이제 마지막이야..다 왔어. 친구~ CrudEntity는 다음과 같이 정의면 모든게 해결이야.
(역주: self 대신 this 로 해도되고, foo 로 해도 됩니다.)

1
2
3
4
5
6
trait CrudEntity[E <: CrudEntity[E]] { 
  self: E =>
  def create(entityData: String): E
  def read(id: String): Option[E]
  def update(f: E => E): E
  def delete(id: String): Unit
}

self: E => 를 추가하면 CrudEntity [E] 를 상속받은 객체는 반드시 자신의 타입인 E 가 되야해.

1
case class Orange(name: String, age: Int) extends CrudEntity[Apple]

Orange 는 Apple타입이 아니라는것을 컴파일러가 확실히 제약해주지.

이 정도는 해야 우리가 원하는 안전하고 확실한 강력한 타입을 가지는 코드라고 할 수 있지 않을까?
그럼 빠잉~




Try

값이 util.Try 클렉션은 에러 처리를 컬렉션 관리로 바꾸어 놓는다. 이 클렉션은 주어진 함수 매개변수에서 발생한 에러를 잡아내는 메커니즘을 제공하여 함수가 성공적으로 실행된 경우에는 함수의 결과값을 그렇지 않은 경우에는 에러를 반환한다.

추가 설명은 아래 링크를 참고하세요.  

http://blog.seulgi.kim/2015/07/monad-try.html



Try 를 이용한 에러 처리 메소드 

 이름

예제 

설명 

 flatMap 

 nextError flatMap { _ =>
   nextError }

Success인 경우 util.Try를 반환하는 함수를 호출함으로써 현재의 반환값을 새로운 내장된 반환값(또는 예외에 매핀함, 우리의 'nextError' 데모함수는 입력값을 취하지 않기 때문에 우리는 현재 Success로 부터 사용하지 않는 입력값을 나타내는 언더스코어를 사용함

 foreach

 nextError foreach(x =>
            println("success!" + x))

Success인 경우 주어진 함수를 한 번 실행하고, Failure 일 때는 실행하지 않음

 getOrElse

 nextError getOrElse 0 

Success에 내장된 값을 반환하거나,Failure인 경우 이름에 의한 매개변수의 결과를 반환함. 

 orElse

 nextError orElse nextError 

flatMap의 반대되는 메소드, Failure인 경우 그 역시Try를 반환하는 함수를 호출함.orElse로 어쩌면 Failure를 Success로 전환 할 수 있음 

 toOption 

 nextError.toOption 

 Try를 Option으로 전환하여 Success는 Some으로 Failure는 None이 됨. 내장된 Exception을 잃게 된다는 단점이 있음 

 map 

 nextError map (_ * 2)  

Success인 경우 새로운 값에 내장된 값을 매핑하는 함수를 호출함 

 매치표현식 

 nextError match { case util.Success(x) => x;
         case util.Failure(error) => -1 } 

Success를 ('x'에 저장된) 반환값으로 또는 Failure를 ('error'에 저장된) 예외로 처리하기 위해 매치 표현식을 사용함.  

 아무일도
 하지않음

 nextError 

가장 쉬운 에러 처리 방식으로, 내가 개인적으로 선호하는 방식임. 이 방식으로 단순히 예외가 잡히거나,현재의 애플리케이션을 종료시킬 때까지 호출 스택을 타고 전파되도록 그대로 둠. 이 방식은 민감한 경우에 사용하면 문제가 크겠짐나, 발새한 예왼느 결코 무시되지 않음  

*러닝스칼라(제이슨스와츠) 에서 발췌




레퍼런스:

러닝 스칼라 
프로그래밍 인 스칼라 


Promise

이전 블로그글에서 Future 에 대해서 먼저 읽고 오자. (안읽으셨다면..) 
스칼라의 Promise 는 Future 의 일반화라고 볼 수 있다. (스칼라의 Promise 다. 다른 언어 라이브러리들이 모두 Future, Promise 등에 대한 정의/구현이 조금씩 다를 수 있다. 요즘 처럼 동시성 이슈가 많은 시절에는 원할한 의사 소통을 위해 구분 지어야 할 것이다)  따라서 Future 에 대해서 학습했다면 쉽게 이해 가능하다. (여기서 일반화라는 말이 너무추상적이라 헤깔릴수 있을 텐데 좀 참아보자.)

Future 에서는 보통 위임 행동이 강결합 되있었다. 즉 future.( 행동 )  
하지만 Promise 는 Promise 만 먼저 선언해두고 나중에 success 를 호출해서 완료된것을 알려준다.
(자바의 CompletableFuture라는 작명이 오히려 더 잘 설명해 주는거 같다. Promise는 완료 시점을 스스로 알려주는 Future라고 볼 수 있다.) 


다음 예를보자.

Promise - 1

import scala.concurrent._
import ExecutionContext.Implicits.global

val p = Promise[String]

p.future foreach {
case text => log(s"Promise p succeeded with '$text'")
}

p success "kept"

- Promise 를 미리 만들어 두었다.
- Promise 가 완료되면 진행할 행동을 foreach 의 케이스 매칭으로 만들어 두었다.
- success 를 호출해서 완료 및 인자를 전달한다.

q failure new Exception("not kept")

q.future.failed foreach {
case t => log(s"Promise q failed with $t")
}

- p sucess 가 아니라 p failure 를 호출하여 실패 상황을 알려 줄 수도 있따.


Promise - 2

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.control.NonFatal

def myFuture[T](body: =>T): Future[T] = {
val p = Promise[T]

global.execute(new Runnable {
def run() = try {
val result = body
p success result
} catch {
case NonFatal(e) =>
p failure e
}
})

p.future
}

val future = myFuture {
"naaa" + "na" * 8 + " Katamari Damacy!"
}

future foreach {
case text => log(text)
}

- 먼저 Promise 객체를 행동과 연결되는게 없이 만든다.
- 어떤 행동을 하게 하고 바로 Future 를 리턴시켜준다.
- 어떤 행동 (registerOnCompleteCallback) 안에서 어떤 행동이 완료되었다면 success 라는 메소드를 실행시켜서 먼저 리턴한 Future 에게 complete 신호를 보낸다. 

아직도 헥깔리실까봐 말하자면 Future 는 자신의 쓰레드를 생성하고 , 행위를 강결합해서 실행했다면  Promise 는 전혀 그러하지 않다. 그냥 다른 쓰레드 (위에서는 ForkjoinPool) 에 스며들어가서 완료시점과 전달값을 세팅만 했을뿐이다.


취소할 수 있는 Promise

퓨처 계산을 중간에 멈추고 싶을 경우가 있다. 사용자가 취소버튼을 눌렀다던가 .. 뭐 그럴 경우 중간에 멈추게 하고 거기 까지의 값을 원할 수 도 있고 등등 ..이럴때는 취소를 위한 퓨처를 다른 퓨처와 함께 조합해서 사용하는 방법인데 이게 꽤 헥깔리다. 

소스를 보자.

def cancellable[T](b: Future[Unit] => T): (Promise[Unit], Future[T]) = {
val p = Promise[Unit]
val f = Future {
val r = b(p.future)
if (!p.tryFailure(new Exception))
throw new CancellationException
r
}
(p, f)
}

val (cancel, value) = cancellable { cancel =>
var i = 0
while (i < 5) {
if (cancel.isCompleted) throw new CancellationException
Thread.sleep(500)
log(s"$i: working")
i += 1
}
"resulting value"
}

Thread.sleep(1500)

cancel trySuccess ()

log("computation cancelled!")

솔직히 이 쯤되면 그냥 저레벨 쓰레드 사용해서 대충 짜고 싶은 마음이 들기도 한다. 세상 살면서 이런 코드를 얼마나 짜겠다고.. 이걸 쉽게 이해하고 체득하려고 노력까지 해야하나 싶다.

이 코드는 말로 설명하긴 어렵겠다. 각자 생존하자. -.-;; (아래 레퍼런스의 책을 천천히 읽으시라 권유하고 싶다).  

힌트 하나만 주자면 Promise p 가 취소용으로 사용됬다는 것이다. 즉 그 동안의 코드와 다르게 사용자 측에서 Promise 을 제어한다. trySuccess() 를 사용해서~~


Await

어느 순간에는 비동기의 연속보다는 무엇인 종료될때까지 기다리고 싶을 때도 있다. 그때 사용된다.

Await.result

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

val urlSpecSizeFuture = Future { Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt").size }
val urlSpecSize = Await.result(urlSpecSizeFuture, 10.seconds)

log(s"url spec contains $urlSpecSize characters")

Await.result 에서 웹싸이트를 다 긁어 올때까지 기다린다.  10초만 기다린다.


Await.result

val startTime = System.nanoTime

val futures = for (_ <- 0 until 16) yield Future {
Thread.sleep(1000)
}

for (f <- futures) Await.ready(f, Duration.Inf)

val endTime = System.nanoTime

log(s"Total execution time of the program = ${(endTime - startTime) / 1000000} ms")
log(s"Note: there are ${Runtime.getRuntime.availableProcessors} CPUs on this machine")

모든 퓨터가 완료 될 때까지 무한정 기다린다.
Await.ready 는 리턴값이 없다.

16개의 퓨쳐가 실행완료 될 때까지 얼마나 걸리는지 재는 코드이다.
저게 시퀀셜하게 동작했다면 16초이상 걸렸겠지만 내 PC에서는 아래와 같은 결과가 나왔다.

main: Total execution time of the program = 4451 ms (대략 4초)

main: Note: there are 4 CPUs on this machine ( 4코어) 



레퍼런스

스칼라 동시성 프로그래밍 - 오현석 옮김



* 이 글은 일단 발행은 하는데 오류가 있을 가능성도 있으며 수정&발전 될 것입니다. 


동시성을 위한 스칼라 Observable 

스칼라의 Observable 을 배우기 전에 여러가지 것들에 대해서 편하게 읽어보자.

Play 의 Iteratee & Enumerator 

Play 에서 Iteratee / Enumerator / Enumeratee을 간단하게 말하면 :

이름설명
Iteratee [E, A]

 Iteratee [E, A]는 함수형 프로그래밍에서 iteration 컨셉의 일반화. E 입력,A 출력 (소비자역할)

Enumerator [E]컬렉션을 일반화 한 것으로 형태 E를 열거한다. 무한 열거 (Streaming) 할 수도있다. (생산자역할)
Enumeratee [E, A]거의 사용하지 않기 때문에 지금은 생각하지 좋다.

과 같다.

먼저 Iteratee 에 대해서 알아보자. Iteratee 는 보다시피 Iterator 와 먼가 관련이 있어 보이는데 자바에서 Iterator 가 어떻게 활용되는지 살펴보면 

val l = List(1, 234, 455, 987)

var total = 0 // will contain the final total
var it = l.iterator
while( it.hasNext ) {
total += it.next
}

total
=> resXXX : Int = 1677

짧게 정리하면  (1,234,455,987).iterating( .... total += it.next ....)  이렇게 되는데 
앞에 데이터가 있고 그것을 순회하면서 어떤 행동 하게 된다. 

즉 데이터 + 순회 + 행동 으로 이루어져있는데  데이터가 정해져있다. 강결합이다. 

여기서 이것을 느슨하게 만들어 줄 방법에 대해서 생각해보면 iterator 를 일반화 하는것에 미치게 되는데 앞에 데이터가 무엇이 오건간에 순회하면서 행동을 하게 하자는 것이다. 즉 데이터는 주입받자는 것이다. 제어역전!! 

iteratee = 순회(iterating) + 행동(react) 
Enumerator  = 데이터 

Enumerator 는 정해진 콜렉션의 데이터가 아니라 비동기적으로 데이터를 생산하는 것으로 또 일반화된다.  


자바의 Observable

자바의 Observable 에 대한 정말 쉽고 자세히 설명하는 토비님의 동영상 을 먼저 참고하자
토비의 봄 스프링 Reactive programming


에릭마이어의 Duality 

위의 Play 에서 생산자 (Enumerator ) 와 소비자 (Iteratee)가 있듯이, 다른 언어,라이브러리에서도 이와 비슷한 것들이 다양한 이름으로 만들어지고 있다. 범람하고 있다는 표현이 더 어울리겠다.

데이터를 보내주는이 <---> 데이터를 받는이 

이렇게 상반되는 개념을 지칭하는 개념을 "duality"  라고도 부르는 모양이다.
더 정확한 정보는 동영상도 참고하시고..

에릭마이어가 설명하는 Duality 


Gof 의 옵저버 패턴 

이렇게 이벤트를 주고 받는 모습은 기존에 Gof 패턴에서 Observer 패턴이 비슷한 형태를 보여줬고, 예를들어 이 패턴을 설명해보면 어떤 매니져 객체가 있고 , 이 객체에 어떤 이벤트를 받길 원하는 옵저버들이 매니져 객체에 등록한다. 
매니져에서 어떤 이벤트를 감지하면 , 자신에게 등록된 옵저버들에게 이벤트를 notify 해주는게 골자이다.

GUI 프로그래밍에서 특히 당연하게 사용되고 있다. 
어떤 데이터를 다루는 다양한 View 가 있을때, 어떤 하나의 View를 통해 데이터에 변화를 사용자가 주면, 나머지 View에게도 그 변화를 전파하기 위한 설계에 사용되는 것이다. 

이미지에서 Observer 는 알림을 받는 주체이다.
Subject 는 알림을 주는 매니져이다.
Observer 는 자신을 Subject 에게 attach 를 통해 알려주고
Subject 는 어떤 이벤트가 생겼을때 Observer 들에게 update 를 해준다. 


POSA2 의 생산자-소비자 패턴 (멀티쓰레드 패턴중 하나) 

멀티쓰레드 디자인패턴의 꽃이라고 한다면 단연코 "생산자-소비자" 패턴이라고 할 수 있다. 멀티쓰레드/서버코드를 작성할때 거의 무조건 "생산자-소비자" 패턴이 사용되기 마련이며, 다른 고차원 패턴들 (예를들면 node.js 의 기반패턴인 react 패턴) 의 기반이 되면서 동시에 멀티쓰레드 코어패턴을 포함하고 있는 , 즉  "허리" 역할을 제대로 하고 있는 패턴이라고 볼수 있기 때문에 아주 중요하다고 볼 수 있다.

이것도 역시 이벤트(데이터) 를 주는 놈이 있고 받는 놈이 있는데 


보다시피 매우 단순하다. Thread 1이 생산자 Thread 2가 소비자가 된다.


Scala 에서 Observable 사용하기 

이제 스칼라에서  Observable 를 사용하는 방식에 대해서 알아보자.
주거니 받거니 (Polling 이 아니라 Push 를 통해)  하는 기본적인 매커니즘은 같다. 다만 기능이 추가되었고 비동기로 사용가능하도록 확장 되었다. 


강결합된 데이터를 이미 가지고 있는 생산자 (Observable)


object ObservablesItems extends App {
import rx.lang.scala._

val o = Observable.items("Pascal", "Java", "Scala")
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))

Thread.sleep(1000)
}

코드를 보면 Observable 은 어떤 데이터(이벤트) 를 가지고 있다. 생산자 역할을 한다.
소비자는 ?? 그렇다 name => log(s"learned the $name language")  이 함수리터럴이 담당한다.

(옵저버 패턴으로 비교하면, 생산자: Subject , 소비자: Observer 이며, 따라서 위의 subscribe 는 
 옵저버패턴에서는 addObserver 나 addListener 메소드와 매칭된다.

생산자(Observable) 은 자신이 보낼(Push) 할 이벤트를 처리할 친구들을 subscribe 메소드로 모집하고 있다.

* 위의 코드를 실행하려면 아래 rxjava 를 가져와야한다. 

libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.19.1"


정해진 시간 이후에 알림을 주는 생산자 (Observable)

val o = Observable.timer(5.second)
o.subscribe(_ => log(s"Timeout!"))
o.subscribe(_ => log(s"Another timeout!"))

5초 이후에 로그를 찍는다.


에러가 발생했음을 알려주는 생산자 (Observable)

val o = Observable.items(1, 2) ++ Observable.error(new RuntimeException) ++ Observable.items(3, 4)
o.subscribe(
x => log(s"number $x"),
t => log(s"an error occurred: $t")
)

subscribe 는 2개의  매개변수로 받는다. 두번째는 예러 발생시 핸들링할 함수이다.
에러(예외) 이 후의 데이터 처리는 하지 않는다.

지금까지는 subscribe 메소드에 

def subscribe(onNext: T => Unit): Subscription = {
asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
}

onNext 에 해당하는 함수 리터럴을 넣었지만

이제 Observer 객체를 넣어 보도록하자.


Observer (소비자) 에게 데이터를 건네주는 생산자 (Observable)

val classics = List("Il buono", "Big", "Die Hard")
val o = Observable.from(classics)

o.subscribe(new Observer[String] {
override def onNext(m: String) = log(s"Movies Watchlist - $m")
override def onError(e: Throwable) = log(s"Ooops - $e!")
override def onCompleted() = log(s"No more movies.")
})

생산자(Observable) 에 강결합된 리스트를 넣어주었다.
소비자에 Observer 객체를 만들어서 넣어준다.

onNext : 생산자에서 주는 데이터(이벤트) 를 처리하는 메소드
onError : 생산자에서 발생한 예외를 처리하는 메소드
onCompleted : 생산자에서 이제 모든 데이터를 처리하였다고 알려주는 메소드 


Observable 과 Future 

val f = Future {
Thread.sleep(500)
"Back to the Future(s)"
}

val o = Observable.create[String] { obs =>
f foreach {
case s =>
obs.onNext(s)
obs.onCompleted()
}
f.failed foreach {
case t => obs.onError(t)
}
Subscription()
}

o.subscribe(log _)

- Future 를 하나 만들었다. 3초 있다가 문자열 하나를 리턴해주는 행동을 하는 ~
- Observable 에는 기존처럼 컬렉션의  강결합된 데이터가 엮여있는게 아니라 , Future 가 실행되고 그 리턴값들이 source 데이터로 활용된다.
- 그 데이터 (Future 가 리턴한 값) 를 소비자에게 전달한다. 
- 소비자는 log _   즉 로그를 찍는 함수이다 

val o = Observable.from(Future {
Thread.sleep(500)
"Back to the Future(s)"
})

o.subscribe(log _)

다음처럼 Future 로 부터 직접 생산될 수도 있다.


Observable 과 Combinator

val roles = Observable.items("The Good", "The Bad", "The Ugly")
val names = Observable.items("Clint Eastwood", "Lee Van Cleef", "Eli Wallach")
val zipped = names.zip(roles).map { case (name, role) => s"$name - $role" }

zipped.subscribe(log _)

함수형 개발의 특징 답게 두개의 생산자로 부터 데이터를 결합하여 소비자에게 전달할 수도 있다.

main: Clint Eastwood - The Good
main: Lee Van Cleef - The Bad
main: Eli Wallach - The Ugly

이렇게 출력된다.

* zip 은 2개의 컬렉션에서 하나씩 가져와서 튜플을 만든다. zipWithIndex 는 하나를 1,2 등 숫자로 매핑 
* map 은 컬렉션을 변화시켜서 새로운 컬렉션을 만든다. 그 자신을 변형시키진 않는다. 


Observable 과 Subscription

import rx.lang.scala._
import org.apache.commons.io.monitor._

def modifiedFiles(directory: String): Observable[String] = {
Observable.create { observer =>
val fileMonitor = new FileAlterationMonitor(1000)
val fileObs = new FileAlterationObserver(directory)
val fileLis = new FileAlterationListenerAdaptor {
override def onFileChange(file: java.io.File) {
observer.onNext(file.getName)
}
}
fileObs.addListener(fileLis)
fileMonitor.addObserver(fileObs)
fileMonitor.start()

Subscription { fileMonitor.stop() }
}
}

log(s"starting to monitor files")
val subscription = modifiedFiles(".").subscribe(filename => log(s"$filename modified!"))
log(s"please modify and save a file")

Thread.sleep(10000)

subscription.unsubscribe()
log(s"monitoring done")

- apache.commons 는 스칼라 사용자에게도 보물창고이다. 
- monitor 는 파일의 변화를 감지해서 우리의 생산자에게 알려준다.
- 우리의 생산자는 파일의 이름을 자연스럽게 (논블럭) 소비자에게 보내준다. 



레퍼런스:

Learning Concurrent Programming in Scala by Aleksandar Prokopec



동시성을 위한 스칼라 Collections 


스칼라도 자바5에서 등장한 자바동시성 도구들 처럼 (을 사용한) 쓰레드 안전한 콜렉션들이 있다.  대표적으로 ConcurrentHashMap. LinkedBlockingQueue 등...concurrent.TrieMap 같이 스칼라 전용으로 추가된 것들도 있다. 

대략적인 것은 안다고 가정하고 스칼라 코드를 살펴보자.

* 이런 컬렉션들은 여러 쓰레드들에 의해 공유된다. 공유되지 않는 방법으로 동시성을 처리하는 경우도 있는데 이런 경우 공유 되지 않기 때문에 락을 걸 필요가 없어진다. (액터 등)


코드로 말해요.

LinkedBlockingQueue

object CollectionsIterators extends App {
import java.util.concurrent._

val queue = new LinkedBlockingQueue[String]
for (i <- 1 to 5500) queue.offer(i.toString)
execute {
val it = queue.iterator
while (it.hasNext) log(it.next())
}
for (i <- 1 to 5500) queue.poll()
}

- 생산자 소비자 패턴에서 유용하게 사용될 수 있는 쓰레드 안전 큐이다.


ConcurrentHashMap


object CollectionsConcurrentMap extends App {
import java.util.concurrent.ConcurrentHashMap
import scala.collection.convert.decorateAsScala._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})
val emails = new ConcurrentHashMap[String, List[String]]().asScala

execute {
emails("James Gosling") = List("james@javalove.com")
log(s"emails = $emails")
}

execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@tetris.com"))
log(s"emails = $emails")
}

execute {
emails.putIfAbsent("Alexey Pajitnov", List("alexey@welltris.com"))
log(s"emails = $emails")
}

}

- execute  : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수 

- 0 ~ 32 까지, 즉 32번 execute 를 실행한다. 

- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다. 

def func() = {
   
emails("James Gosling") = List("james@javalove.com") log(s"emails = $emails")
}

execute {func(i)}

- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기

val emails = new ConcurrentHashMap[String, List[String]]().asScala : 자바의 것을 스칼라에서 활용 

- 여러 개의 쓰레드에서 동시에 HashMap에 리스트를 추가해도 쓰레드 안전하다. 

- putIfAbsent : 만약 해당 키에 대한 요소가 없다면 추가하라. 


다른 예 


object CollectionsConcurrentMapBulk extends App {
import scala.collection._
import scala.collection.convert.decorateAsScala._
import java.util.concurrent.ConcurrentHashMap

val names = new ConcurrentHashMap[String, Int]().asScala
names("Johnny") = 0
names("Jane") = 0
names("Jack") = 0

execute {
for (n <- 0 until 10) names(s"John $n") = n
}

execute {
for (n <- names) log(s"name: $n")
}

}

이것의 결과는 아래와 같다. 어라 요소가 많이 빠졌네? 

scala-execution-context-global-12: name: (Johnny,0)

scala-execution-context-global-12: name: (Jack,0)

scala-execution-context-global-12: name: (John 4,4)

scala-execution-context-global-12: name: (Jane,0)

scala-execution-context-global-12: name: (John 3,3)

scala-execution-context-global-12: name: (John 2,2)

scala-execution-context-global-12: name: (John 1,1)

즉 아래의 execute 에서 맵의 요소를 가져오는 타이밍에 첫번째 execute 에 0~10까지 모든 숫자가 들어가지 않았기 때문이다. 결국 쓰레드들 끼리 맵에 번갈아가면서 접근 한다는 것. 


concurrent.TrieMap

아래 경우는 어떻게 될까?


object CollectionsTrieMapBulk extends App {
import scala.collection._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})

val names = new concurrent.TrieMap[String, Int]
names("Janice") = 0
names("Jackie") = 0
names("Jill") = 0

execute {
for (n <- 10 until 100) names(s"John $n") = n
}

execute {
log("snapshot time!")
for (n <- names.map(_._1).toSeq.sorted) log(s"name: $n")
}
Thread.sleep(1000* 10)

}

결과는 모든 요소들이 다 출력된다. 

이유는 ? TrieMap 경우는 다른 쓰레드의 액션을 방해 하지 않는다. 따라서 성능에 문제가 생길 소지도 있다.

적절한 곳에 사용해야한다. ( 파일 관련 처리에서 파일을 다 찾은 후에 어떤 액션을 하게 하는 경우) 


CounterDownLatch

여러 쓰레드들을 실행시켜두고 모두 종료되기만을 기다릴 때 사용. 
분산 시스템에서도 Zookeeper 등을 활용하여 이러한 전역 동기락은 자주 사용된다. 

val startLatch = new CountDownLatch(4)

class Printer(val message: String, val iterations: Int) extends Runnable {
def run(): Unit = {

for (i <- 0 until iterations)
println(message)

startLatch.countDown()
}
}

val iterations = 1

val threads = List(
new Thread(new Printer("the first message", iterations)),
new Thread(new Printer("the second message", iterations)),
new Thread(new Printer("another message", iterations)))

threads.foreach(_.start())
startLatch.await()

log ("end")

"end" 는 과연 출력 될 까요?

답은 안된다. startLatch.await() 는 startLatch.countDown() 이 4번 호출 될 때 까지 기다린다.



레퍼런스

Learning Concurrent Programming in Scala by Aleksandar Prokopec




ExecutorContext 


스칼라도 JVM 상에 돌아가는 언어 (네이티브 스칼라도 개발중이긴 하다.)라 자바의 라이브러리를 사용 할 수 도 있으며 자신의 라이브러리가 있다면 주로 그것을 사용하는 편 이다.

쓰레드 생성 관련해서도 자바에 있는 Executor 와 ExecutorService 를 사용할수도 있겠지만 스칼라에서는 기능이 보강된 자신만의 것도 있다. 이름하여 ExecutorContext!!

이번 포스트에서는  ExecutorContext 등 스칼라의 저레벨  (쓰레드를 직접 사용하는것은 원시레벨) 동시성 도구들에 대해 알아보자. 

먼저 자바의 경우 

Executor 는 그냥 Runnable 타입을 내부 쓰레드풀을 이용해서 실행시켜주는거다.기본 Executor구현으로는  ThreadPoolExecutor (JDK 5), ForkJoinPool (JDK 7) 등이 있다.

ExecutorService  는 Executor의 서브인터페이스로써 라이프싸이클 관리 (셧다운등) + Future +  Callable 을 이용해서 리턴값 받기 정도가 추가되었다.

이제 부터 스칼라식의 코딩을 살펴보자. (이 포스트는 기초적인 내용은 대략 안다고 가정하고 코드만 살펴봄) 


서론 

스칼라에 있는 ExecutionContext 가 먼지 알아보자.

1.scala.concurrent 패키지에 있는 Executor 객체와 비슷한 기능을 제공하는 ExecutionContext 는 
주로 implicit 매개 변수로 사용된다. ExecutionContext는 두개의 추상 메소드 execute (Java Executor 메소드와 동일)와 reportFailure (Throwable 객체를 이용해서 일부 태스크가 예외를 throw 할 때마다 호출 됨)가 있다.

2. ExecutionContext에는 Java Executor 또는 ExecutorService (Java와 Scala 사이의 다리 역할을 함)에서 ExecutionContext 객체를 만들기 위해 몇 가지 메소드가 추가된  companion 객체가 있다.

3. ExecutionContext companion 객체는 내부적으로 ForkJoinPool 인스턴스를 사용하는 global이라는 기본 실행 컨텍스트를 포함한다.


코드로 말해요.


기본 쓰레드 생성

val hello = new Thread(new Runnable {
def run() {
println("hello world")
}
})

hello.start()

 ExecutorService 활용 

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

def run() {
try {
while (true) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute(new Handler(socket))
}
} finally {
pool.shutdown()
}
}
}

class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n").getBytes

def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}

(new NetworkService(2020, 2)).run

- 서버소켓을 생성하는 예제이다.

- NetworkService 클래스에서 클라이언트를 받아서 Pool 에 있는 쓰레드를 활용하여 멀티쓰레딩 해주고있다.

- 제공되는 고정 크기 Pool (Executors.newFixedThreadPool(poolSize)) 사용했기 때문에 기존 쓰레드의 재사용이 가능하지만 개별 쓰레드에서 굉장히 오래 걸리는 처리를 한다면 사용 가능한 쓰레드가 모잘라서 클라이언트는 화가 날 것이다. 

- 핸들러에서는 메세지를 받아서 종료하는 매우 단순한 작업을 하고 있다. 물론 Handler 에서도 while 을 돈다면 서버와 계속 통신할 수 있겠다.  이런 경우는 쓰레드풀을 사용하는 의미가 없어지긴 하겠지만


 ForkJoinPool 활용 

import scala.concurrent._
val executor = new java.util.concurrent.ForkJoinPool
executor.execute(new Runnable {
def run() = log("This task is run asynchronously.")
})

Java7 부터 등장한 ForkJoinPool 을 사용하고 있다. 이것에 관해 포스팅한 자료가 있으니 참고하도록 하자. ForkJoinPool 이란? ->  http://hamait.tistory.com/612


ExecutionContext.global 활용

import scala.concurrent._
val ectx = ExecutionContext.global
ectx.execute(new Runnable {
def run() = log("Running on the execution context.")
})

-  ForkJoinPool 을 사용하고 있는 global 


ExecutionContext 생성하기 

import scala.concurrent._
val ectx = ExecutionContext.fromExecutorService(new forkjoin.ForkJoinPool)
ectx.execute(new Runnable {
def run() = log("Running on the execution context again.")
})

- ForkJoinPool 을 이용한 ExecutorService 를 통해서 ExecutionContext 를 생성


ExecutionContext 활용 

import scala.concurrent._

def execute(body: =>Unit) = ExecutionContext.global.execute(new Runnable {
def run() = body
})

for (i <- 0 until 32) execute {
Thread.sleep(2000)
log(s"Task $i completed.")
}
Thread.sleep(10000)

- execute  : 함수를 넘기면 자동으로 쓰레드풀을 사용해서 실행해주는 함수 

- 0 ~ 32 까지, 즉 32번 execute 를 실행한다. 

- execute 뒤에 나오는 { } 사이의 것은 함수 리터럴인데 다음과 같이 정의 되는 것과 같다. 

def func(i : Int) = {
Thread.sleep(2000)
log(s"Task $i completed.")
}

for (i <- 0 until 32) execute {func(i)}

- execute 뒤에 { } 는 스칼라에서는 하나의 매개변수만 있는 함수호출시 a () 대신 a {} 를 사용 할 수 도 있기 때문에 이상한게 아니다. 이전 포스트를 참고하자 : 스칼라에서 {} 익숙해지기



레퍼런스:

https://blog.knoldus.com/2016/07/25/java-executor-vs-scala-executioncontext/

Learning Concurrent Programming in Scala by Aleksandar Prokopec

https://twitter.github.io/scala_school/ko/concurrency.html  (Scala school)



{} 는 어디서 사용되는가?


이름 있는 함수


def timesTwo(i: Int): Int = {
  println("hello world")
  i * 2
}

이름 없는 함수 :  {} 를 통해 묶었다. 

scala> { i: Int => println("hello world") i * 2 }

이렇게 스칼라 코드를 읽을때 개인적으로 자바와 가장 큰 차이점 중 하나는 시도 때도 없이 나타나는 {} 이다. 

def tweets = WebSocket.acceptWithActor[String, JsValue]{ request => out =>
TwitterStreamer.props(out)
}

예를들어 여기서 { } 는 뭘까? 

object Test {
type HandlerProps = String => String
def apply(f : String => HandlerProps): Unit = {
val g = f("HELLO")
log(g("world"))
}
}


object FuturesDataType extends App {
Test{
x => y => x + y
}

}

여기 Test 에서 { } 는 뭐냔 말이다. 여기서는 apply 가 생략되고 매개변수로 { } 사이의 값이 들어간다. 

스칼라에서는 즉 { } 가 () 랑 같이 사용되기도 한다.



val l = List(1, 2, 3)
val m = l.map({_.toString})

여기서 부터 냄새가 좀 날 것이다.  ㅇㅋ 위의 map 에서 ( ) 를 생략 할 수 있다.  그렇다 스칼라는 생략 할 수 있을거 같은 것들은 죄다 생략 가능하게 만들어 놨다;;;  아래와 같이 작성 가능하다.


val m = l map{_.toString}

점도 생략했고 (대신 빈칸을 넣음) 괄호 () 를 생략했다.


val m = l.map(_.toString)

근데 이렇게도 된다.  () 랑 {} curly braces 랑 둘다 되네?  


val lst = Seq(1,2,3,4,5)
lst foreach {x => println(s"you get the idea, $x")}
lst foreach (x => println(s"you get the idea, $x"))

이렇게도 되고~

val a =  List(1, 2, 3).reduceLeft{_ + _} // 6

val b = List(1, 2, 3).reduceLeft(_ + _) // 6

이렇게도 된다.


하지만 패턴매칭의 경우에는 

val map = Map("foo" -> "bar")
map foreach { case (k, v) => println(s"key: $k, value: $v") }

이건 좋습니다요~

map foreach ( case (k, v) => println(s"key: $k, value: $v") )

하지만 패턴매칭에서는 {} 빼면 에러이다.


for comprehension 문에서도 {} 는 필수이다.

val res = for {
x <- coll1
y <- coll2
} yield (x, y)



이제 알짜배기가 나온다. 잘 보시라~

스칼라에서는 함수호출시  ()  대신 {} 중괄호를 사용 할 수 있다.  단 !!! 인재가 하나일 경우 !!

println ("Hello World") 대신해서 println { "Hello World" } 를 사용 할 수 있다는 의미이다.


스칼라에서 이렇게 한 이유는 클라이언트 프로그래머가 중괄호 내부에 함수 리터럴을 사용하도록 하기 위해서이다. 이렇게 작성한 메소드는 호출 시 좀 더 제어 추상화 구문과 비슷해진다.


def withPrintWriter(file: File, op: PrintWriter => Unit ) {

......
}


이런 함수를 호출 할 때는 

withPrintWriter ( new File ("date.txt") , 

writer => writer.println(new java.util.Date) ) 


이렇게 할 수 있는데 , 인자가 2개이기 때문에 { } 를 사용할 수 없다.


하지만 !!!!


def withPrintWriter(file: File) (op: PrintWriter => Unit ) {

...
}


이러한 함수라면 {} 를 사용가능하다.

val file = new File("date.txt")
withPrintWriter (file) {
writer => writer.println(new java.util.Date)
}



왜 그러냐면 커링을 통해서 file 을 매개변수로 먼저 넣은 새로운 함수가 생겨졌고, 

그 새로운 함수는  매개변수 하나를 필요로 하는 함수이기 때문이다.




이런것들이 온갖 문법에 스며들어 있기 때문에 편하게 스칼라를 사용하기 위해서는 처음엔 고생 좀 해야한다.

+ Recent posts