일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Akka
- 파이썬 동시성
- 파이썬 머신러닝
- 파이썬
- 엔터프라이즈 블록체인
- 플레이프레임워크
- 스칼라
- play 강좌
- Actor
- 블록체인
- CORDA
- akka 강좌
- 파이썬 데이터분석
- Golang
- Play2 로 웹 개발
- 스칼라 동시성
- 하이브리드앱
- Adapter 패턴
- 파이썬 강좌
- 하이퍼레저 패브릭
- Hyperledger fabric gossip protocol
- Play2
- 주키퍼
- 안드로이드 웹뷰
- 그라파나
- 스칼라 강좌
- 이더리움
- hyperledger fabric
- play2 강좌
- 스위프트
- Today
- Total
HAMA 블로그
예제로 보는 아카(akka) - 5. 액터 사이의 통신및 액터 멈추기 본문
- Scala 2.11 기반
- Akka 2.4.11 기반
- Learning Concurrent Programming in Scala 참고
액터 사이의 통신
- 액터 사이에는 위치투명성을 보장 한다. 즉 동일한 방식으로 동일한 메모리 혹은 원격 액터와의 통신가능.
말하기(tell)
- 기본적인 연산이고 ! 연산자를 사용한다. 비블로킹 연산이다.
- Fire and Forget 이다. 즉 메세지를 보낸 다음에 잊어버린다. (제대로 배달된다는 보장무)
- 많아야 한 번만 배달하는것을 보장한다.
- 메세지 배달 순서를 보장한다.
import akka.actor._
import akka.event.Logging
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import mysystem._
object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}
class Pongy extends Actor {
val log = Logging(context.system, this)
def receive = {
case "ping" =>
log.info("Got a ping -- ponging back!")
sender ! "pong"
context.stop(self)
}
override def postStop() = log.info("pongy going down")
}
class Pingy extends Actor {
val log = Logging(context.system, this)
def receive = {
case pongyRef: ActorRef =>
pongyRef ! "ping"
case "pong" =>
log.info("got a pong back!")
context.stop(self)
}
override def postStop() = log.info("ping going down")
}
class Master extends Actor {
val log = Logging(context.system, this)
val pingy = ourSystem.actorOf(Props[Pingy], "pingy")
val pongy = ourSystem.actorOf(Props[Pongy], "pongy")
def receive = {
case "start" =>
pingy ! pongy
}
override def postStop() = log.info("master going down")
}
object CommunicatingAsk extends App {
val masta = ourSystem.actorOf(Props[Master], "masta")
masta ! "start"
Thread.sleep(5000)
ourSystem.terminate()
}
물어보기(ask)
- ? 연산자를 사용한다.
- 비블로킹 연산이다.
- Future 를 통해서 ack 메세지를 돌려 받는다.
import akka.actor._
import akka.event.Logging
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import mysystem._
object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}
class Pongy extends Actor {
val log = Logging(context.system, this)
def receive = {
case "ping" =>
log.info("Got a ping -- ponging back!")
sender ! "pong"
context.stop(self)
}
override def postStop() = log.info("pongy going down")
}
class Pingy extends Actor {
val log = Logging(context.system, this)
def receive = {
case pongyRef: ActorRef =>
implicit val timeout = Timeout(2 seconds)
val future = pongyRef ? "ping"
pipe(future) to sender
}
override def postStop() = log.info("Pingy going down")
}
class Master extends Actor {
val log = Logging(context.system, this)
val pingy = ourSystem.actorOf(Props[Pingy], "pingy")
val pongy = ourSystem.actorOf(Props[Pongy], "pongy")
def receive = {
case "start" =>
pingy ! pongy
case "pong" =>
log.info("got a pong back!")
context.stop(self)
}
override def postStop() = log.info("master going down")
}
object CommunicatingAsk extends App {
val masta = ourSystem.actorOf(Props[Master], "masta")
masta ! "start"
Thread.sleep(1000)
ourSystem.shutdown()
}
val future = pongyRef ? "ping"
pipe(future) to sender
val future = pongyRef ? "ping"
future onComplete { case v => log.info(s"Response : $v") }
전달하기(Router)
- 오직 메세지를 다른 액터로 넘겨주기 위해서만 사용된다.
- 예를들어 라운르 로빈방식의 로드밸런싱 같은?
- foward 메소드를 사용한다.
import akka.actor._
import akka.event.Logging
import mysystem._
object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}
class StringPrinter extends Actor {
val log = Logging(context.system, this)
def receive = {
case msg => log.info(s"child got message '$msg'")
}
override def preStart(): Unit = log.info(s"child about to start.")
override def postStop(): Unit = log.info(s"child just stopped.")
}
class Router extends Actor {
var i = 0
val children = for (_ <- 0 until 4) yield context.actorOf(Props[StringPrinter])
def receive = {
case "stop" => context.stop(self)
case msg =>
children(i) forward msg
i = (i + 1) % 4
}
}
object CommunicatingRouter extends App {
val router = ourSystem.actorOf(Props[Router], "router")
router ! "Hi."
router ! "I'm talking to you!"
Thread.sleep(1000)
router ! "stop"
Thread.sleep(1000)
ourSystem.terminate()
}
지금까지는 context.stop 을 이용해 액터를 중단시켰는데 이렇게 되면 현재 처리중인 메세지만 처리하고
액터를 중단한다. 근데 현실에선 중단 시킬때 타이밍을 다양하게 가져가고 싶을때가 있다. 다음처럼..
0. 멈추게하자. (우편함의 메세지 날라감)
context.stop (self)
1. 멈추진 말고 단순 재시작하게 하자. (우편함의 메세지 살아있음)
Kill (예 : actor ! Kill )
2. 어떤 메세지까지는 처리하고 중단하게하자. (PoisonPill 메세지전까지 모두 처리함)
PoisonPill (예 : actor ! PoisonPill )
3. 다른 액터가 끝날때까지 기다리게 하자. (액터 내부에서 기다림)
watch
class GracefulPingy extends Actor {
val pongy = context.actorOf(Props[Pongy], "pongy")
context.watch(pongy)
def receive = {
case GracefulPingy.CustomShutdown =>
context.stop(pongy)
case Terminated(`pongy`) =>
context.stop(self)
}
}
object GracefulPingy {
object CustomShutdown
}
- pongy 액터를 watch 를 통해 감시한다.
- pongy 가 postStop 메소드를 수행하고 난 후에
- GracefulPingy 는 Terminated(`pongy`) 메세지를 받게되고 그 후에 자신을 종료한다.
4. 다른 액터가 끝날때까지 기다리게 하자. (액터 외부에서 기다림)
greacefulStop
object CommunicatingGracefulStop extends App {
val grace = ourSystem.actorOf(Props[GracefulPingy], "grace")
val stopped = gracefulStop(grace, 3.seconds, GracefulPingy.CustomShutdown)
stopped onComplete {
case Success(x) =>
log("graceful shutdown successful")
ourSystem.terminate()
case Failure(t) =>
log("grace not stopped!")
ourSystem.terminate()
}
}
- 보통 애플리케이션의 주 쓰레드에서 사용한다.
- gracefulStop 에 인자로 grace 라는 액터 , 타임아웃, 종료 메세지 넣는다.
- stopped 라는 퓨처를 통해 시간 안에 종료되면 Success(x) 가 호출되고
- 시간안에 종료되지 않으면 Failure(t) 가 호출된다.
'Akka' 카테고리의 다른 글
예제로 보는 아카(akka) - 7. 상태머신 ( 상태에 따른 행동변화 ) (0) | 2016.10.06 |
---|---|
예제로 보는 아카(akka) - 6. 액터 관리 (고장/예외 처리하기) (0) | 2016.10.05 |
예제로 보는 아카(akka) - 4. 액터 생명 주기 (0) | 2016.10.05 |
예제로 보는 아카(akka) - 3. 부모,자식 액터 (0) | 2016.10.05 |
예제로 보는 아카(akka) - 2. 생성 (Hello Akka) 및 메세지 수신 (0) | 2016.10.04 |