[코틀린 코딩 습작] Object Pool
소프트웨어 엔지니어링에서 풀의 종류는 다양한데요.
쓰레드풀,메모리풀,캐쉬풀,커넥션풀,객체풀 등등이 있습니다. "풀"어서 말하면 미리 만들어두고 돌려막기로 사용하자 라고 볼 수 있는데요. 미리 만들어 두는 방식 / 쓰레드가 태스크를 처리하는 방식/ 동기,비동기에 따라서 다양한 풀의 구현체들이 있을 수 있습니다. 이 글에서는 Kotlin으로 객체풀을 만드는 간단한 예제를 보여 줍니다.
1. 리스트를 이용한 고정크기 동기 객체풀
import java.lang.IllegalStateException
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
interface ObjectPool<T> {
fun take(): T
fun release(obj: T)
fun poll(waitTime: Long, waitUnit: TimeUnit): T
fun newInstance(): T
fun <R> use(function: (T) -> R): R {
take().apply {
return try {
function(this)
} finally {
release(this)
}
}
}
fun <R> useAsync(function: (T) -> CompletableFuture<R>): CompletableFuture<R> {
return CompletableFuture.supplyAsync{
take()
}.thenCompose {
try{
function(it)
}
finally {
release(it)
}
}
}
}
interface ObjectFactory<T> {
fun newInstance(): T
}
class FixedObjectPool<T>(factory: ObjectFactory<T>, size: Int, val waitSize: Int = 0 ): ObjectPool<T>{
val factory : ObjectFactory<T>
val lock = ReentrantLock()
val condition = lock.newCondition()
val pool = mutableListOf<T>()
var pause = false
init {
this.factory = factory
for (i in 1 .. size) {
pool.add(newInstance())
}
}
fun size(): Int {
return pool.size
}
override fun take(): T {
lock.withLock {
while (pool.isEmpty()){
condition.await()
}
val obj = pool.first()
pool.removeFirst()
return obj
}
}
override fun poll(waitTime: Long, waitUnit: TimeUnit): T {
lock.withLock {
while (pool.isEmpty()){
if (!condition.await(waitTime, waitUnit)) throw IllegalStateException("fail to get object")
}
val obj = pool.first()
pool.removeFirst()
return obj
}
}
override fun release(obj: T) {
lock.withLock {
if (pool.isEmpty()) {
pause = true;
}
pool.add(obj)
if (pause && pool.size > waitSize) {
condition.signalAll()
pause = false;
}
}
}
override fun newInstance(): T {
return factory.newInstance()
}
}
class EthereumWallet{
/*
something
*/
}
class EthereumWalletObjectFactory : ObjectFactory<EthereumWallet>{
override fun newInstance(): EthereumWallet {
return EthereumWallet()
}
}
fun main() {
println("object pool start")
val pool = FixedObjectPool(EthereumWalletObjectFactory(), 4)
println(pool.size())
val obj1 = pool.take()
println(obj1)
val obj2 = pool.take()
println(obj2)
val obj3 = pool.take()
println(obj3)
val result = pool.use { it -> it.print() }
println("use")
println(result)
val result2 = pool.useAsync { it -> CompletableFuture.supplyAsync{it.print()} }
println("useAsync")
println(result2.get())
val obj5 = runCatching {
pool.poll(1, TimeUnit.MILLISECONDS)
}.onFailure {
println("failed")
}
println("object pool end")
}
- 고정된 숫자를 가진 객체풀이다.
- mutable list를 통해 객체가 관리되며, 쓰레드안전을 위해서 lock 과 condition을 직접 구현하였다.
- use를 이용하여 take와 release를 신경 안 쓸수도 있다.
- 눈여겨 봐야 할 부분은 2개가 있는데 첫째로 take의 while 문인데, 여러개의 쓰레드가 동시에 signal을 받을때, 뒤늦게 깨어난 쓰레드가 풀의 갯수를 확인해야 문제가 생기지 않는다.
- 두번째로는 waitSize가 있는데, 이는 빈번한 wait/signal을 사용하지 않기 위해, 어느정도 일정 갯수의 객체가 풀안에 들어 왔을때만 동작하게 함으로써 성능을 개선 시킬 수 있다.
2. DisruptorBlockingQueue 를 이용한 단순한 객체풀
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import java.util.concurrent.TimeUnit
interface ObjectPool<T> {
fun take(): T
fun release(obj: T)
fun poll(waitTime: Long, waitUnit: TimeUnit): T
fun newInstance(): T
}
interface ObjectFactory<T> {
fun newInstance(): T
}
class FixedObjectPool<T>(factory: ObjectFactory<T>, size: Int): ObjectPool<T>{
private val pool: DisruptorBlockingQueue<T>
private val factory: ObjectFactory<T>
init {
this.factory = factory
this.pool = DisruptorBlockingQueue<T>(size).apply {
for (i in (1 .. size)) {
add(newInstance())
}
}
}
fun size(): Int {
return pool.size
}
override fun take(): T {
return pool.take()
}
override fun release(obj: T) {
pool.add(obj)
}
override fun newInstance(): T {
return factory.newInstance()
}
override fun poll(waitTime: Long, waitUnit: TimeUnit): T {
return pool.poll(waitTime,waitUnit)
}
}
class EthereumWallet{
/*
something
*/
}
class EthereumWalletObjectFactory : ObjectFactory<EthereumWallet>{
override fun newInstance(): EthereumWallet {
return EthereumWallet()
}
}
fun main() {
val pool = FixedObjectPool(EthereumWalletObjectFactory(), 4)
println(pool.size())
}
- LinkedBlockingQueue를 사용 할 수도 있지만...
DisruptorBlockingQueue는 초고속 Concurrent 라이브러리를 이용한 것으로써 아래 링크를 참고한다.
https://github.com/conversant/disruptor