일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 스위프트
- hyperledger fabric
- 플레이프레임워크
- 파이썬 동시성
- Actor
- Golang
- 스칼라
- 스칼라 동시성
- Hyperledger fabric gossip protocol
- 블록체인
- CORDA
- Akka
- play 강좌
- 하이퍼레저 패브릭
- Play2 로 웹 개발
- 주키퍼
- 그라파나
- 파이썬 머신러닝
- 이더리움
- akka 강좌
- 파이썬 데이터분석
- 엔터프라이즈 블록체인
- Play2
- 하이브리드앱
- Adapter 패턴
- 안드로이드 웹뷰
- 파이썬 강좌
- play2 강좌
- 스칼라 강좌
- 파이썬
- Today
- Total
HAMA 블로그
예제로 보는 아카(akka) - 6. 액터 관리 (고장/예외 처리하기) 본문
- Scala 2.11 기반
- Akka 2.4.11 기반
- Learning Concurrent Programming in Scala 참고
액터 관리 (고장/예외 처리하기)
모든 액터는 자신의 자식에 대한 관리자 (Supervisor) 역할을 수행한다.
어떤 자식 액터가 어떤 이유로 실패했을때, 부모에게 어떻게 처리 할 지 요청한다.
그때 부모가 처리하는 방법을 관리 정책이라 하면 다음과 같이 이루어진다.
재시작한다. (Restart)
별 거 아니면 무시하고 계속 실행한다. (Resume)
액터를 영구히 중단시킨다. (Stop)
더 상위 액터에게 책임을 넘긴다. (Escalate)
심플 예제 )
class Naughty extends Actor {
val log = Logging(context.system, this)
def receive = {
case s: String => log.info(s)
case msg => throw new RuntimeException
}
override def postRestart(t: Throwable) = log.info("naughty restarted")
}
class Supervisor extends Actor {
val child = context.actorOf(Props[Naughty], "victim")
def receive = PartialFunction.empty
override val supervisorStrategy =
OneForOneStrategy() {
case ake: ActorKilledException => Restart
case _ => Escalate
}
}
object SupervisionKill extends App {
val s = ourSystem.actorOf(Props[Supervisor], "super")
ourSystem.actorSelection("/user/super/*") ! Kill
ourSystem.actorSelection("/user/super/*") ! "sorry about that"
ourSystem.actorSelection("/user/super/*") ! "kaboom".toList
Thread.sleep(1000)
ourSystem.stop(s)
Thread.sleep(1000)
ourSystem.terminate()
}
- Supervisor 관리액터는 아무 메세지를 처리하지 않고 하위 액터가 오류가 생겼을때 전략을 세운다.
- 여기선 OneForOneStrategy 라고 하나의 액터에 대해서 처리를 정의한다. AllForOneStarategy 는 모두
에게 연대책임을 묻는다. (즉, 말썽을 부린 액터의 형제/자매 액터들에게) 한꺼번에 적용시킬 수도 있다.
- Supervisor 관리액터는 자식 액터에서 ActorKilledException 예외가 발생하면 Restart 로 재시작시킨다.
( 자식액터에게 ! Kill 메세지를 보내면 자식 액터에서 ActorKilledException이 발생한다 )
- Supervisor 관리액터는 자식 액터에서 기타 예외가 발생하면 자신의 부모 액터에게 책임을 넘긴다.
- 자식인 Naughty 액터는 String 만 처리할 수 있는데 그 외의 메세지가 오면 RuntimeException 발생시킨다.
분산 다운로드 예제 )
관리 소스만 보면 알거 같다. Supervisor 액터가 여러개의 worker 액터를 만들어서 각 액터에게 파일을 다운로드 받아서 파일로 저장하게 하는 예제이다. Supervisor 액터가 하는일을 정리하면
- 일을 워커에서 지시
- worker 의 갯수에 비해 일거리가 많으면 일거리를 큐에 저장
- 놀고 있는 worker 큐에 저장
- 일하고 있는 worker 일과 워커를 매핑해서 저장
- worker 액터에 문제가 생기면 확인해서 다시 시작시킴. 이 예제에선 별 문제 아니라서 Resume 이용.
import akka.actor._
import akka.event.Logging
import akka.actor.SupervisorStrategy._
import org.apache.commons.io.FileUtils
import scala.io.Source
import scala.collection._
import scala.concurrent.duration._
import mysystem._
object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}
class Downloader extends Actor {
def receive = {
case DownloadManager.Download(url, dest) =>
val content = Source.fromURL(url)
FileUtils.write(new java.io.File(dest), content.mkString)
sender ! DownloadManager.Finished(dest)
}
}
class DownloadManager(val downloadSlots: Int) extends Actor {
val log = Logging(context.system, this)
val downloaders = mutable.Queue[ActorRef]()
val pendingWork = mutable.Queue[DownloadManager.Download]()
val workItems = mutable.Map[ActorRef, DownloadManager.Download]()
override def preStart(): Unit = {
for (i <- 0 until downloadSlots) downloaders.enqueue(context.actorOf(Props[Downloader], s"dl$i"))
}
private def checkMoreDownloads(): Unit = {
if (pendingWork.nonEmpty && downloaders.nonEmpty) {
val dl = downloaders.dequeue()
val workItem = pendingWork.dequeue()
log.info(s"$workItem starting, ${downloaders.size} download slots left")
dl ! workItem
workItems(dl) = workItem
}
}
def receive = {
case msg @ DownloadManager.Download(url, dest) =>
pendingWork.enqueue(msg)
checkMoreDownloads()
case DownloadManager.Finished(dest) =>
workItems.remove(sender)
downloaders.enqueue(sender)
log.info(s"Down to '$dest' finished, ${downloaders.size} down slots left")
checkMoreDownloads()
}
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 6, withinTimeRange = 30 seconds) {
case fnf: java.io.FileNotFoundException =>
log.info(s"Resource could not be found: $fnf")
workItems.remove(sender)
downloaders.enqueue(sender)
Resume
case _ =>
Escalate
}
}
object DownloadManager {
case class Download(url: String, dest: String)
case class Finished(dest: String)
}
object SupervisionDownloader extends App {
import DownloadManager._
val manager = ourSystem.actorOf(Props(classOf[DownloadManager], 4), "manager")
manager ! Download("http://www.w3.org/Addressing/URL/url-spec.txt", "url-spec.txt")
Thread.sleep(1000)
manager ! Download("https://github.com/scala/scala/blob/master/README.md", "README.md")
Thread.sleep(5000)
ourSystem.stop(manager )
Thread.sleep(5000)
ourSystem.shutdown()
}
'Akka' 카테고리의 다른 글
예제로 보는 아카(akka) - 8. 원격 액터 (0) | 2016.10.07 |
---|---|
예제로 보는 아카(akka) - 7. 상태머신 ( 상태에 따른 행동변화 ) (0) | 2016.10.06 |
예제로 보는 아카(akka) - 5. 액터 사이의 통신및 액터 멈추기 (0) | 2016.10.05 |
예제로 보는 아카(akka) - 4. 액터 생명 주기 (0) | 2016.10.05 |
예제로 보는 아카(akka) - 3. 부모,자식 액터 (0) | 2016.10.05 |