관리 메뉴

HAMA 블로그

[코틀린 코딩 습작] coroutine & channel 본문

Kotlin

[코틀린 코딩 습작] coroutine & channel

[하마] 이승현 (wowlsh93@gmail.com) 2021. 6. 7. 11:53


Coroutine

// THREAD 방식 

fun main() {
  
  val startTime = System.currentTimeMillis()
  val counter = AtomicInteger(0)
  val numberOfCoroutines = 100_00
  val jobs = List(numberOfCoroutines) {
    thread(start = true) {
      Thread.sleep(100L)
      counter.incrementAndGet()
    }
  }
  jobs.forEach { it.join() }

  val timeElaspsed = System.currentTimeMillis() - startTime
  println(timeElaspsed)
}
// Coroutine 방식

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
fun main() {
  val startTime = System.currentTimeMillis()
  runBlocking<Unit> {
    val counter = AtomicInteger(0)
    val numberOfCoroutines = 100_00

    val jobs = List(numberOfCoroutines) {
      launch {
        delay(100L)
        counter.incrementAndGet()
      }
    }
    jobs.forEach { it.join() }
  }

  println(System.currentTimeMillis() - startTime)
}

Thread를 통해서 작업을 하면 2130밀리초가 걸리는 작업을 Coroutine을 통해서하면 209 밀리초 밖에 걸리지 않는다. Coroutine은 경량쓰레드를 제공하기 때문에 가능한데, 이 말은 코루틴 여러개가 몇개의 쓰레드를 다시 나누어 사용 한다는 의미이다. 즉 시작되는 쓰레드와 종료 시점의 쓰레드가 달라질 수 있다. Go언어에서 제공되는 경량쓰레드(goroutine)와는 달리 내부적으로 자바 쓰레드풀을 사용하기 때문에, 진정한 경량쓰레드를 쓰냐에 관련된 말이 있지만, 어차피 Go언어도 자신만의 추상층을 가지고 있기 때문에 별 의미는 없어 보인다. (스택오버플로우에서 설명된 go vs kotlin 차이)

(* python의 코루틴에서 주로 사용하는 generator / send / yield 같은 기능은 없어 보인다. 다만 아래서 설명할 channel로 비슷하게 만들 순 있다.)

fun main() {
  runBlocking<Unit> {

    val time = measureTimeMillis {
      // given
      val one = async {
        delay(1000L)
      }
      val two = async {
        delay(2000L)
      }
      
      // when
      runBlocking {
        one.await()
        two.await()
      }
    }
    println(time) // 2013 mills
  }
}

launch와 async의 주요 차이점은 launch는 job을 리턴하고, async는 deffered를 리턴하는 것이다. 
job은 코루틴 자체를 의미하므로, launch로 실행되는 코루틴을 취소 할 수도 있고 기다릴 수도 있다. 즉 라이프 사이클에 관심이 있으며, deffered는 future처럼 미래의 결과 값을 의미하므로 완료 되길 기다리다가 리턴되는 값에 관심이 있다.
deffered는 job을 상속받으므로 job의 역할도 할 수 있다. 즉 non-blocking cancellable future 라 볼 수 있다.  

따라서 결과값에 관심이 있으면 async를 사용하고, 아니면 launch를 사용하면 된다. 


Channel 

개인적으로 golang을 매우 좋아하는데, 그 이유는 오로지 심플한 goroutine과 go channel의 존재에 있다. CSP기법의 하나인 이것은 동시성 프로그래밍을 매우 간단하고 직관적으로 만들어 준다. 중요한건 재밌다는 사실!!  kotlin에도 go channel식으로 개발하는 것을 지원해 주니 사용 안 할 이유가 없다. 

https://www.baeldung.com/kotlin/channels 에서 코드를 가져왔다. 별다른 설명이 필요 없을 정도로 코드가 깔끔하다.

import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals

internal class ChannelTest {
  
  @Test
  fun `should_pass_data_from_one_coroutine_to_another`(){
    runBlocking {
      // given
      val channel = Channel<String>()
      
      // when
      launch { // coroutine1
        channel.send("Hello World!")
      }
      val result = async { // coroutine2
        channel.receive()
      }
      // then
      assertEquals(result.await(),"Hello World!")
    }
    
  }
}

하나의 코루틴에서 다른 코루틴으로 데이터를 전송하고 받는 기본적인 코드이다. 

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
  runBlocking {
    // given
    val channel = Channel<Channel<String>>()
    val eventChannel = Channel<String>()
    
    // when
    launch { // coroutine1
      channel.send(eventChannel)
      print(eventChannel.receive())
    }
    launch { // coroutine2
      val eventChannel = channel.receive()
      eventChannel.send("hi there")
    }
  }
}

go channel에서는 채널을 채널에 전송 할 수 있어서, 해 보았는데 코틀린도 잘 된다.

아래는 Pub-Sub패턴의 코드이다. 

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun CoroutineScope.producePizzaOrders(): ReceiveChannel<String> = produce {
  var x = 1
  while (true) {
    send("Pizza Order No. ${x++}")
    delay(100)
  }
}

fun CoroutineScope.pizzaOrderProcessor(id: Int, orders: ReceiveChannel<String>) = launch {
  for (order in orders) {
    println("Processor #$id is processing $order")
  }
}

fun main() = runBlocking {
  val pizzaOrders = producePizzaOrders()
  repeat(3) {
    pizzaOrderProcessor(it + 1, pizzaOrders)
  }
  
  delay(1000)
  pizzaOrders.cancel()
}

produce는 ReceiveChannel<T>을 리턴하는 코루틴이다. (async는 deffered, launch는 job)

마지막으로 채널을 파이프라이닝으로 연결 할 수도 있다.
병렬로 파이프라이닝/체이닝/필터링 패턴을 적용 할 때 좋을 거 같다. 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

fun CoroutineScope.baking(orders: ReceiveChannel<PizzaOrder>) = produce {
  for (order in orders) {
    delay(200)
    println("Baking ${order.orderNumber}")
    send(order.copy(orderStatus = BAKED))
  }
}

fun CoroutineScope.topping(orders: ReceiveChannel<PizzaOrder>) = produce {
  for (order in orders) {
    delay(50)
    println("Topping ${order.orderNumber}")
    send(order.copy(orderStatus = TOPPED))
  }
}

fun CoroutineScope.produceOrders(count: Int) = produce {
  repeat(count) {
    delay(50)
    send(PizzaOrder(orderNumber = it + 1))
  }
}

fun main() = runBlocking {
  val orders = produceOrders(3)
  
  val readyOrders = topping(baking(orders))
  
  for (order in readyOrders) {
    println("Serving ${order.orderNumber}")
  }
  
  delay(3000)
  coroutineContext.cancelChildren()
}
0 Comments
댓글쓰기 폼