관리 메뉴

HAMA 블로그

[ 하둡 인사이드 ] 2. Hadoop Streamming 본문

Hadoop

[ 하둡 인사이드 ] 2. Hadoop Streamming

[하마] 이승현 (wowlsh93@gmail.com) 2015. 5. 1. 17:32
순서


1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경



하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.



HDFS 라는 분산파일 시스템은 여러대의 컴퓨터에 파일을 분산하여 놓고 읽기/쓰기를 지원해주는 단순한 기능을 합니다.
NameNode 는 분산된 파일의 위치/크기등의 정보를 가지고있으며
DataNode 는 실제 데이터를 가지고있습니다.

하둡 클라이언트에서 파일을 가져오기 위해서는

0. KDC (케버로스센터) 에 네임노드 / 데이터노드 / 클라이언트는 각각 인증합니다.
1. 네임노드와 데이터노드간에 서로 비밀키를 교환합니다.
2. 클라이언트는 KDC 에 TGT 를 요청합니다.
3. 클라이언트는 KDC 에 네임노드 서비스 티켓을 요청합니다.
4. NameNode 에게 내가 원하는 파일의 위치 (블럭ID) 를 알려달라고 요청합니다. (티켓과 함께)
5. 1번에서 획득한 블럭ID  및 블록 접근 토큰을 가지고 해당 DataNode 에게 데이터를 달라고 요청합니다.

위에

5번에서 데이타를 가져올때는 하둡 스트리밍 (TCP/IP) 를 이용합니다.
나머지 0~3번은 보안과 관련된 이슈이므로 다음 연재때 자세히 다뤄보도록 하겠습니다.


하둡 스트리밍을  이해하기위해서는 자바IO/NIO 에 대한 몇가지 선행학습이 필요합니다. 
(하둡은 자바로 만들어짐. NIO2가 안정화되면 코어구현도 바뀌겠지요. 네티가 들어갈수도) 


1. 자바 IO /NIO ( http://javacan.tistory.com/entry/87 

* InputStream / OutputStream
* Selctor
* Channel
* ByteBuffer
* Socket


소켓 통신 일반  




전형적인 소켓 프로그래밍 인터렉션입니다. 서버는 대기하고, 클라이언트가 접속하면 서버는 받아드리고, 

받아드리면서 클라이언트와 통신하기위한 소켓하나 만들어서 쓰레드하나 만들어서

던진후에 그 쓰레드에서 클라이언트와 입/출력.  




자바 NIO 



  자바NIO (New I/O)는 비동기 입출력은 아닙니다.   

  쪼 위에 blocking 되있는걸 보실수있습니다. (타임아웃 가능) 

  Select 모델이라고 하지요. 소켓통신 모델에는 다양한 모델이 있으며, IOCP 모델이 빠르기로 유명함.

  개인적으로 자바 NIO/NIO2 프로그래밍이 C++ 을 이용한 IOCP 프로그래밍 보다는 애매하게 느꼈던것 

  같습니다. (추상화 하다보니 좀 희미하다고 해야하나? 그에 반해 IOCP 모델은 굉장히 클리어 하죠.)


하둡 스트리밍 


사실 하둡 스트리밍에 대한 글을 쓸까 말까 좀 고민을 했었습니다.  왜냐? 할게 없기때문에.. 

그냥 자바 소켓통신입니다. 따라서 자바소켓통신에 대한 소스레벨 분석 글을 보신다고 생각하면 편하실듯합니다. :-)  독자분이 소켓통신을 구현할때 가져다 쓰면 좋겠지요. 구태여 바퀴를 또 만들필요가 없지 않겠습니까?   (이런 의미에서 Netty 를 사용하는게 좋죠. Vert.x / Couchbase / OpenTSDB 등이 사용)




위의 그림은 클라이언트에서 데이타노드로 블록ID 을 보내어 데이터를 달라고 하는 상황에서

그와 관련된 클래스들의 모식도 입니다.

소스 조각을  하나씩 살펴보겠습니다.

DFSClient

 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.

RemoteBlockReader

 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.

NetUtils

public static SocketFactory getDefaultSocketFactory(Configuration conf) {

    String propValue = conf.get("hadoop.rpc.socket.factory.class.default");

    if ((propValue == null) || (propValue.length() == 0))

      return SocketFactory.getDefault();

    return getSocketFactoryFromProperty(conf, propValue);

  }

 하둡 환경설정에 따라 소켓팩토리를 선정합니다. getDefaultSocketFactory 가 

 기본.SocketFactory.getDefault(); 참고로 소켓팩토리를 통해서 SSLSocket 을 사용할수있습니다. 


 public static InputStream getInputStream(Socket socket, long timeout) 
                                           throws IOException {
    return (socket.getChannel() == null) ? 
          socket.getInputStream() : new SocketInputStream(socket, timeout);
  }
소켓의 속성에  채널이 있을때와 없을때 , 그러니깐 IO / NIO 에 따라서  리턴되는스트림객체가 다릅니다. 

public static void connect(Socket socket, 
                             SocketAddress endpoint,
                             SocketAddress localAddr,
                             int timeout) throws IOException {
  
...
  
    SocketChannel ch = socket.getChannel();
    
    if (localAddr != null) {
      socket.bind(localAddr);
    }

    if (ch == null) {
      socket.connect(endpoint, timeout);
    } else {
      SocketIOWithTimeout.connect(ch, endpoint, timeout);
    }
    ......
  }

 역시 소켓의 속성이 IO/ NIO 냐에 따라서 connection 방법이 달라집니다. IO 이면 일반 connect 를 맺고NIO 면 SocketIOWithTimeout 클래스를 사용하네요.


SocketInputStream 


생성자 


public SocketInputStream(Socket socket, long timeout) 

                                         throws IOException {

    this(socket.getChannel(), timeout);

  }


 public SocketInputStream(ReadableByteChannel channel, long timeout)

                                                        throws IOException {

    SocketIOWithTimeout.checkChannelValidity(channel);

    reader = new Reader(channel, timeout);

 }


내부에서 Reader 를 사용합니다. 


public int read(byte[] b, int off, int len) throws IOException {

    return read(ByteBuffer.wrap(b, off, len));

  }


 public int read(ByteBuffer dst) throws IOException {

    return reader.doIO(dst, SelectionKey.OP_READ);

  }


원하는 데이터의 양만큼을 ByteBuffer 클래스를 통해 할당하여 요청합니다.

  


SocketOutputStream


 public SocketOutputStream(WritableByteChannel channel, long timeout) 

                                                         throws IOException {

    SocketIOWithTimeout.checkChannelValidity(channel);

    writer = new Writer(channel, timeout);

  }


 public void write(byte[] b, int off, int len) throws IOException {

    ByteBuffer buf = ByteBuffer.wrap(b, off, len);

    while (buf.hasRemaining()) {

      try {

        if (write(buf) < 0) {

          throw new IOException("The stream is closed");

        }

      } catch (IOException e) {

        if (buf.capacity() > buf.remaining()) {

          writer.close();

        }

        throw e;

      }

    }

  }


ByteBuffer 로 쓸것을 저장한후에 남는게 없을동안 씁니다. write 를 호출한 쓰레드는 계속 기다려야합니다.
IOCP 라는 윈도우OS가 제공하는 비동기 입출력 통신 기술에서는 기다리지 않습니다. C++ 은 네이티브라 
당연히 사용가능하며, 자바도 NIO 는 지원안하지만 지원하는 라이브러리도 있는것으로 알고있습니다.


Reader 

: SocketIOWithTimeout 를 상속받은 클래스 

  int performIO(ByteBuffer buf) throws IOException {

      return channel.read(buf);

  }


 채널에서 버퍼의 크기만큼 읽는다. 

 


Writer 

: SocketIOWithTimeout 를 상속받은 클래스 


public int write(ByteBuffer src) throws IOException {

    return writer.doIO(src, SelectionKey.OP_WRITE);

  }

  

 int performIO(ByteBuffer buf) throws IOException {

      return channel.write(buf);

 }



SocketIOWithTimeout  (이게 핵심) 


private static SelectorPool selector = new SelectorPool();


int doIO(ByteBuffer buf, int ops) throws IOException {

 

    if (!buf.hasRemaining()) {

      throw new IllegalArgumentException("Buffer has no data left.");

    }


    while (buf.hasRemaining()) {

     int n = performIO(buf);

       if (n != 0) {

         return n;

        }

       selector.select(channel, ops, timeout);  

    }

    return 0; // does not reach here.

  }


Buffer 에 담을 수있는 데이터가 있는지 확인한후에 ,  -->  if (!buf.hasRemaining()) {

더이상 담을 공간이 없을때까지 -->  while (buf.hasRemaining()) 

입력/출력을 합니다. --> int n = performIO(buf);

입력/출력한게 있으면 리턴해줍니다. --> return n;

입력/출력한게 없으면 다시 입력/출력하기 위해 대기합니다. -->  selector.select(channel, ops, timeout);  


static void connect(SocketChannel channel, 

                      SocketAddress endpoint, int timeout) throws IOException {

    

    boolean blockingOn = channel.isBlocking();

    if (blockingOn) {

      channel.configureBlocking(false);

    }

    

    try { 

      if (channel.connect(endpoint)) {

        return;

      }


      while (true) {

        

        int ret = selector.select((SelectableChannel)channel, 

                                  SelectionKey.OP_CONNECT, timeoutLeft);

        

         ...

      }

    } catch (IOException e) {

   ...

  }


SelectorPool

 private static class SelectorInfo {
      Selector              selector;
      long                  lastActivityTime;
      LinkedList<SelectorInfo> queue; 
      
      void close() {
        if (selector != null) {
          try {
            selector.close();
          } catch (IOException e) {
            LOG.warn("Unexpected exception while closing selector : " +
                     StringUtils.stringifyException(e));
          }
        }
      }    
    }
    
    private static class ProviderInfo {
      SelectorProvider provider;
      LinkedList<SelectorInfo> queue; // lifo
      ProviderInfo next;
    }

int select(SelectableChannel channel, int ops, long timeout) 
                                                   throws IOException {
     
      SelectorInfo info = get(channel);
      
      SelectionKey key = null;
      int ret = 0;
      
      try {
        while (true) {
          long start = (timeout == 0) ? 0 : System.currentTimeMillis();

          key = channel.register(info.selector, ops);
          ret = info.selector.select(timeout);
          
         ...
        }
      } finally {
      ...
    }


  private synchronized SelectorInfo get(SelectableChannel channel) 

                                                         throws IOException {

      SelectorInfo selInfo = null;

      

      SelectorProvider provider = channel.provider();

      

      // pick the list : rarely there is more than one provider in use.

      ProviderInfo pList = providerList;

      while (pList != null && pList.provider != provider) {

        pList = pList.next;

      }      

      if (pList == null) {

        //LOG.info("Creating new ProviderInfo : " + provider.toString());

        pList = new ProviderInfo();

        pList.provider = provider;

        pList.queue = new LinkedList<SelectorInfo>();

        pList.next = providerList;

        providerList = pList;

      }

      

      LinkedList<SelectorInfo> queue = pList.queue;

      

      if (queue.isEmpty()) {

        Selector selector = provider.openSelector();

        selInfo = new SelectorInfo();

        selInfo.selector = selector;

        selInfo.queue = queue;

      } else {

        selInfo = queue.removeLast();

      }

      

      trimIdleSelectors(System.currentTimeMillis());

      return selInfo;

    }

    

   셀렉터를 하나만 만들면 , 여러 소켓들이 하나의 셀렉터를 통해서 , 신호를 받기때문에 효율적이지 못합니다.

   따라서 각 이벤트들마다 셀렉터를 만들면, 효율적이게 운영할수있는데, 셀렉터를 풀로 만들어서 

   셀렉터 생성에 대한 부담을 줄였습니다. 상황에 따라서 Read / Write / Acceptor 등을 별도의 셀렉터로 

   등록 시키면 좋습니다. 위의 셀렉터풀 말고도 NIO에서는 ByteBuffer  Pool을 만들어서 사용해도 좋습니다.


DataNode 


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


DataXeiverServer


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


DataXceiver


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


헤즐케스트 소개


해즐케스트의 소켓 통신을 위한 클래스들 


Storm 소개


Storm 의 소켓 통신을 위한 클래스들 



.............. 작성중 ................

Comments