관리 메뉴

HAMA 블로그

[ 하둡 인사이드 ] 1. Hadoop RPC 본문

Hadoop

[ 하둡 인사이드 ] 1. Hadoop RPC

[하마] 이승현 (wowlsh93@gmail.com) 2015. 4. 27. 10:40


순서

1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경


하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.



HDFS 라는 분산파일 시스템은 여러대의 컴퓨터에 파일을 분산하여 놓고 읽기/쓰기를 지원해주는 단순한 기능을 합니다.
NameNode 는 분산된 파일의 위치/크기등의 정보를 가지고있으며
DataNode 는 실제 데이터를 가지고있습니다.

하둡 클라이언트에서 파일을 가져오기 위해서는

0. KDC (케버로스센터) 에 네임노드 / 데이터노드 / 클라이언트는 각각 인증합니다.
1. 네임노드와 데이터노드간에 서로 비밀키를 교환합니다.
2. 클라이언트는 KDC 에 TGT 를 요청합니다.
3. 클라이언트는 KDC 에 네임노드 서비스 티켓을 요청합니다.
4. NameNode 에게 내가 원하는 파일의 위치 (블럭ID) 를 알려달라고 요청합니다. (티켓과 함께)
5. 1번에서 획득한 블럭ID  및 블록 접근 토큰을 가지고 해당 DataNode 에게 데이터를 달라고 요청합니다.

위에

4번에서 NameNode 의 특정 함수를 호출하여 원하는바를 이루는데. 그때 하둡RPC 가 사용됩니다.
5번에서 데이타를 가져올때는 하둡 스트리밍 (TCP/IP) 를 이용합니다.
나머지 0~3번은 보안과 관련된 이슈이므로 다음 연재때 자세히 다뤄보도록 하겠습니다.


하둡 RPC 를 이해하기위해서는 자바기술에 대한 몇가지 선행학습이 필요합니다. (하둡은 자바로 만들어짐) 


1. 자바 리플렉션 ( http://socurites.com/60 )

public class TestServer implements RPCProtocol {

Class  cls  = Class.forName(“RPCProtocol“);

Class paramTypes[] = new Class[2];
pramTypes[0] = String.TYPE;
pramTypes[2] = Integer.TYPE;

Method meth = cls.getMethod(“heartbeat”, paramTypes);

Object arglist[] = new Object[2];
arglist[0] = new String(“hello world”)
arglist[1] = new Integer(2);


RPCProtocol rp  =  new  TestServer ();

Object retObj = meth.invoke(rp, arglist);

Integer retval = (Integer) retObj;

중요포인트 ( 필요사항 ) :    실제 객체 /  클래스타입 /  메소드이름 /  인자타입 / 실제 인자 /    

Hadoop  에서 각각에 대한 변수명 

                     instance (실제객체) 
                     protocol   (클래스타입) 
                     Invoocation  ( 메소드이름, 인자클래스, 실제 인자값)


2. 자바 Dynamic Proxy (http://javacan.tistory.com/entry/115 

public class MyDebugInvocationHandler   implements java.lang.reflect.InvocationHandler {    
        private Object target = null;
          public void setTarget(Object target_) {
            this.target = target_;
        }
        
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                System.out.println("Going to execute method : " + method.getName);
                Object retObject = method.invoke(target, args);
                System.out.println("After execute method : " + method.getName());
                return retObject;
        }
    }


-----------


 IMyBusinessObject bo = new MyBusinessObject(); // 실제 비즈니스 객체
  MyDebugInvocationHandler aMyDebugInvocationHandler = new MyDebugInvocationHandler();
  aMyDebugInvocationHandler.setTarget(bo);
    
 IMyBusinessObject proxyObject =    (IMyBusinessObject) Proxy.newProxyInstance  // 프록시 객체 생성
          (IMyBusinessObject.class.getClassLoader(),
          new Class[] { IMyBusinessObject.class }, // 프록시할 인터페이스
          aMyDebugInvocationHandler); // InvocationHandler
    
    // 프록시 객체 사용
    System.out.println(proxyObject.doExecute("Hello World"));


3. 자바 IO /NIO ( http://javacan.tistory.com/entry/87 

* InputStream / OutputStream
* Selctor
* Channel
* ByteBuffer
* Socket


이제 하둡 RPC 코드를 보도록 합시다.

먼저 그림을 통해 NIO 서버 흐름도를 살펴보도록 하겠습니다.




* Listener 를 통해서 대기하고있다가 Reader 쓰레드중 하나가 응답을 받아서 큐에 메세지를 넣음.
* Handler 에서 실제 함수를 리플렉션을 이용해 호출해주고 리턴값을 받아서 Response 함.


다음은 패킷의 구성을 살펴보겠습니다.


*  헤더를 통해서 인가 과정을 거치고 Invocation 객체가 직렬화되어 전달되어 집니다.


클라이언트 와 서버간의 인터렉션을 나열해 보면

클라이언트

1. 호출할 함수를 선언한 NameNode (ClientProtocol)  인터페이스로  다이나믹 프록시를 생성
2. 생성된 프록시로 함수호출 
3. 내부에서 클라이언트 객체 하나를 풀에서 꺼내어 서버와 connection 을 맺음
4. 서버로 인터페이스 타입, 메소드 이름/타입, 메소드인자타입/ 값등 을 보내줌
5. 응답이 올때까지 대기
6. 응답이 오면 connection 을 끊고 리턴값을 완전한 writable 객체로 리턴함.  
7. writable 객체에서 값을 꺼내어 리턴.  끝.


서버 

1. 실제 구현이 있는 함수를 가진 인터페이스를 구현한 NameNode 객체에서 서버객체 생성
2. 서버객체는 Listenning 을 하며 대기.
3. 클라이이언트과 connection  을 맺고 패킷을 받은후 Read 객체는 그것을 큐에 삽
4. Handler 객체는 큐에서 패킷을 꺼내어 자바리플렉션기술을 이용하여 호출할 함수와 
    자신이 가지고있는  NameNode 객체를 이용하여 NameNode 의 함수를 호출함.
5. 함수결과값을 Response 객체를 통해서 클라이언트에 전송  



이제  DFSClient   (클라이언트) ------ RPC ------>  NameNode (서버)  의 코드를 짧게 살펴보겠습니다.


DFSClient (클라이언트 측) 

DFSClient

(ClientProtocol)RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
        NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,..)

 this.namenode = createNamenode(this.rpcNamenode, conf);
 
 namenode.getBlockLocations(src, start, length);

DFSClient 클래스에서 원격에 ClientProtocol  인터페이스를 구현한 객체가 가지고있는 함수 (getBlockLocations) 
하나를  호출하기위해  Proxy 를 만듭니다. 저 Proxy 는 원격함수를 호출하는 공통코드를 가지고있습니다. 
보통 Proxy or Decorator  는 웹개발에서 트랜잭션설정, 로그삽입, 필터기능등을 위하여 사용됩니다. 

RPC

1)
   final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy);
    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker);

2)

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
   
    private Invoker(Class<? extends VersionedProtocol> protocol,
                     InetSocketAddress address, 
                     UserGroupInformation ticket,
                     Configuration conf, 
                     SocketFactory factory,
                     int rpcTimeout, 
                     RetryPolicy connectionRetryPolicy) throws IOException {
   
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, connectionRetryPolicy, conf);
       this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
     
      return value.get();
    }

1) 번은  Dynamic proxy 를 만드는 코드입니다.  

2) 번은 InvocationHandler 구현클래스의 내용입니다. 클라이언트객체풀에서 하나 가져와서 
    클래스 타입, 보안관련, 메소드이름, 인자, 인자타입등을 Invocation 클래스에 담아서 서버로 보내줍니다. 


Client


public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter

 하둡에서는 자바의 직렬화 및 RPC 를 사용하지 않습니다. 범용적으로 구현되있기때문에 성능에
 문제를 가져오기때문입니다.  따라서 Writble 이라는 인터페이스를 통해 아주 간단히 직렬화를 합니다. 
 Invocation 클래스는 Writble 을 상속받았음을 알수있습니다. 




NameNode (서버측) 

NameNode 

public class NameNode implements ClientProtocol, DatanodeProtocol,

   this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

실제 분산노드에 저장되어있는 파일을 관리하고있는 클래스입니다.  ClientProtocol 를 구현했다는것을 알수있습니다.
클라이언트측에서 ClientProtocol  를 이용하여 Proxy 를 만든것을 보았을것입니다.  
NameNode 객체는 RPC 서버를 만드는데 포인트는 첫번째 인자로 자신을 넣어준다는것입니다. 
자바리플렉션에서 함수호출할경우 첫번째 인자로 호출하려면 함수를 가지고있는 객체를 받기때문입니다. 나머지는 소켓서버를 생성하기위한 포트 및 환경설정등입니다) 


RPC

public static Server getServer(...){    
      return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

 public static class Server extends org.apache.hadoop.ipc.Server {

  public Writable call(Class<?> protocol, Writable param, long receivedTime) 
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        Object value = method.invoke(instance, call.getParameters());

추상클래스 Server 를 상속받아서 자신의 Server 클래스를 만듭니다. call 메소드를 오버라이드 한것을 볼수있습니다.
Server 에서  클라이언트 측에서 보낸 Invocation 객체를 해석하고 호출해주는 코드가 담겨져있습니다.

Server
 Server
   listener = new Listener();
  
 Listener 
      void doAccept(SelectionKey key) 

  Reader 
    void doRead(SelectionKey key) 

  Connection 
         public int readAndProcess() 
         private void processData(byte[] buf) 
   
  Call 
  call = new Call(id, param, this); 
  callQueue.put(call); // queue the call; 

  Handler 
    final Call call = callQueue.take(); // pop the queue; 
    value = call(call.connection.protocol, call.param, call.timestamp); 

 Response
서버 클래스입니다.

일반적인 서버측 행위를 수행합니다. 클라이언트를 기다리고 , 받아드리고 , 보낸 데이터를 읽어서 처리합니다. 
processData 함수에서 큐에 넣고 핸들러에서 pop 해서 call 합니다. call 에서는 리플렉션기능

 Invocation call = (Invocation)param;
        
 Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
      
 Object value = method.invoke(instance, call.getParameters());
으로  함수를 호출해주고
리턴객체를 클라이언트로 보내줍니다. 리턴객체또한 Writble 로 직렬화 됩니다.


이상 마치고  다음에는 하둡 스트리밍에 대해서 살펴보겠습니다.
(시간 날때마다 코드조각들을 더 넣고 설명도 더 추가하겠습니다. 조금 짧은감이있군요 -.-a) 



관련 클래스 모음 



Comments