관리 메뉴

HAMA 블로그

예제로 보는 아카(akka) - 6. 액터 관리 (고장/예외 처리하기) 본문

Akka

예제로 보는 아카(akka) - 6. 액터 관리 (고장/예외 처리하기)

[하마] 이승현 (wowlsh93@gmail.com) 2016. 10. 5. 18:04

- Scala 2.11 기반 

- Akka 2.4.11 기반 

Learning Concurrent Programming in Scala 참고 


액터 관리  (고장/예외 처리하기)

모든 액터는 자신의 자식에 대한 관리자 (Supervisor) 역할을 수행한다. 
어떤 자식 액터가 어떤 이유로 실패했을때, 부모에게 어떻게 처리 할 지 요청한다. 
그때 부모가 처리하는 방법을 관리 정책이라 하면 다음과 같이 이루어진다.

재시작한다. (Restart)

별 거 아니면 무시하고 계속 실행한다. (Resume) 

액터를 영구히 중단시킨다. (Stop)

더 상위 액터에게 책임을 넘긴다. (Escalate)

심플 예제 )


class Naughty extends Actor {
val log = Logging(context.system, this)
def receive = {
case s: String => log.info(s)
case msg => throw new RuntimeException
}
override def postRestart(t: Throwable) = log.info("naughty restarted")
}


class Supervisor extends Actor {
val child = context.actorOf(Props[Naughty], "victim")
def receive = PartialFunction.empty
override val supervisorStrategy =
OneForOneStrategy() {
case ake: ActorKilledException => Restart
case _ => Escalate
}
}

object SupervisionKill extends App {
val s = ourSystem.actorOf(Props[Supervisor], "super")
ourSystem.actorSelection("/user/super/*") ! Kill
ourSystem.actorSelection("/user/super/*") ! "sorry about that"
ourSystem.actorSelection("/user/super/*") ! "kaboom".toList
Thread.sleep(1000)
ourSystem.stop(s)
Thread.sleep(1000)
ourSystem.terminate()
}

 - Supervisor 관리액터는 아무 메세지를 처리하지 않고 하위 액터가 오류가 생겼을때 전략을 세운다.

 - 여기선 OneForOneStrategy 라고 하나의 액터에 대해서 처리를 정의한다. AllForOneStarategy 는 모두
    에게 연대책임을 묻는다. 
(즉, 말썽을 부린 액터의 형제/자매 액터들에게) 한꺼번에 적용시킬 수도 있다. 

 - Supervisor 관리액터는 자식 액터에서 ActorKilledException 예외가 발생하면 Restart 로 재시작시킨다. 
    ( 자식액터에게  ! Kill 메세지를 보내면 자식 액터에서 ActorKilledException이 발생한다 ) 

 - Supervisor 관리액터는 자식 액터에서 기타 예외가 발생하면 자신의 부모 액터에게 책임을 넘긴다. 

 - 자식인 Naughty 액터는 String 만 처리할 수 있는데 그 외의 메세지가 오면 RuntimeException 발생시킨다.

  

분산 다운로드 예제 )

 관리 소스만 보면 알거 같다. Supervisor 액터가 여러개의 worker 액터를 만들어서 각 액터에게 파일을 다운로드 받아서 파일로 저장하게 하는 예제이다. Supervisor  액터가 하는일을  정리하면

- 일을 워커에서 지시 
- worker 의 갯수에 비해 일거리가 많으면 일거리를 큐에 저장 
- 놀고 있는 worker 큐에 저장
- 일하고 있는 worker 일과 워커를 매핑해서 저장 
- worker 액터에 문제가 생기면 확인해서 다시 시작시킴. 이 예제에선 별 문제 아니라서 Resume 이용. 


import akka.actor._
import akka.event.Logging
import akka.actor.SupervisorStrategy._
import org.apache.commons.io.FileUtils
import scala.io.Source
import scala.collection._
import scala.concurrent.duration._
import mysystem._

object mysystem {
lazy val ourSystem = ActorSystem("OurExampleSystem")
}


class Downloader extends Actor {
def receive = {
case DownloadManager.Download(url, dest) =>
val content = Source.fromURL(url)
FileUtils.write(new java.io.File(dest), content.mkString)
sender ! DownloadManager.Finished(dest)
}
}


class DownloadManager(val downloadSlots: Int) extends Actor {
val log = Logging(context.system, this)
val downloaders = mutable.Queue[ActorRef]()
val pendingWork = mutable.Queue[DownloadManager.Download]()
val workItems = mutable.Map[ActorRef, DownloadManager.Download]()

override def preStart(): Unit = {
for (i <- 0 until downloadSlots) downloaders.enqueue(context.actorOf(Props[Downloader], s"dl$i"))
}

private def checkMoreDownloads(): Unit = {
if (pendingWork.nonEmpty && downloaders.nonEmpty) {
val dl = downloaders.dequeue()
val workItem = pendingWork.dequeue()
log.info(s"$workItem starting, ${downloaders.size} download slots left")
dl ! workItem
workItems(dl) = workItem
}
}

def receive = {
case msg @ DownloadManager.Download(url, dest) =>
pendingWork.enqueue(msg)
checkMoreDownloads()
case DownloadManager.Finished(dest) =>
workItems.remove(sender)
downloaders.enqueue(sender)
log.info(s"Down to '$dest' finished, ${downloaders.size} down slots left")
checkMoreDownloads()
}

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 6, withinTimeRange = 30 seconds) {
case fnf: java.io.FileNotFoundException =>
log.info(s"Resource could not be found: $fnf")
workItems.remove(sender)
downloaders.enqueue(sender)
Resume
case _ =>
Escalate
}
}


object DownloadManager {
case class Download(url: String, dest: String)
case class Finished(dest: String)
}


object SupervisionDownloader extends App {
import DownloadManager._
val manager = ourSystem.actorOf(Props(classOf[DownloadManager], 4), "manager")
manager ! Download("http://www.w3.org/Addressing/URL/url-spec.txt", "url-spec.txt")
Thread.sleep(1000)
manager ! Download("https://github.com/scala/scala/blob/master/README.md", "README.md")
Thread.sleep(5000)
ourSystem.stop(manager )
Thread.sleep(5000)
ourSystem.shutdown()
}


Comments