관리 메뉴

HAMA 블로그

[하마 인사이드] 1. HAMA with K-Means 본문

HAMA

[하마 인사이드] 1. HAMA with K-Means

[하마] 이승현 (wowlsh93@gmail.com) 2015. 5. 5. 23:55


순서

1)    HAMA with K-Means

2)    HAMA  BSP Inside  &  Zookeeper


하마(Apache Hama) 는 빅데이터 분석(머신러닝) 을 위한 솔루션을 제공합니다. 
흔히 알고있는 하둡(Hadoop) 의 맵리듀스 또한 빅데이터 분석 하는데 사용되는데 맵리듀스의 특성상
속도 및 구현에 있어서 굉장히 비효율적인 모습을 보여주는 부분이 있는데, 그런 가려운 부분을 화끈하게
해결
해줍니다.  

하마는 기본적으로  데이터 저장은  하둡 HDFS 를  사용하며  Zookeeper 를 이용하여 분산된 락을 구현하며  
BSP / 그래프 구조를 통하여 각종 알고리즘을 구현할수있게 합니다. 

- 하둡의 맵리듀스 구조 ( 데이타 - 매핑 - 리듀싱 - 결과 저장)

맵 노드간에 혹은 리듀스 노드간에 정보 교환하지 않습니다. 오로지 맵 -> 리듀스 

 - 하마 BSP 구조 ( 데이터 - 로컬 계산 - 노드 상호간 통신 -  블럭 ) 

위의 그림에 보다시피,하둡은 Iterative 작업을 수행하기에 좋지 않습니다. 반면 하마는 그것에 최적화 되어있습니

                        - K-meas 알고리즘 적용시 머하웃( 하둡 맵리듀스 기반) 과의 속도 차이입니다.

위의 표에 보다시피 머신러닝을 할때 ,  엄청난 속도차이를 볼수있습니다. 하둡 맵리듀스 (머하웃) 와는 비교가 안되게 빠릅니다. 자신이 활용하고자 하는 목표 ( 현재 에코 시스템, 계산복잡도로 인한 속도, 러닝커브, 라이브러리 활용성) 에 따라서 선택해야함.  요즘은 딥러닝이 뜨고 있는데  딥러닝을 위해서도 충분히 활용가능하며 BSP + GPGPU 를 적극활용하면 실시간성에 더욱 가까히 갈수있겠습니다. 


K-Means 알고리즘 이란 ? 

머신러닝의 큰 바운더리로  1. 추천시스템  2. 분류  3. 군집을 들수있는데 이 알고리즘은 군집분석을 할때  활용되는 기본 알고리즘 입니다. 군집이란 , 쉽게 말해 도서관에서 여러분이 어떤 기준에 따라서 책을 그룹핑할때 하는 행동 . 그때 책을 군집화 한다라고 말할수있습니다. 군집할때는 어떤 데이터간의 유사성을 정량적으로 측정하여 수치화하는게 핵심이며, 수치화가 된후 K-Means 알고리즘으로 자동 그루핑 할수있습니다. K-Means 는 말 그래도 K 개로 그루핑하라~ 와 같은데 각 군집의 평균데이터를 구해주며 평균데이터와 비교하여 , 개개의 요소들을 특성을 규정할수있습니다.  굉장히 클리어한 알고리즘이며 간단한데에 비해 많은곳에서 주요 알고리즘으로 사용됩니다.

수행과정을  보면 

1) 임의의 K개의 군집수를 결정하고, 각 군집에 초기치 또는 군집 중심을 1개씩 할당하여 위치 설정한다.
2) 각각의 데이터에 대해 K개의 위치까지의 거리를 구하고 가장 가까운 군집에 소속시킨다.(유클리드 거리를 이용)
3) 군집으로 나뉘어진 데이터를 기준으로 새로운 군집 중앙의 위치를 최소가 되도록 재설정한다.
4) 새롭게 구한 군집 중앙의 위치가 기존과 동일하면 알고리즘 종료하고 다르면 두 번째부터 재수행한다.

이 과정을 통하여 K개의 군집으로 데이터를 구분하고 K값에 따라 clustering에 많은 영향을 받는다


[출처] K-means 알고리즘|작성자 푸키

Apache Hama 로 구현한 K-Means  

하마를 통한 K-Means 알고리즘이 하나의 컴퓨터에서 알고리즘을 구현한것과 다른 점은 오직 여러대의 컴퓨터간에 통신하면서 수행된다는 점입니다. 스텝마다 각자의 컴퓨터에서 계산을 수행한후에 다른 컴퓨터로 결과를 서로 넘겨서 재 조정되면서 알고리즘이 진행됩니다.

아래는 X,Y 좌표에서 가까운것 끼리 군집을 만들기위한 소스와 설명입니다.
먼저 소스 부터 보고 
한단계식 이미지를 통해 확인해가며 학습해보겠습니다.

1. Mean 포인트 ( 그룹의 중간 포인트, 그룹별 하나씩 있음) 

2. 일반 포인트  ( 모든  그룹으로 나누어질 값들  예) 책종류, 영화 장르구분, 다음 아고라 댓글  정치성 분류 )

아래 코드는 하마가 설치된 모든 컴퓨터에서 실행된다.

public void bsp(final BSPPeerProtocol bspPeer) throws IOException,KeeperException, InterruptedException {

	if (isMaster(bspPeer)) {    
                //  1)  초기화 ( 랜덤포인트들 생성 ,  Mean 포인트 생성) , 마스터노드에서만 실행
		masterInitialize(bspPeer); 

	}

	while (true) {    //  K-Means 알고리즘 시작 	

		bspPeer.sync();    // 2) 일단 모든 컴퓨터가 여기까지 실행될때까지 모두 대기!!!! 	
		boolean converged = processMessages(bspPeer);  // 3)다른노드로부터 온 데이터 업데이트 					

		if (converged) {    // 4) 원하는 만큼 중간값이 정해졌으면 해당 노드 (군집) 종료!!
			break;
		}	

		assignmentStep(bspPeer);  // 5) 포인트들을 Mean 포인트에 재 할당	
		// 6)  Mean 값 과 재할당된 포인트들 사이의  유클리드 거리 평균에 따라 위치 조정 
               updateStep(bspPeer);

	} 			

	double wcss = wcss(bspPeer);
	LOG.info("My WCSS is " + wcss);
	writeFinalOutput(bspPeer);	// 7) 결정된 Mean 값을 출력함 ( 하둡 HDFS 나 Hbase 에 )

}


private boolean processMessages(BSPPeerProtocol bspPeer) throws IOException {
			
	boolean converged = true;			
	BSPMessage msg;

	while ((msg = bspPeer.getCurrentMessage()) != null) {
		if (isPointMessage(msg)) {    
			addPoints(msg);    // 8) 다른 노드로부터 온 메세지가 포인트이면 삽입
		} else if (isMeanMessage(bspPeer, msg)) {
			converged = updateMeanMap(msg) && converged; // 9) 다른 노드로부터 온 메세지가 Mean 포인트일때
		} else {
			throw new RuntimeException("Unknown msg tag: " + new String(msg.getTag()));
		}
	}
			
	LOG.info("New Mean Map = " + peerMeanMap);
	return converged;
}



// 10) Mean 포인트들과 포인트들을 계산하여 , 각각의 Mean 포인트에서 가까운 포인트들끼리 그룹핑한다.
private int assignmentStep(final BSPPeerProtocol bspPeer) throws IOException {

	final Map<String, List<Point3D>> peerNewPoints = new HashMap<String, List<Point3D>>		

	for (String peer : peerMeanMap.keySet()) {
		peerNewPoints.put(peer, new ArrayList<Point3D>());
	}

	int changeCount = 0;			

	for (Iterator<Point3D> pointItr = points.iterator(); pointItr.hasNext();) {

		final Point3D obs = pointItr.next();	
		double min = Double.MAX_VALUE;
		String minPeer = null;		
		for (Map.Entry<String, Point3D> peer : peerMeanMap.entrySet()) {

			double distance = obs.distance(peer.getValue());
			if (distance < min) {
				min = distance;
				minPeer = peer.getKey();
			}
		}
		
		if (minPeer.equals(bspPeer.getPeerName())) {
			//I don't send updates for points I already own
			continue;

		}		

		//Remove the point from my collection as I no longer own it.
		pointItr.remove();
		changeCount += 1;
		peerNewPoints.get(minPeer).add(obs);	

	}

	//Notify other clusters of new points	
	for (Map.Entry<String, List<Point3D>> peerPoints : peerNewPoints.entrySet()) {

		if (peerPoints.getValue().size() == 0) {
			continue;
		}	

		LOG.info("Send " + peerPoints.getValue().size() + " to " + peerPoints.getKey());
		bspPeer.send(peerPoints.getKey(), 
		pointToByteMessage(new PointMessage(POINT_MSG_TAG, peerPoints.getValue())));
	}
	return changeCount;

}


// 10) 새로 그룹핑된 포인트들의 중간값을 계산하여 새로운 Mean 포인트를 계산한후에 브로드캐스트.
private void updateStep(BSPPeerProtocol bspPeer) throws IOException {			

		LOG.info("My point count is now: " + points.size());

		if (0 == points.size()) {
			//Catch initial case where we have no points, and thus can't change our mean.
			return;
		}
		broadcastMyMean(bspPeer, calculateCenter(points));
	}

private void broadcastMyMean(BSPPeerProtocol bspPeer, Point3D mean) throws IOException {

	final BSPMessage msg = pointToByteMessage(new PointMessage(bspPeer.getPeerName(), mean));

	for (String peer : bspPeer.getAllPeerNames()) {
		bspPeer.send(peer, msg);    // 11) 각각의 노드로 재 군집된 포인트들을 보내줍니다.
	}

}

private Point3D calculateCenter(List<Point3D> points) {  // 12) 포인트들의 센터를 다시 정합니다.
	double x = 0;
	double y = 0;
	double z = 0;
	
	for (Point3D p : points) {
		x += p.x / points.size();
		y += p.y / points.size();
		z += p.z / points.size();

	}

	return new Point3D(x, y, z);
}

자 다음은 위의 소스에서 일어나는 일을 이해하기 쉽게 그림으로 그려보았습니다.  감에 의지하여 그린것이라 거리차등이 정확하지 않습니다. :-)   시작해보지요.  

아래 그림은 소스에서 1) 번에 해당합니다. 
빨강색  / 연두색 / 노란색  포인트들이 초기 Mean (군집) 포인트 입니다. 3개의 초기점이 점점 포인트들 사이에 3개의 중간지점으로 이동될 것입니다. ( 당연히 회색 포인트들은 이동하지 않음) 


그림에서 0~5번까지의 순서에 의해 알고리즘이 진행됩니다.

소스에서 5) 6) 번에 해당합니다.  서버1에서 계산을 한후에  서버2 / 서버 3 으로  각 서버(군집) 에 해당하는 포인트들을 보내주며 

Mean 포인트들은 모든 노드가 공유합니다. (각 노드는 노드별로 자신의 포인트 들을 가지고 있게 됩니다. 즉 같은 군집들끼리 ) 

- 빨강색 / 연두색 / 노란색 점들 간의 그룹들이 설정되었습니다. (점선테두리) 

- 빨강색 그룹의 포인트들은 서버2로 이동될것이고, 노란색 그룹의 포인트들은 서버3으로 이동됩니다.

- 연두색 Mean 포인트는 위치가 재조정되며  서버2,서버3 에 조정된 위치를 알려줍니다.



자 한스텝이 진행되었습니다. 모든 노드는 각각의 노드들이 한 스텝을 완성할때까지 블럭됩니다.
모두 진행된후에는 모두 블럭을 풀고  받은 데이터로 자신의 Mean 값과 포인트들을 업데이트 합니다.

-  이동되어진 포인트들의 모습입니다.


다시 반복합니다.

- 빨강색 / 연두색 / 노란색 점들 간의 그룹들이 설정되었습니다. (점선테두리) 

- 빨강색 / 노란색 Mean 포인트들이 자신의 그룹 포인트들읜 중간값으로 이동합니다.

- 이동된 Mean 포인트들의 위치를  서로 공유합니다.


- 빨강색 과 노란색의 Mean포인트 위치가 그룹들의 중간으로 이동하였습니다.


- 빨강색 / 노란색의 Mean 포인트의 위치가 이동하고 나니,  어떤 포인트들은 연두색에 더 가까워 졌습니다.

- 연두색에 더 가까워진 포인트들은 서버1으로 보내줍니다. 


- 연두색으로 넘어간 포인트의 모습입니다.


자 먼가 그룹이 이루어져있지요? 

이렇게  블럭 - 재 그룹핑 - Mean 포인트 위치조정 -  각각의 노드로 브로드캐스트 을 반복해서 적당하게 그룹핑되면 종료합니다.

하둡 맵리듀스 기반의  K-means 알고리즘을 구현해놓은 머하웃 라이브러리를 잠깐 공부해보았는데, 머신러닝은 이런 알고리즘 구현이 어려운게 아니라,  머신러닝 학습을 시키기위해 데이터를 가공하거나,  어떤것이 군집되어야하는지 결정하는데 더 어려운거 같았습니다.  이상 K-Means 알고리즘을 살펴보았으며  저 또한  빅데이터/머신러닝에 대한 전문가가 아니기때문에  부족한 부분이 많았을거라고 생각됩니다만 이 글을 통해서 머신러닝에 대한 흥미를 일깨우는데 일조했다면 그것으로 소기의 목적을 이루었다고 생각합니다.  


https://github.com/willmore/hama-kmeans   ( 위에 예제에 사용된 소스) 

http://en.wikipedia.org/wiki/K-means_clustering

http://www.slideshare.net/udanax/k-means-29238792

https://hama.apache.org/

'HAMA' 카테고리의 다른 글

HAMA 쉘 분석  (0) 2015.09.30
HAMA 시작하기  (0) 2015.09.30
Comments