관리 메뉴

HAMA 블로그

예제로 보는 아카(akka) - 9. akka I/O (TCP 통신) 본문

Akka

예제로 보는 아카(akka) - 9. akka I/O (TCP 통신)

[하마] 이승현 (wowlsh93@gmail.com) 2016. 10. 7. 10:14

- Scala 2.11 기반 

- Akka 2.4.11 기반 

akka.io 공식문서 정리

만약 Akka IO 를 사용하지 않고, 직접 로우레벨 도구(Selection, Channel, Event, Mutex, Completion Port 등) 를 사용하여 직접 기틀(프레임워크)를 만든다고 할 지라도 이 글은 도움이 될 것입니다. 자신의 통신모듈을 어떻게 조직화하고 ,어떤 것 을 포함해야 할 지에 대한 정보와 힌트를 얻을 수 있을 테니까요. 하지만 웬간하면 바퀴를 또 발명할 필요는 없을 것이고  akka 같은 도구를 이용해서 바퀴에 신경쓰기 보단 바퀴로 수레를 만들지, 기차를 만들지에 대한 상상력의 날개를 펼쳐보는게 어떨까요. 바퀴 만드는 사람에 대한 고마움은 마음 속에 간직하시고..

이 글은 공식 문서를 이해해 가면서  쓰긴 했지만 몇 군데는 정확한 이해를 못한 부분이 있기 때문에 공식 문서를 참고 하세요. 공식 문서도 굉장히 딱딱 하고 기초적인것을 설명해 주고 있지 않기 때문에 이 글이나 공식문서를 쉽게 읽기 위해서는 소켓통신, 비동기I/O패턴, Scala , Akka 기초지식은 있어야 할 거 같습니다.

 하마의 스칼라 강좌 바로가기 

Akka IO  (TCP통신)  

아래 내용에 사용된 코드는 요기를 참고하세요: On GitHub

sbt 파일에 추가 디펜던시를 추가할 필요는 없습니다


클라이언트 


이 블로그 글 에서 나오는 예제들은 다음것들을 임포트 했다고 가정합니다.

  1. import akka.actor.{ Actor, ActorRef, Props }
  2. import akka.io.{ IO, Tcp }
  3. import akka.util.ByteString
  4. import java.net.InetSocketAddress

모든 Akka I/O API 들은 매니저 객체를 통해 접근되며  I/O API 를 사용할때 첫번째 단계는 원하는 매니저에 대한 참조를 얻는것이다.  아래 코드는 Tcp 매니저에 대한 참조를 얻는 방법을 보여주네요.

  1. import akka.io.{ IO, Tcp }
  2. import context.system //IO(Tcp) 에 의해 사용됨.
  3.  
  4. val manager = IO(Tcp) // Tcp 매니저에 대한 참조를 얻음.

매니저도 액터이며 하는일이 꽤 많은데 로우레벨 I/O (셀렉터,채널) 을 다루며, 특정 업무 (커넥션에 대해서 리스닝 하고 있는 것 등등) 를 하기 위한 별개의 Worker 객체들을 관리합니다.

Connecting 

  1. object Client {
  2. def props(remote: InetSocketAddress, replies: ActorRef) =
  3. Props(classOf[Client], remote, replies)
  4. }
  5.  
  6. class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
  7.  
  8. import Tcp._
  9. import context.system
  10.  
  11. IO(Tcp) ! Connect(remote)
  12.  
  13. def receive = {
  14. case CommandFailed(_: Connect) =>
  15. listener ! "connect failed"
  16. context stop self
  17.  
  18. case c @ Connected(remote, local) =>
  19. listener ! c
  20. val connection = sender()
  21. connection ! Register(self)
  22. context become {
  23. case data: ByteString =>
  24. connection ! Write(data)
  25. case CommandFailed(w: Write) =>
  26. // O/S buffer was full
  27. listener ! "write failed"
  28. case Received(data) =>
  29. listener ! data
  30. case "close" =>
  31. connection ! Close
  32. case _: ConnectionClosed =>
  33. listener ! "connection closed"
  34. context stop self
  35. }
  36. }
  37. }

IO(Tcp) ! Connect(remote)

원격 주소에 접근하는 첫번째 단계는 Connect 메세지를 TCP 매니저에게 보내는것 입니다.
InetSocketAddress 를 인자로 보내서  접속할 주소를 바인딩하며 소켓옵션도 설정합니다.

Note

아카에서 SO_NODELAY(윈도우에서는 TCP_NODELAY)  는 디폴트로 True 이다. (즉 나글알고리즘 사용 안함) 

  1. case CommandFailed(_: Connect) =>
  2. listener ! "connect failed"
  3. context stop self


TCP 매니저는  접속에 먼가 문제가 생기면 CommandFailed 를 보내서 멈춘니다. 


  1. case c @ Connected(remote, local) =>
  2. listener ! c
  3. val connection = sender()
  4. connection ! Register(self)


접속이 성공하면  원격 포인트와 매핑되는 커넥션 액터가 만들어지면서  Client 액터에게 Connected 메세지를 돌려줄것입니다. 위에 sender() 는 새로 만들어진 커넥션 액터를 말합니다. 새로운 커넥션 액터를 활성화 시키기 위해서는 위의 예처럼 Register 메세지가 보내어 져야 합니다.  소켓으로 부터 받은 데이터를  처리할 액터를 알려주면서 말이죠. 여기선 self 즉 Client 액터가 되네요.

이 Register 작업이 완료되기 전에 커낵션은 사용될 수 없으며  내부적으로 타임아웃을 가지고 있어서 해당 시간 전에 Register 메세지를 받지 못하면 그냥 셧다운 됩니다.이 커넥션 액터는 등록된 핸들러를 감시하고 하나가 종료되면 저 커넥션에 연관된  모든 내부 리소스들을 청소하고 커넥션을 닫습니다.  


  1. context become {
  2. case data: ByteString =>
  3. connection ! Write(data)
  4. case CommandFailed(w: Write) =>
  5. // O/S buffer was full
  6. listener ! "write failed"
  7. case Received(data) =>
  8. listener ! data
  9. case "close" =>
  10. connection ! Close
  11. case _: ConnectionClosed =>
  12. listener ! "connection closed"
  13. context stop self
  14. }


 akka 에서 상태에 따른 행동변경을 위해 become 을 사용합니다. 이전 (초기) 상태에 매핑되는 행동 메소드는 Receive 메소드이고 이 후에는 become 이후 코드가 행동 메소드가 되죠. 즉 이전에는 커넥션하는게 주요 행동이고 커넥션이 끝나면 메세지 주고,받는게 주요 행동이란 야그입니다. 

코드를 살펴보면 
ByteString 메세지가 오면 커넥션 액터에 데이터를 써주고요.
CommandFailed (w: Write) 메세지는 OS 버퍼가 가득 차 있어서 쓰기에 실패 했다는 뜻 입니다.
Received(data) 는 원격으로부터 데이터를 받았다. 리스너에게 중계 해주네요. 여기서 리스너는 님이 만든 액터입니다. Client 에서 모든것을 처리하면 병목이 됩니다. 데이터를 DB 에 넣던 파일에 쓰던 그 비지니스 로직은 님이 직접 만드는게 좋습니다.  

Bootstrap

val client = system.actorOf(Client.props(listen, testActor), "client1")

client ! ByteString("hello")
client ! "close"

Client 액터를 생성하며  InetSocketAddress 타입의 listen 과 받은 데이터를 처리 할 testActor 액터를 파라미터로 넣어 줍니다.  원격으로 "hello" 라는 문자열을 보내고 바로 커넥션을 닫네요. 


서버

Accepting connections

  1. class Server extends Actor {
  2.  
  3. import Tcp._
  4. import context.system
  5.  
  6. IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
  7.  
  8. def receive = {
  9. case b @ Bound(localAddress) =>
  10. // do some logging or setup ...
  11.  
  12. case CommandFailed(_: Bind) => context stop self
  13.  
  14. case c @ Connected(remote, local) =>
  15. val handler = context.actorOf(Props[SimplisticHandler])
  16. val connection = sender()
  17. connection ! Register(handler)
  18. }
  19.  
  20. }

IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))

TCP 서버를 만들고 인바운드 연결에 대한  listen 을 하기 위해 Bind 명령을 매니저에 전달 합니다.
Bind 명령은 TCP 매니저에게 각 연결들에 대해 InetSocketAddress 를 통한 설정 상에서 listen 하도록 지시하죠. 
- 포트는 0으로 설정하여 랜덤포트로 바인드하였습니다.

  1. case b @ Bound(localAddress) =>
  2. // do some logging or setup ...


- Bind 메세지를 보내고 나서 Bound 메세지를 받을 건데, 서버가 접속을 받을 준비가 됬다는 신호를 말합니다.
- 이 메세지는  또한 소
켓에 실제 바운드 ( 즉 IP 주소와 포트) 된  InetSocketAddress 를 포함한다.

  1. case c @ Connected(remote, local) =>
  2. val handler = context.actorOf(Props[SimplisticHandler])
  3. val connection = sender()
  4. connection ! Register(handler)


- 클라이언트로부터 연결이 되면 Connected(remote, local) 메세지가 옵니다. - 위에서 sender() 는 해당 클라이언트와 통신을 책임질 새로 만들어진 커넥션 액터입니다. - 클라이언트와 통신 할 전용 핸들러  SimplisticHandler 를 Register을 통해 등록합니다. 

간단한 핸들러의 예제가 아래에 있습니다.

  1. class SimplisticHandler extends Actor {
  2. import Tcp._
  3. def receive = {
  4. case Received(data) => sender() ! Write(data)
  5. case PeerClosed => context stop self
  6. }
  7. }

- Received(data) 메세지의 데이터를 처리 하는 코드를 넣으면 됩니다. DB 에 쓰거나 파일에 쓰거나 분석하거나..
  여기선 다시 되돌려 주네요. 
- PeerClosed 메세지를 받으면 액터가 종료됩니다. PeerClosed 메세지는 원격 에서 접속을 닫았다는 것입니다. 
좀 더 복잡한 예제는 아래에 소개할  Throttling Reads and Writes  를 참고 하세요.

Closing connections

- 커넥션 액터에 다음 명령 (Close, ConfirmedClose , Abort  중 하나를 발송함으로써 연결은 닫혀 집니다.
- Close 는 FIN 메세지를 보내는 역할을 해서 연결을 닫습니다. 원격으로 부터의 확인을 기다리지 않을 것 이고 보류 해 놓은 쓰기 데이터는 어쨋든 flush 됩니다. 만약 연결 닫힘이 성공되면, 리스너는 Closed 와 함께 고지될 것 입니다.
- ConfirmedClose 는 FIN 메세지를 보내는것으로 전송하는 방향의 연결을 닫을 것 입니다. 그러나 원격 지점이 연결을 닫을 때 까지는 계속 해서 데이터를 수신 할 것입니다. 보류된 쓰기는 flush 될 것이며 만약 연결 닫힘이 성공하면 리스너는 ConfirmedClosed 롤 고지 받을 것입니다.
- Abort 는 RST 메세지를 원격 지점에 보내는 것 으로 즉시 연결을 종료 할 것입니다. 남은 쓰기 작업은 flush 되지 못할것이며 만약 닫힘이 성공하면 리스너는 Aborted 를 고지 받을 것입니다. 
- PeerClosed 는 연결이 원격 지점에 의해 닫혀지면 리스너에 의해 보내어 집니다. 디폴트에서 연결은 자동적으로 닫혀질 것입니다만 half-closed 연결을 지원하기 위해 Register 메세지에 keepOpenOnPeerClosed 를 true로 설정 할 수 있는데  이 상태에서 연결은 위의 close 명령 중의 하나를 받을 때 까지 열려 있게 될 것입니다. 
- ErrorClosed 는 리스너에게 에러가 발생 할 때마다 보내어 질 것이며 강제로 연결을 닫히게 할 것입니다.

모든 close 알림은 ConnectionClosed 서브타입이며 리스너는 close 를 더 자잘하게 구현해야 할 필요없는 모든 close 이벤트를 동일한 방식으로 다룰 것 입니다.  

Writing to a connection

- 일단 접속이 맺어지면 데이터는 Tcp.WriteCommand 라는 형태의 액터로부터 보내어 질 수 있게 됩니다. 
- Tcp.WriteCommand  는 3가지의 구체 구현을 가지고 있는 추상 클래스입니다.

Tcp.Write
- 가장 단순한 WriteCommand 구현이고 ByteString (this section인스턴스와 "ack" 이벤트를  감싸고 있습니다.
- ByteString 은 2 GB 의 최대 크기를 가질 수 있는 변경 불가능한 인 메모리 데이터 하나 또는 더 많은 청크를 모델링 한 것입니다.
Tcp.WriteFile
- 만약 파일로 부터의 로우 데이터를 보내고 싶으면 Tcp.WriteFile 명령을 사용하면 엄청 효율적으로 보낼 수 있을 것입니다.
- JVM 메모리로 모두를 적재할 필요 없이 송신 할 수 있게 하기 위해 on-disk 바이트의 (contiguous) 청크를 지정 할 수 있습니다.
- Tcp.WriteFile 은 원하면 
2GB 보다 많은 데이터와 "ack" 이벤트를 "hold" 할 수 있습니다.
Tcp.CompoundWrite

때때로 그룹핑하거나 엮고 싶을 것입니다.  여러 Tcp.Write 와 Tcp.WriteFile 명령어는 하나의 원자적 쓰기 명령어로 한방에 갈 수 있습니다. Tcp.CompoundWrite 는 아래 3 가지 이점을 줄 것입니다.

  1. TCP 커넥션 액터는 오직 하나의 쓰기 명령만 한 시점에 다룰 수 있습니다. 여러 쓰기를 하나로 묶음 으로써 최소한의 오버헤드를 가지게 하며  ACK 기반 메세지 프로토콜을 경유하는 커넥션 액터에 그것들을 각개적으로 부담 지우지 않게 합니다.  
  2. Because a WriteCommand is atomic you can be sure that no other actor can "inject" other writes into your series of writes if you combine them into one single CompoundWrite. In scenarios where several actors write to the same connection this can be an important feature which can be somewhat hard to achieve otherwise.
  3. The "sub writes" of a CompoundWrite are regular Write or WriteFile commands that themselves can request "ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to attach more than one ACK to a Write or WriteFile (by combining it with an empty write that itself requests an ACK) or to have the connection actor acknowledge the progress of transmitting the CompoundWrite by sending out intermediate ACKs at arbitrary points.


데이터 읽고 쓰기 (고급)  

Throttling Reads and Writes

기본 TCP 액터 모델은 내부 버퍼링이 없습니다. 따라서 쓰고 읽기에 대한 처리 능력은 유저 레벨에서 조절될 필요가 있는데요.  ( O/S 커널이 가득 찰 경우에 대비한 처리를 해야 한다는 의미 ) 

쓰기/읽기 각각 하나의 모형만 살펴보도록 하죠. 다른 모형은 공식문서를 참고하세요.사실 이런 복잡한 모델들은 단일스레드나 멀티쓰레드에서 동기적 I/O 를 하면 알 필요도 없는거지만 Reactor, Proactor 패턴같은 비동기 모형을 쓸 때 의미를 갖습니다. 

back-pressuring writes 에 관한 모델 

  • ACK-based:  우리 동네 우체국안의 짐 보관소가 꽉 찰 경우에 어떻게 할 것인가 입니다. 
    이 모델에서는 우체국에서 "니가 보낸 짐이 보내졌다 ( 보낼 수 있다 ) 그러니 또 보내도 된다." 라는 이벤트를 받으면 보내는 모델입니다. 즉 우체국 상황 생각않하고 무조건 보내지 않는 거죠. 


back-pressuring reads 에 대한 모델 

  • Push-reading:   우체국안의 짐 보관소가 받은 짐들로 인해 꽉 찰 경우에 어떻게 할 것인가 입니다. 
    이 모델에서는 너무 많은 짐을 받게 될 경우엔 다른 동네 우체국에 그만 좀 보내라라고 말하면서 
    받는것을 대기 합니다. 충분히 받은것을 처리 할 능력이 생기면 다시 보내라고 커넥션 액터가 푸쉬해줍니다. 

ACK-Based Write Back-Pressure 


  1. case Connected(remote, local) =>
  2. log.info("received connection from {}", remote)
  3. val handler = context.actorOf(Props(handlerClass, sender(), remote))
  4. sender() ! Register(handler, keepOpenOnPeerClosed = true)

- 클라이언트와 접속이 맺어지면 Connected 메세지가 날라옵니다. 
- 클라이언트와 데이터를 처리할 핸들러 액터를 만듭니다. 인자로 커넥션 액터(sender()) 를 주네요.
- 커넥션 액터에 우리가 만든 핸들러를 등록해 줍니다.
keepOpenOnPeerClosed  는 half-closed connection 을 말하는데  TCP 특성상 양쪽에서 받고 보내는 연결이 2중이 되는데 , 한 쪽만 닫는것을 말합니다. 종료시에 보통 입력 스트림을 살리고 전송 스트림은 닫습니다. 


  1. class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
  2. extends Actor with ActorLogging {
  3.  
  4. import Tcp._
  5.  
  6. context watch connection
  7.  
  8. case object Ack extends Event
  9.  
  10. def receive = {
  11. case Received(data) =>
  12. buffer(data)
  13. connection ! Write(data, Ack)
  14.  
  15. context.become({
  16. case Received(data) => buffer(data)
  17. case Ack => acknowledge()
  18. case PeerClosed => closing = true
  19. }, discardOld = false)
  20.  
  21. case PeerClosed => context stop self
  22. }
  23.  
  24. // storage omitted ...
  25. }
  1. case Received(data) =>
  2. buffer(data)
  3. connection ! Write(data, Ack)


- Received (data) 메세지를 받으면 데이터를 처리 합니다. 파일에 쓰던 데이터 분석을 하든..
- 여기선 클라이언트로 돌려 줍니다. 근데 Ack 를 같이 보내네요? 네 이겁니다.  여기서 보낸 Ack 를 처리 했다는 신호를 우체국(커넥션액터. 더 깊히는 OS) 이 나중에 보내 준다는거죠. 나중에 Ack 를 다시 돌려받으면 또 전송 할 수 있게 됩니다. 


  1. context.become({
  2. case Received(data) => buffer(data)
  3. case Ack => acknowledge()
  4. case PeerClosed => closing = true
  5. }, discardOld = false)


-  접속이 완성 된 후에는 역시 상태를 바꾸어 줍니다. 따라서 핸들러는 앞으로  become 이후의 행동 메소드를 처리하게 됩니다.
- Received(data) 메세지를 받으면 버퍼링 해줍니다. ( 아래에 자세히 설명) 
- Ack 메세지를 받는다는것은 보낸 데이터가 잘 보내어 졌다는 것을 의미합니다. ( 아래에 자세히 설명) 
- PeerClosed 메세지는 원격에서 접속 종료를 알립니다. 클로징을 시작합니다. 


  1. private def buffer(data: ByteString): Unit = {
  2. storage :+= data
  3. stored += data.size
  4.  
  5. if (stored > maxStored) {
  6. log.warning(s"drop connection to [$remote] (buffer overrun)")
  7. context stop self
  8.  
  9. } else if (stored > highWatermark) {
  10. log.debug(s"suspending reading")
  11. connection ! SuspendReading
  12. suspended = true
  13. }
  14. }
  15.  

사용자 레벨 버퍼링에서는 다음과 같은 일을 합니다.
-  받은 데이터를 저장해 놓습니다. (나중에 보내기 위해서. 이 예제는 아시다시피 에코서버입니다.)
     private var storage = Vector.empty[ByteString] 벡터를 이용하네요.
- 받은 데이터의 총량을 계산합니다. 받은 데이터의 총량이 지나치게 많으면 종료됩니다. 버퍼오버런~

val maxStored = 100000000L

val highWatermark = maxStored * 5 / 10

- 받은 데이터가 충분히 많으면 이제 그만 보내라고 지연 시킵니다. SuspendReadinig 메세지를 보내 놓네요.


  1. private def acknowledge(): Unit = {
  2. require(storage.nonEmpty, "storage was empty")
  3.  
  4. val size = storage(0).size
  5. stored -= size
  6. transferred += size
  7.  
  8. storage = storage drop 1
  9.  
  10. if (suspended && stored < lowWatermark) {
  11. log.debug("resuming reading")
  12. connection ! ResumeReading
  13. suspended = false
  14. }
  15.  
  16. if (storage.isEmpty) {
  17. if (closing) context stop self
  18. else context.unbecome()
  19. } else connection ! Write(storage(0), Ack)
  20. }

acknowledge 메소드를 보겠습니다. 참고로 이 메소드는 쓰기가 완료 되었을때 커넥션 액터로 부터 Ack 와 함께 고지 된 후에 실행 됩니다. 

-  받은 데이터를 다시 돌려 주는데 성공 했기때문에 벡터에서 데이터 청크를 하나 빼 줍니다.

  1. storage = storage drop 1

 
- 현재 받기가 지연 된 상태이고 , 저장된 데이터가 충분히 적으면 다시 받을 수 있다는것을 알려줍니다.

connection ! ResumeReading


저장소에 클라이언트로 부터 받은 데이터가 없고 클로징이 시작된 상태면 종료하고, 아니면 초기 상태로 돌아갑니다. 즉 다시 데이터 받아서 쓸 준비를 한다는 거죠. 

  1. if (storage.isEmpty) {
  2. if (closing) context stop self
  3. else context.unbecome()


-  저장소에 클라이언트로 부터 받은 데이터가 있다면 바로  클라이언트로 보내줍니다. 

  1. } else connection ! Write(storage(0), Ack)

Akka IO 는 Akka 액터와  streams 의 기반이다. !!!

Comments