관리 메뉴

HAMA 블로그

[코틀린 코딩 습작] Object Pool 본문

Kotlin

[코틀린 코딩 습작] Object Pool

[하마] 이승현 (wowlsh93@gmail.com) 2021. 6. 17. 12:44


소프트웨어 엔지니어링에서 풀의 종류는 다양한데요.

쓰레드풀,메모리풀,캐쉬풀,커넥션풀,객체풀 등등이 있습니다. "풀"어서 말하면 미리 만들어두고 돌려막기로 사용하자 라고 볼 수 있는데요. 미리 만들어 두는 방식 / 쓰레드가 태스크를 처리하는 방식/ 동기,비동기에 따라서 다양한 풀의 구현체들이 있을 수 있습니다.  이 글에서는 Kotlin으로 객체풀을 만드는 간단한 예제를 보여 줍니다. 

1. 리스트를 이용한 고정크기 동기 객체풀 

import java.lang.IllegalStateException
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

interface ObjectPool<T> {
  fun take(): T
  fun release(obj: T)
  fun poll(waitTime: Long, waitUnit: TimeUnit): T
  fun newInstance(): T

  fun <R> use(function: (T) -> R): R {
      take().apply {
        return try {
          function(this)
        } finally {
          release(this)
        }
      }
    }

  fun <R> useAsync(function: (T) -> CompletableFuture<R>): CompletableFuture<R> {
    return CompletableFuture.supplyAsync{
      take()
    }.thenCompose {
      try{
        function(it)
      }
      finally {
        release(it)
      }
    }
   }
}

interface ObjectFactory<T> {
  fun newInstance(): T
}

class FixedObjectPool<T>(factory: ObjectFactory<T>, size: Int, val waitSize: Int = 0 ): ObjectPool<T>{
  
  val factory : ObjectFactory<T>
  val lock = ReentrantLock()
  val condition = lock.newCondition()
  val pool = mutableListOf<T>()
  var pause = false
  
  
  init {
    this.factory = factory
    for (i in 1 .. size) {
      pool.add(newInstance())
    }
  }
  
  fun size(): Int {
    return pool.size
  }
  
  override fun take(): T {
    lock.withLock {
      while (pool.isEmpty()){
        condition.await()
      }
      val obj = pool.first()
      pool.removeFirst()
      return obj
    }
  }
  
  override fun poll(waitTime: Long, waitUnit: TimeUnit): T {
    lock.withLock {
      while (pool.isEmpty()){
        if (!condition.await(waitTime, waitUnit)) throw IllegalStateException("fail to get object")
      }
      val obj = pool.first()
      pool.removeFirst()
      return obj
    }
  }
  
  override fun release(obj: T) {
    lock.withLock {
      if (pool.isEmpty()) {
        pause = true;
      }
      
      pool.add(obj)
      
      if (pause && pool.size > waitSize) {
        condition.signalAll()
        pause = false;
      }
    }
  }
  
  override fun newInstance(): T {
    return factory.newInstance()
  }
}

class EthereumWallet{
  /*
   something
  */
}

class EthereumWalletObjectFactory : ObjectFactory<EthereumWallet>{
  override fun newInstance(): EthereumWallet {
    return EthereumWallet()
  }
}

fun main() {
  println("object pool start")
  
  val pool = FixedObjectPool(EthereumWalletObjectFactory(), 4)
  println(pool.size())
  
  val obj1 = pool.take()
  println(obj1)
  val obj2 = pool.take()
  println(obj2)
  val obj3 = pool.take()
  println(obj3)
  
  val result = pool.use { it -> it.print() }
  println("use")
  println(result)
  
  val result2 = pool.useAsync { it -> CompletableFuture.supplyAsync{it.print()} }
  println("useAsync")
  println(result2.get())
  
  val obj5 = runCatching {
    pool.poll(1, TimeUnit.MILLISECONDS)
  }.onFailure {
    println("failed")
  }
  
  println("object pool end")
  
}

- 고정된 숫자를 가진 객체풀이다. 
- mutable list를 통해 객체가 관리되며, 쓰레드안전을 위해서 lock 과 condition을 직접 구현하였다. 
- use를 이용하여 take와 release를 신경 안 쓸수도 있다. 
- 눈여겨 봐야 할 부분은 2개가 있는데 첫째로 take의 while 문인데, 여러개의 쓰레드가 동시에 signal을 받을때, 뒤늦게 깨어난 쓰레드가 풀의 갯수를 확인해야 문제가 생기지 않는다. 
- 두번째로는 waitSize가 있는데, 이는 빈번한 wait/signal을 사용하지 않기 위해, 어느정도 일정 갯수의 객체가 풀안에 들어 왔을때만 동작하게 함으로써 성능을 개선 시킬 수 있다. 

2. DisruptorBlockingQueue 를 이용한 단순한 객체풀 

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import java.util.concurrent.TimeUnit

interface ObjectPool<T> {
  fun take(): T
  fun release(obj: T)
  fun poll(waitTime: Long, waitUnit: TimeUnit): T
  fun newInstance(): T
}

interface ObjectFactory<T> {
  fun newInstance(): T
}

class FixedObjectPool<T>(factory: ObjectFactory<T>, size: Int): ObjectPool<T>{
  
  private val pool: DisruptorBlockingQueue<T>
  private val factory: ObjectFactory<T>
  
  init {
    this.factory = factory
    this.pool = DisruptorBlockingQueue<T>(size).apply {
      for (i in (1 .. size)) {
        add(newInstance())
      }
    }
  }
  
  fun size(): Int {
    return pool.size
  }
  
  override fun take(): T {
    return pool.take()
  }
  
  override fun release(obj: T) {
    pool.add(obj)
  }
  
  override fun newInstance(): T {
    return factory.newInstance()
  }
  
  override fun poll(waitTime: Long, waitUnit: TimeUnit): T {
    return pool.poll(waitTime,waitUnit)
  }
}


class EthereumWallet{
  /*
    something
  */
}

class EthereumWalletObjectFactory : ObjectFactory<EthereumWallet>{
  override fun newInstance(): EthereumWallet {
    return EthereumWallet()
  }
}


fun main() {
  val pool = FixedObjectPool(EthereumWalletObjectFactory(), 4)
  println(pool.size())
}

- LinkedBlockingQueue를 사용 할 수도 있지만... 
DisruptorBlockingQueue는 초고속 Concurrent 라이브러리를 이용한 것으로써 아래 링크를 참고한다.
https://github.com/conversant/disruptor

0 Comments
댓글쓰기 폼