관리 메뉴

HAMA 블로그

예제로 보는 아카(akka) - 5. 액터 사이의 통신및 액터 멈추기 본문

Akka

예제로 보는 아카(akka) - 5. 액터 사이의 통신및 액터 멈추기

[하마] 이승현 (wowlsh93@gmail.com) 2016. 10. 5. 18:03

- Scala 2.11 기반 

- Akka 2.4.11 기반 

Learning Concurrent Programming in Scala 참고 


액터 사이의 통신 

- 액터 사이에는 위치투명성을 보장 한다. 즉 동일한 방식으로 동일한 메모리 혹은 원격 액터와의 통신가능.

말하기(tell)

- 기본적인 연산이고  ! 연산자를 사용한다. 비블로킹 연산이다. 
- Fire and Forget 이다. 즉 메세지를 보낸 다음에 잊어버린다. (제대로 배달된다는 보장무)
- 많아야 한 번만 배달하는것을 보장한다.
- 메세지 배달 순서를 보장한다. 


import akka.actor._
import akka.event.Logging
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import mysystem._


object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}


class Pongy extends Actor {
val log = Logging(context.system, this)
def receive = {
case "ping" =>
log.info("Got a ping -- ponging back!")
sender ! "pong"
context.stop(self)
}
override def postStop() = log.info("pongy going down")
}


class Pingy extends Actor {
val log = Logging(context.system, this)
def receive = {
case pongyRef: ActorRef =>
pongyRef ! "ping"
case "pong" =>
log.info("got a pong back!")
context.stop(self)
}
override def postStop() = log.info("ping going down")
}


class Master extends Actor {
val log = Logging(context.system, this)
val pingy = ourSystem.actorOf(Props[Pingy], "pingy")
val pongy = ourSystem.actorOf(Props[Pongy], "pongy")

def receive = {
case "start" =>
pingy ! pongy
}
override def postStop() = log.info("master going down")
}


object CommunicatingAsk extends App {
val masta = ourSystem.actorOf(Props[Master], "masta")
masta ! "start"
Thread.sleep(5000)
ourSystem.terminate()
}

-  Pingy 는 Pongy 에게 "ping" 메세지 전달하고
-  Pongy 는 메세지를 받아서 다시 보낸 액터에게 돌려주고 

물어보기(ask)

- ? 연산자를 사용한다. 
- 비블로킹 연산이다.
- Future 를 통해서 ack  메세지를 돌려 받는다. 


import akka.actor._
import akka.event.Logging
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import mysystem._


object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}



class Pongy extends Actor {
val log = Logging(context.system, this)
def receive = {
case "ping" =>
log.info("Got a ping -- ponging back!")
sender ! "pong"
context.stop(self)
}
override def postStop() = log.info("pongy going down")
}


class Pingy extends Actor {
val log = Logging(context.system, this)
def receive = {
case pongyRef: ActorRef =>
implicit val timeout = Timeout(2 seconds)
val future = pongyRef ? "ping"
pipe(future) to sender
}
override def postStop() = log.info("Pingy going down")
}


class Master extends Actor {
val log = Logging(context.system, this)
val pingy = ourSystem.actorOf(Props[Pingy], "pingy")
val pongy = ourSystem.actorOf(Props[Pongy], "pongy")
def receive = {
case "start" =>
pingy ! pongy
case "pong" =>
log.info("got a pong back!")
context.stop(self)
}
override def postStop() = log.info("master going down")
}


object CommunicatingAsk extends App {
val masta = ourSystem.actorOf(Props[Master], "masta")
masta ! "start"
Thread.sleep(1000)
ourSystem.shutdown()
}

-  말하기와 거의 똑같다. 다른점을 살펴보자
-  먼저 Pingy 는 tell 대신해서 ask (?) 로 메세지를 전달한후에 Future 객체를 받고 리턴된다.
-  Future 객체는 인자로 들어온 시간 내에 Pongy 로 부터 메세지를 전달 받으면 , Master 에게 
   메세지를 전달해준다. 
  val future = pongyRef ? "ping"
pipe(future) to sender
를 대신해서

val future = pongyRef ? "ping"

future onComplete { case v => log.info(s"Response : $v") }

이렇게도 가능하나 , 그에 따른 비동기 계산 안에서 변경 가능한 액터의 상태를 엑세스 해서는 절대 안된다.
액터 상태는 해당 액터 자신에게만 보여야 하기 때문에, 액터의 상태에 동시에 접근하면 데이터 경합과 경합 상태를 발생 시킬 가능성이 생긴다. log 객체는 오직 그 객체를 소유한 액터에서만 접근해야한다. 

마찬가지로 onComplete  핸들러 안에서 sender 메소드를 호출해서도 안된다. 응답 메세지를 가지고 퓨처가 완료된 시점을 생각해보면 이 액터는 다른 송신자가 보낸 다른 메세지를 처리하고 있을 것이기 때문이다. 

전달하기(Router)

- 오직 메세지를 다른 액터로 넘겨주기 위해서만 사용된다.
- 예를들어 라운르 로빈방식의 로드밸런싱 같은?
- foward 메소드를 사용한다.  


import akka.actor._
import akka.event.Logging
import mysystem._


object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}


class StringPrinter extends Actor {
val log = Logging(context.system, this)
def receive = {
case msg => log.info(s"child got message '$msg'")
}
override def preStart(): Unit = log.info(s"child about to start.")
override def postStop(): Unit = log.info(s"child just stopped.")
}


class Router extends Actor {
var i = 0
val children = for (_ <- 0 until 4) yield context.actorOf(Props[StringPrinter])
def receive = {
case "stop" => context.stop(self)
case msg =>
children(i) forward msg
i = (i + 1) % 4
}
}


object CommunicatingRouter extends App {
val router = ourSystem.actorOf(Props[Router], "router")
router ! "Hi."
router ! "I'm talking to you!"
Thread.sleep(1000)
router ! "stop"
Thread.sleep(1000)
ourSystem.terminate()
}

- Router 액터는 자식 액터에게 균등하게 msg 를 포워딩하고 있다. 


액터 멈추기

지금까지는 context.stop 을 이용해 액터를 중단시켰는데 이렇게 되면 현재 처리중인 메세지만 처리하고 
액터를 중단한다. 근데 현실에선 중단 시킬때 타이밍을 다양하게 가져가고 싶을때가 있다. 다음처럼..

0. 멈추게하자. (우편함의 메세지 날라감) 

context.stop (self) 

1. 멈추진 말고 단순 재시작하게 하자.  (우편함의 메세지 살아있음)

Kill      (예 :  actor ! Kill ) 

2. 어떤 메세지까지는 처리하고 중단하게하자. (PoisonPill 메세지전까지 모두 처리함) 

PoisonPill  (예 :  actor ! PoisonPill  

3. 다른 액터가 끝날때까지 기다리게 하자. (액터 내부에서 기다림) 

watch

class GracefulPingy extends Actor {
val pongy = context.actorOf(Props[Pongy], "pongy")
context.watch(pongy)

def receive = {
case GracefulPingy.CustomShutdown =>
context.stop(pongy)
case Terminated(`pongy`) =>
context.stop(self)
}
}


object GracefulPingy {
object CustomShutdown
}

- pongy 액터를 watch 를 통해 감시한다.
- pongy 가 postStop 메소드를 수행하고 난 후에
- GracefulPingy 는 Terminated(`pongy`) 메세지를 받게되고 그 후에 자신을 종료한다. 


4. 다른 액터가 끝날때까지 기다리게 하자. (액터 외부에서 기다림) 

greacefulStop 

object CommunicatingGracefulStop extends App {
val grace = ourSystem.actorOf(Props[GracefulPingy], "grace")
val stopped = gracefulStop(grace, 3.seconds, GracefulPingy.CustomShutdown)
stopped onComplete {
case Success(x) =>
log("graceful shutdown successful")
ourSystem.terminate()
case Failure(t) =>
log("grace not stopped!")
ourSystem.terminate()
}
}

- 보통 애플리케이션의  주 쓰레드에서 사용한다.
- gracefulStop 에 인자로 grace 라는 액터 , 타임아웃, 종료 메세지 넣는다.
- stopped 라는 퓨처를 통해 시간 안에 종료되면 Success(x) 가 호출되고
- 시간안에 종료되지 않으면 Failure(t) 가 호출된다. 

Comments