일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- 스칼라
- 파이썬 강좌
- akka 강좌
- 파이썬 머신러닝
- Hyperledger fabric gossip protocol
- 플레이프레임워크
- 파이썬
- play 강좌
- Golang
- Akka
- 스칼라 강좌
- 이더리움
- 스칼라 동시성
- Play2
- hyperledger fabric
- 안드로이드 웹뷰
- play2 강좌
- 하이브리드앱
- 파이썬 데이터분석
- Actor
- 주키퍼
- Adapter 패턴
- CORDA
- 블록체인
- 엔터프라이즈 블록체인
- 파이썬 동시성
- Play2 로 웹 개발
- 그라파나
- 하이퍼레저 패브릭
- 스위프트
- Today
- Total
HAMA 블로그
예제로 보는 아카(akka) - 19. Akka streams [번역] 본문
Akka Streams
저는 현재 사물인터넷 스타트업에서 일하고 있는데요.. 엣지 디바이스에서 전달해주는 데이터 양방향 전송하며 그 역할에 따라 분산 시켜서 다루기 위한 환경으로 Akka 플랫폼을 선택&구현했으며, 추가적으로 마이크로서비스간에 데이터의 흐름을 좀 더 부드럽게 처리하기 위하여 Akka Streams을 살펴보고 있습니다. 본문의 글은 아래 링크글의 번역&정리입니다.
Introduction to Akka Streams – Getting started
Reactive Streams
Akka project 의 새로운 실험 모듈인 Akka Streams 는 몇 달 간의 개발과 몇 가지 마일스톤 및 RC 버전을 거쳐 2015년 7 월에 마침내 출시되었습니다. 본 글에서는 라이브러리의 개념을 간략히 소개하고 실제 스트림 처리를 해결하는 위해 이 라이브러리를 사용하는 방법을 보여줄 것입니다.
한 문장으로 정리하면 Akka Streams는 액터 기반 동시성 모델을 사용하는 Akka 툴킷 위에 Reactive Streams 사양을 구현 한 것입니다. Reactive Stream 스펙은 비동기식, 비 블로킹, 이벤트 기반 데이터 처리에 관심이있는 여러 회사들이 모여 시스템 경계와 기술 스택 전반에 걸쳐 소통,적용 할 수 있도록 만들어졌습니다.
비동기 프로세싱 with non-block back pressure
Reactive Streams Initiative는 비동기 처리에서 가장 중요하면서도 문제가 되는 것 중 하나를 해결하는것을 목표로 하기 때문에 관심을 가질만 한데요. 즉, 시스템 리소스를 효율적으로 사용하면서 메시지의 생산자와 소비자 간의 처리 속도를 조정하는 기능입니다. 기본적으로, 생산자가 빠른속도로 전송하는 메시지는 처리가 느린 소비자에게는 잠재적으로 문제를 일으킬 수 있습니다. 이러한 상황은 일반적으로 데이터 소스가 적절하게 back-pressured 를 조절하지 못하는 경우 어디선가 리소스 소모가 일어나게 됩니다.
윽~~ 해야할 일이 너무 많어...
내가 요청한 만큼만 보내 줘~
과거에는 소비자가 자신의 페이스대로 메시지를 처리하기를 기다리는 동안 생산자를 차단함으로써 일반적인 back pressure 를 수행했습니다. 시스템 간 메시지의 동기 처리에 의존하는 이 접근법은 비효율적이며 비동기 처리 (훨씬 뛰어난 확장성 과 자원 활용)의 장점을 무효로 하므로 back pressure 을 구현하기 위한 논블럭 솔루션이 필요합니다. reactive stream 과 관련하여 back pressure 은 비동기 처리 모델의 필수 요소이며 비동기 메시지 전달을 통해 구현됩니다.
Getting started
Akka Streams를 시작합시다! 먼저 SBT 프로젝트를 만들고 build.sbt에 다음을 추가해 보겠습니다.
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" )
스트림을 구성하고 실행하려면 ActorSystem 및 ActorMaterializer를 만들어야합니다.
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl._ object MyFirstStream { def main(args: Array[String]): Unit = { implicit val system = ActorSystem("MyActorSystem") implicit val materializer = ActorMaterializer() // stream definition and execution code goes here } }
ActorMaterializer는 처리 흐름을 실행할 액터를 생성하는 책임이 있으며 암묵적으로 ActorSystem을 사용할 수 있어야합니다.
기본 스트림 빌딩 블록
가장 기본적인 스트림을 조합하여 실행하기에 앞서 Akka Streams에서 제공하는 몇 가지 기본 구성 요소를 살펴 보겠습니다. 이 시리즈 글의 후속 부분에서는 다른 고급 처리 유형을 소개합니다.
Source
소스는 스트림의 시작점이며, 시작 스트림을 통해 데이터가 흐르기 시작됩니다. 소스는 원격 TCP 소켓,콜렉션, 데이터베이스 조회 또는 HTTP 요청과 같은 메시지를 생성 할 수있는 모든 것이 될 수 있습니다. Akka Streams는 다양한 데이터 생성 엔티티에서 소스를 생성 할 수 있습니다.
val sourceFromRange = Source(1 to 10) val sourceFromIterable = Source(List(1,2,3)) val sourceFromFuture = Source(Future.successful("hello")) val sourceWithSingleElement = Source.single("just one") val sourceEmittingTheSameElement = Source.repeat("again and again") val emptySource = Source.empty
Sink
싱크는 스트림을 통해 흐르는 모든 메시지의 최종 목적지입니다. 라이브러리는 여러 가지 즉시 사용 가능한 싱크 구현을 지원합니다.
val sinkPrintingOutElements = Sink.foreach[String](println(_)) val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _) val sinkReturningTheFirstElement = Sink.head val sinkNoop = Sink.ignore
Flow
흐름은 스트림 내의 처리 단계입니다. 하나의 수신 채널과 하나의 송신 채널을 묶으며 그것을 통과하는 메시지를 변형합니다. Akka Streams는 사용자 정의 동작을 포함하는 다양한 유형의 간단한 흐름을 정의하는 데 도움이되는 풍부한 DSL을 제공합니다.
val flowDoublingElements = Flow[Int].map(_ * 2) val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0) val flowBatchingElements = Flow[Int].grouped(10) val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // back-pressures the source if the buffer is full
stream 정의하기
스트림은 임의의 처리 그래프 구조 또는 네트워크를 표현 할 수 있으며 Akka Streams는 강력한 DSL을 사용하여 이러한 처리 그래프를 매우 쉽게 작성할 수 있습니다. 이 연재 전반에 걸쳐 우리는 라이브러리를 사용하여 점점 더 복잡한 처리 그래프를 정의하는 간단한 방법으로 시작하는 다양한 방법을 모색 할 것입니다.
모든 스트림 정의에 포함되어야하는 두 가지 필수 처리 단계는 소스와 싱크이므로 가장 간단한 스트림은 다음과 같이 작성 될 수 있습니다.
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
불행하게도 이것처럼 스트림을 함께 놓는것은 우리에게 처리단계의 materialised 방식을 후속 단계 (위의 예제에서 싱크)와 결합되어 다뤄지도록 강제합니다. 안타깝게도 스트림을 하나로 합치면 처리 단계의 구체화 된 값이 처리되야 하므로 toMat 메소드를 사용하면 구체화 된 값을 지정해야 합니다. 그리하여 어떤 materialised 값)을 특정하게 하는 toMat 메소드를 사용은 스트림 materialisation 동안 노출되야 합니다. runWith 또는 runFold를 사용하여 전체 스트림을 하나의 싱글 메소드로 돌리는거나 싱크를 붙히는것을 막습니다.
Stream materialization과 materialized 값을 다루는 것은 꽤 복잡한 영역이며 명확한 별도의 게시물이 필요합니다.
스트림에 변환을 포함 시키려면 소스와 싱크 사이에 Flow 를 포함시킬 수 있습니다.
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
모든 기본 스트림 구성 요소가 완벽하게 구성 가능하므로 연결된 흐름 (또는 다중 흐름)이있는 소스를 소스 자체로 취급 할 수 있습니다 (자세한 내용은 시리즈 뒷부분 참조).
Running the stream
이제 스트림을 정의 했으므로 (즉, 모든 처리 단계를 설명하는 청사진을 작성하여)이를 실행할 수 있습니다. 이 프로세스를 스트림의 materialization이라고하며, 소스에서 생성 된 모든 데이터를 처리하고 마지막으로 스트림의 구체화 된 값을 생성 할 수있는 인프라를 만들기 위해 Akka 액터를 만들고 연결합니다. ).
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run() sumOfElements.foreach(println) // we expect to see 6 val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run() sumOfDoubledElements.foreach(println) // we expect to see 12
이제는 기본적인 산술 계산을 수행하는 코드가 많은 것처럼 보일 수 있지만 Akka Streams를 사용하여 스트림을 정의하고 실행하는 기본 사항을 보여주기 위해 사용했습니다. 실제로 이 라이브러리는 간단한 작업을 수행하는 몇 가지 다른 방법을 제공하며 훨씬 적은 코드로 구성될 수 있습니다.
// runs the stream by attaching specified sink sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println) // runs the stream by attaching sink that folds over elements on a stream Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
'Akka' 카테고리의 다른 글
Akka 의 Akka.HTTP 및 클러스터 샤딩을 이용한 스케일 확장 [번역] (0) | 2017.12.05 |
---|---|
예제로 보는 아카(akka) - 20. Akka HTTP [번역] (0) | 2017.03.27 |
예제로 보는 아카(akka) - 18. 서비스로 설치 ( Deploying ) (0) | 2016.12.27 |
예제로 보는 아카(akka) - 17. 설치 ( Deploying ) (0) | 2016.12.26 |
예제로 보는 아카(akka) - 16. 로깅 (Logging) (0) | 2016.12.26 |