'play 강좌'에 해당하는 글 2건


  Iteratee Enumerator / Enumeratee 란?


번역  :  http://qiita.com/sunny4381/items/a711fa72db26c9263b3f

Play Framework의 Iteratee / Enumerator / Enumeratee 는 공식 문서를 읽어도 잘 모르고, 또한 수학적 설명이 적혀 있기도 해서 불필요하게 이해하기 어렵다.  따라서  몇 가지 예를 통해 이해를 하려합니다.

또한, Understanding Play2 Iteratees for Normal Humans (한글번역)
를 참고하며 Play Framework 2.1.0, Scala 2.10을 대상으로하고 있습니다.  

Iteratee / Enumerator / Enumeratee을 간단하게 말하면 :

이름설명
Iteratee [E, A]

루프의 내용. 형태 E에서 형태 A를 생성한다.  (소비자역할)

Enumerator [E]컬렉션을 일반화 한 것으로 형태 E를 열거한다. 무한 열거 (Streaming) 할 수도있다. (생산자역할)
Enumeratee [E, A]

많이 사용하지 않기 때문에 지금은 생각하지 좋다.

Iterator와 Iteratee의 차이

Scala는 Iterator [E]라는 친숙한 인터페이스가 있습니다. 
Iterator [E]와 Iteratee [E, A]의 차이는 Iterator [E]는 어떤 컬렉션에서 생성되는 반면 Iteratee [E, A]는 순수 함수형 프로그래밍에서 iteration 컨셉의 일반화 입니다. 컬렉션 요소들과 독립하며 불변 ( immutable)에서 논블럭 비동기 처리하에 입력 형식 E출력 형식 A가 정적으로 정의되어 있다는 특징 외에도 결과가 필요할 때까지 아무런 실행되지 않는다는 특징이 있습니다.

먼저 iterator 를 상기시켜 봅시다.

1
2
3
4
5
6
7
8
9
10
val l = List(1, 234, 455, 987)

var total = 0 // will contain the final total
var it = l.iterator
while( it.hasNext ) {
  total += it.next
}

total
=> resXXX : Int = 1677

예 1 : Int 형 요소와 Enumerator [Int]과 그 합계를 계산하는 Iteratee [Int, Int]

val initialValue : Int = 0 val sumIteratee : Iteratee [ Int , Int ] = Iteratee . fold ( initialValue ) { ( total , e ) => total + e } val intEnumerator1 : Enumerator [ Int ] = Enumerator ( 1 , 234 , 455 , 987 ) val futureTotal1 : Future [ Int ] = intEnumerator1 . run ( sumIteratee ) val total1 : Int = Await . result ( futureTotal1 , Duration . Inf ) println ( "total =" + total1 . toString ) val intEnumerator2 : Enumerator [ Int ] = Enumerator ( 431 , 57 , 668 ) val futureTotal2 : Future [ Int ] = intEnumerator2 . run ( sumIteratee ) val total2 : Int = Await . result ( futureTotal2 , Duration . Inf ) println ( "total =" + total2 . toString )

다음과 같은 결과가 출력됩니다.

total = 1677
total = 1156

Iteratee [E, A] 는 컬렉션과 독립하고 있기 때문에, intEnumerator1과 intEnumerator2 라는 각각 다른 컬렉션에 대해 수행 할 수 있으며 Iteratee [E, A] 는 비동기적으로 작업을 수행하기 위해 Future [Int ]를 리턴받지요.

Future [Int]에서 결과를 꺼내기 위해서 일부러 Await.result () 메소드를 사용하지 않으면 안되기 때문에 조금 귀찮은 느낌일 것입니다. 저도 그렇게 생각합니다.

예 2 : 무한 Enumerator (Streaming Enumerator)

Enumerator는 무한히 열거 할 수 있습니다.

// 500 밀리 초마다 문자열 "current time % s"를 생성하고 계속 Enumerator val stringGenerator : Enumerator [ String ] = Enumerator . generateM ( play . api . libs . concurrent . Promise . timeout ( Some ( "current time % s " . format (( new java . util . Date ()))) 500 ) ) // 콘솔에 출력하는 Iteratee val printIteratee : Iteratee [ String , Unit ] = Iteratee . foreach { println _ } // 5 초 기다리고 그 동안 stringGenerator가 생성한 요소 각각에 대해 printIteratee를 실행하는 val future : Future [ Unit ] = stringGenerator . run ( printIteratee ) Await . result ( future , Duration ( 5 , "seconds" )

다음과 같은 결과가 5초동안 출력됩니다.

current time Tue Feb 26 11:55:58 JST 2013
current time Tue Feb 26 11:55:58 JST 2013
current time Tue Feb 26 11:55:59 JST 2013
current time Tue Feb 26 11:55:59 JST 2013
current time Tue Feb 26 11:56:00 JST 2013
current time Tue Feb 26 11:56:00 JST 2013
current time Tue Feb 26 11:56:01 JST 2013
current time Tue Feb 26 11:56:01 JST 2013
current time Tue Feb 26 11:56:02 JST 2013
current time Tue Feb 26 11:56:02 JST 2013

예 3 : Iteratee [E, A]와 Future [Iteratee [E, A]]

Iteratee [E, A]에서 Future [Iteratee [E, A]]로 변환하고 반대로 Future [Iteratee [E, A]]에서 Iteratee [E, A]로 변환 할 수 있습니다. 양측은 교환 법칙이 성립합니다.

val  initialValue :  Int  =  0 
val  sumIterator :  Iteratee [ Int , Int ]  =  Iteratee . fold ( initialValue )  {  ( total ,  e )  =>  total  +  e  } 
val  futureSumIterator :  Future [ Iteratee [ Int , Int ]]  =  sumIterator . unflatten . map ( _ . it ) 
val  sumIteratorAgain :  Iteratee [ Int , Int ]  =  Iteratee . flatten ( futureSumIterator )

예 4 : Iteratee [E, A] .feed와 Input

Iteratee[E, A].feed() 메소드를 사용하면 Enumerator [E]를 사용하지 않고 요소를 하나씩 Iteratee [E, A]에 전달 할 수 있습니다. Iteratee[E, A].feed() 메서드는 요소를 일반화 한 Input 클래스의 서브클래스인 El, Empty, EOF 중 하나를 제공합니다.  다음은 Input 3 개의 서브 El, Empty, EOF의 개요를 보여줍니다.

이름설명
Input.El [E]형태 E 요소를 나타냅니다.
Input.Empty빈 요소를 나타냅니다.
Input.EOF반복을 종료합니다.

다음의 예는 Enumerator를 사용하지 않고, Iteratee[E, A].feed() 메소드를 사용하여 요소 열 1, 234, 455, 987을 처음부터 순서대로 하나씩줍니다.

val initialValue : Int = 0 var sumIteratee : Iteratee [ Int , Int ] = Iteratee . fold ( initialValue ) { ( total , e ) => total + e } var futureSumIterator : Future [ Iteratee [ Int , Int ] = null // 1을 Iteratee에 공급하는 futureSumIterator = sumIteratee . feed ( Input . El ( 1 )) sumIteratee = Iteratee . flatten ( futureSumIterator ) futureSumIterator = sumIteratee . feed ( Input . El ( 234 )) sumIteratee = Iteratee . flatten ( futureSumIterator ) futureSumIterator = sumIteratee . feed ( Input . El ( 455 )) sumIteratee = Iteratee . flatten ( futureSumIterator ) futureSumIterator = sumIteratee . feed ( Input . El ( 987 )) sumIteratee = Iteratee . flatten ( futureSumIterator ) // 합계를 계산하는 val futureTotal : Future [ Int ] = sumIteratee . run val total : Int = Await . result ( futureTotal , Duration . Inf ) println ( "0 + 1 + 234 + 455 + 987 =" + total . toString )

다음과 같은 결과가 출력됩니다.

0 + 1 + 234 + 455 + 987 = 1677

만약 당신이 Future 사랑한다면

Future [Iteratee [E, A]]를 Iteratee [E, A] 로 일부러 변환없이 Future [Iteratee [E, A]] 그대로 동일한 작업을 수행 할 수 있습니다.

val initialValue : Int = 0 var sumIteratee : Iteratee [ Int , Int ] = Iteratee . fold ( initialValue ) { ( total , e ) => total + e } var futureSumIteratee : Future [ Iteratee [ Int , Int ] = null futureSumIteratee = sumIteratee . feed ( Input . El ( 1 )) futureSumIteratee = futureSumIteratee . flatMap ( _ . feed ( Input . El ( 234 ))) futureSumIteratee = futureSumIteratee . flatMap ( _ . feed ( Input . El ( 455 ))) futureSumIteratee = futureSumIteratee . flatMap ( _ . feed ( Input . El ( 987 ))) val totalFuture : Future [ Int ] = futureSumIteratee . flatMap ( _ . run ) val total = Await . result ( totalFuture , Duration . Inf ) println ( "0 + 1 + 234 + 455 + 987 =" + total . toString )

예 5 : Iteratee [E, A]의 불변성 (Immutability)

Iteratee [E, A]는 인스턴스화 된 후 불변의 내부 상태를 변화시키지 않습니다. 

val initialValue : Int = 0 val sumIteratee0 : Iteratee [ Int , Int ] = Iteratee . fold ( initialValue ) { ( total , e ) => total + e } val sumIteratee1 : Iteratee [ Int , Int ] = Iteratee . flatten ( sumIteratee0 . feed ( Input . El ( 1 )))

val sumIteratee2 : Iteratee [ Int , Int ] = Iteratee . flatten ( sumIteratee1 . feed ( Input . El ( 234 )))

val sumIteratee2_1 : Iteratee [ Int , Int ] = Iteratee . flatten ( sumIteratee2 . feed ( Input . El ( 583 )))

val sumIteratee2_2 : Iteratee [ Int , Int ] = Iteratee . flatten ( sumIteratee2 . feed ( Input . El ( 162 ))) println ( "0 =" + Await . result ( sumIteratee0 . run , Duration . Inf )) println ( "0 + 1 =" + Await . result ( sumIteratee1 . run , Duration . Inf )) println ( "0 + 1 + 234 = " + Await . result ( sumIteratee2 . run , Duration . Inf )) println ( "0 + 1 + 234 + 583 = " + Await . result ( sumIteratee2_1 . run , Duration . Inf )) println ( "0 + 1 + 234 + 162 = " + Await . result ( sumIteratee2_2 . run , Duration . Inf ))

다음과 같은 결과가 출력됩니다.

0 = 0
0 + 1 = 1
0 + 1 + 234 = 235
0 + 1 + 234 + 583 = 818
0 + 1 + 234 + 162 = 397

예 6 : Iteratee [E, A]는 결과가 필요할 때까지 아무런 실행되지 않는다

Iteratee [E, A]는 정말 결과가 필요할 때까지 아무것도 실행되지 않습니다.

val  initialValue :  Int  =  0 
val  sumIteratee0 :  Iteratee [ Int , Int ]  =  Iteratee . fold ( initialValue )  {  ( total ,  e )  =>  { 
    println ( "e ="  +  e . toString ) 
    total  +  e 
  } 
} 
// feed 한 것만으로는 계산되지 않는 
val  sumIteratee1  =  Iteratee . flatten ( sumIteratee0 . feed ( Input . El ( 1 ))) 
val  sumIteratee2  =  Iteratee . flatten ( sumIteratee1 . feed ( Input . El ( 234 ))) 
val  sumIteratee3  =  Iteratee . flatten ( sumIteratee2 . feed ( Input . El ( 455 ))) 
val  sumIteratee4  =  Iteratee . flatten ( sumIteratee3 . feed ( Input . El ( 987 ))) 

// 아직 계산되지 않는 
val  futureTotal :  Future [ Int ]  =  sumIteratee4 . run

println ( "caclulate result" ) 
val  total  =  Await . result ( futureTotal ,  Duration . Inf ) 
println ( "0 + 1 + 234 + 455 + 987 ="  +  total . toString )

다음과 같은 결과가 출력됩니다.

caclulate result
e = 1
e = 234
e = 455
e = 987
0 + 1 + 234 + 455 + 987 = 1677

예 7 : Input.Empty과 Input.EOF

Input.Empty을 Iteratee [E, A] 에 부여하면 무시됩니다. 
Input.EOF을 Iteratee [E, A]에 공급하면 반복을 중지합니다.

val  initialValue :  Int  =  0 
val  sumIteratee0 :  Iteratee [ Int , Int ]  =  Iteratee . fold ( initialValue )  {  ( total ,  e )  =>  total  +  e  }

val  sumIteratee1  =  Iteratee . flatten ( sumIteratee0 . feed ( Input . El ( 1 ))) 
val  sumIteratee2  =  Iteratee . flatten ( sumIteratee1 . feed ( Input . Empty )) 
val  sumIteratee3  =  Iteratee . flatten ( sumIteratee2 . feed ( Input . El ( 234 ))) 
val  sumIteratee4  =  Iteratee . flatten ( sumIteratee3 . feed ( Input . EOF )) 
val  sumIteratee5  =  Iteratee . flatten ( sumIteratee4 . feed ( Input . El ( 455 ))) 
val  sumIteratee6  =  Iteratee . flatten ( sumIteratee5 . feed ( Input . El ( 987 ))) 

// Input.Empty를 feed 후 feed 값은 계산되는 
println ( "0 + 1 + 234 ="  +  Await . result ( sumIteratee3 . run ,  Duration . Inf )) 

// Input.EOF를 feed 한 이후에 feed 값은 계산되지 않는 
println ( "0 + 1 + 234 + 455 ="  +  Await . result ( sumIteratee5 . run ,  Duration . Inf )) 
println ( "0 + 1 + 234 + 455 + 987 = "  +  Await . result ( sumIteratee6 . run ,  Duration . Inf ))

이 프로그램을 실행하면 다음과 같은 결과가 출력됩니다.

0 + 1 + 234 = 235
0 + 1 + 234 + 455 = 235
0 + 1 + 234 + 455 + 987 = 235

결과에서 알 수 있듯이, Input.EOF을 Iteratee [E, A에 미치는 반복을 멈추기 때문에 Input.EOF을 준 후 Input.El (455)과 Input.El (987)는 주어서 총합은 변화하지 않습니다. 또한 Input.EOF을 공급하면 반복을 중지하는 것을 확인할 수 있습니다.


WRITTEN BY
[前草] 이승현 (wowlsh93@gmail.com)
스타코프 (데이터지능플랫폼pd) (관심분야: 에너지IoT, 시계열(NILM) 데이터, 폴리글랏 프로그래밍 )

트랙백  0 , 댓글이 없습니다.
secret

WebSockets


WebSockets 는 웹 브라우저에서 양방향 통신을 가능케 하는 프로토콜 기반으로 사용 될 수 있습니다. (역주: 서버쪽에서 웹브라우저쪽으로도 메세지를 보낸다는 뜻이죠. 기존에는 브라우저에서 서버쪽으로 요청하는 폴링을 주로 사용 했었음) 클라이언트는 메세지를 보낼 수 있으며 서버는 언제라도 메세지를 받을 수 있습니다. 물론 그들 사이에 WebSocket 연결이 액티브 상태일 동안 말이죠. 

현재 HTML5 구현이 된 웹 브라우저들은 대부분 자바스크립트 웹소켓 API 를 통한 WebSockets를 지원 하고 있습니다. 그러나 웹소켓이 오직 웹 브라우저에서만 그 의미를 갖는 것은 아닙니다. 많은 웹소켓 클라이언트 라이브러리들을 활용 할 수 있으며 예를들어 서버가 다른 서버와 통신하기 위해서도 사용 할 수 있습니다네이티브 모바일 앱과도 말이죠. WebSockets 을 여기서 사용하는것은 플레이 서버가 사용하는 기존 TCP 포트를 재사용할 수 있는 이득을 가져다 줍니다. 

WebSockets 조작

지금까지 우리는 Action 인스턴스를 표준 HTTP 요청과 응답을 위해서 사용 하였습니다. 
WebSockets 은 전체적으로 다르게 사용되며 표준 Action 을 경유해서 조작되지 않습니다.

플레이는 두가지 다른 방식으로 웹 소켓을 지원하는데 첫번째는 액터를 사용하는것이고 두번째는 iteratees 를 사용하는 것입니다. 양쪽 모두WebSocket 에서 제공된 빌더를 사용합니다.

1. WebSockets 을 액터와 함께 조작하기 


WebSocket 의 acceptWithActor 메소드를 사용하여 웹소켓 통신 할 수 있는 환경을 만듭니다. 사용자 정의 액터에  props 의 매개변수로 전달 된 out 즉 외부에 메세지를 전송 할 수 있는 연결 통로를 제공합니다.

import play.api.mvc._
import play.api.Play.current

def socket = WebSocket.acceptWithActor[String, String] { request => out =>
  MyWebSocketActor.props(out)
}

acceptWithActor 메소드의 내부는 아래와 같습니다. 궁금하실까봐..복잡합니다. @@

def acceptWithActor[In, Out](f: RequestHeader => HandlerProps)(implicit in: FrameFormatter[In],
out: FrameFormatter[Out], app: Application, outMessageType: ClassTag[Out]): WebSocket[In, Out] = {
tryAcceptWithActor { req =>
Future.successful(Right((actorRef) => f(req)(actorRef)))
}
}


MyWebSocketActor 의 구현은 다음과 같습니다 : 

import akka.actor._

object MyWebSocketActor {
  def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}

class MyWebSocketActor(out: ActorRef) extends Actor {
  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }
}

클라이언트에서 보낸 메세지는 사용자 정의 액터에게 보내 질 것입니다. 그리고 다시 메세지를 돌려 줄 수 있습니다. 위의 액터는 메시지를받은 상태에서 클라이언트에서받은 모든 메시지를 간단히 보냅니다.


2. WebSockets 을 iteratees 로 조작하기

액터는 분리 된 메시지를 처리하기위한 더 나은 추상화이지만, iteratees 는 스트림 처리 에있어 더 좋은 추상화입니다. WebSocket 요청을 처리하려면 Action 대신 WebSocket을 사용합니다.

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket = WebSocket.using[String] { request =>
  // Log events to the console
  val in = Iteratee.foreach[String](println).map { _ =>
    println("Disconnected")
  }

  // Send a single 'Hello!' message
  val out = Enumerator("Hello!")

  (in, out)
}

WebSocket은 요청 헤더 (WebSocket 연결을 시작하는 HTTP 요청)에 액세스 할 수 있으므로 표준 헤더와 세션 데이터를 검색 할 수 있습니다. 그러나 요청 본문이나 HTTP 응답에는 액세스 할 수 없습니다. 이 방법으로 WebSocket을 구성 할 때 우리는 in과 out 채널을 모두 반환해야합니다.

 - in 채널은 각 메시지에 대해 통지 될 Iteratee [A, Unit] (여기서 A는 메시지 유형 - 여기서 우리는 String을 사용함)이며 소켓이 클라이언트 측에서 닫힐 때 EOF를 수신합니다.

- out 채널은 웹 클라이언트에 보낼 메시지를 생성하는 열거 자 [A]입니다. EOF를 전송하여 서버 측의 연결을 닫을 수 있습니다.

이 예제에서는 각 메시지를 콘솔에 출력하는 간단한 iteratee를 생성합니다. 메시지를 보내려면 간단한 더미 enumerator 를 만들어 하나의 Hello! 메시지를 전송하세요.

Tip: WebSockets 은  https://www.websocket.org/echo.html. 여기서 테스트 가능합니다.  위치는 그냥 ws://localhost:9000. 로 설정하시구요.


Hello!를 전송 한 직후 입력 데이터를 버리고 소켓을 닫는 다른 예제를 작성해 보겠습니다. 

import play.api.mvc._
import play.api.libs.iteratee._

def socket = WebSocket.using[String] { request =>

  // Just ignore the input
  val in = Iteratee.ignore[String]

  // Send a single 'Hello!' message and close
  val out = Enumerator("Hello!").andThen(Enumerator.eof)

  (in, out)
}


다음은 입력 데이터가 표준 출력에 기록되고 Concurrent.broadcast를 사용하여 클라이언트에 브로드 캐스팅되는 다른 예입니다.

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket =  WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send response back to client
  val in = Iteratee.foreach[String] {
    msg =>
      println(msg)
      // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
      // receive the pushed messages
      channel push("I received your message: " + msg)
  }
  (in,out)
}


'PlayFramework2' 카테고리의 다른 글

[Play2] Iteratee & Enumerators 간단 정리  (0) 2017.02.18
[Play2] ScalaAsync (번역)  (0) 2016.10.14
[Play2] WebSockets  (0) 2016.10.13
[Play2] 외부의 원격액터와 통신  (0) 2016.10.11
[Play2] WS API (번역)  (0) 2016.10.11
[Play2] 마이크로서비스 (microservices)  (0) 2016.10.11

WRITTEN BY
[前草] 이승현 (wowlsh93@gmail.com)
스타코프 (데이터지능플랫폼pd) (관심분야: 에너지IoT, 시계열(NILM) 데이터, 폴리글랏 프로그래밍 )

트랙백  0 , 댓글이 없습니다.
secret