순서

1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경


하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. 
Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.



이전에 하둡 RPC 를 살펴보았고 , 3번째 순서인 보안으로 넘어갑니다. 저도 보안에 대해서 특별히 잘 아는게 아니기때문에 간단히 소개정도의 글이 될거 같습니다. 그리고  하둡  HDFS 의 보안에 한정되며 
하둡 보안관련 이슈중에 일부분만 소개해 드립니다. (다른 에코 시스템들과의 연계같은거 제외) 
하둡 보안을 살펴보기 전에  먼저 일반적인 보안기술 및 자바 보안관련 라이브러리에 대해서 살펴볼 필요가 있습니다. ( 보다 정확한 정보를 위해서는 반드시 스펙문서등을 참고하십시요. 버전마다 차이가 있습니다. ) 

아래 몇가지 개념을 소개하고 있는데  편하게 쭈욱 읽어내려가시면 될거 같습니다.

-  인증 (Authentication)

   접근하려는 사람이 누구인지 확인하는 행위 ( 예: A 는.비밀번호를 확인하여  회원인것을 인증함
   커버로스는 인증부분을 책임집니다. ( 오직 인증만, 접근허가 (ACL) 정책은 하둡내에서 구현해야합니다.)

-  인가 (Authorization)

   접근하려는 사람이 어떤 권한을 가지고 있는지 확인하는 행위 ( 예:A 는  /data1  에 접근권한이 있다)  

-  암호화  

암호화 :  어떤 문자열 "hello world"  를  "332XF3FX_*)3" 로 알아보지 못하게 바꾸는것.

복호화 :   "332XF3FX_*)3"  을 다시  "hello world" 로 환원하는것.


-  해쉬함수


"E1FW4OMXF#%FWE()&E32RFEF2F" 이렇게 긴 문장을 

"F3F34"  요렇게 짧게 바꾸는 작업을 말한다. 

해쉬된 문장을 원래 문장으로 바꾸는건 불가능하다.

해쉬를 왜 (why) 하냐면 , 짧으면 무엇이 좋을까?  


1. 암호화, 복호화 할때 시간이 단축된다.

2. 원래 문장을 전송에 사용할 필요가 없어진다. 따라서 비밀번호같은것의 노출을 피할수있다.

   예를들어 비밀번호를 해쉬함수를 통해서 걸러진 문장을 DB 에 저장해두면,  진짜 비밀번호를 

   알수는 없지만, 동일한 비밀번호로부터 나온 결과라는것은 알수있게된다.


MD5 / SHA1 같은것을 사용한다.



-  대칭키 기반 암호화 
  1)  A 와 B 가 있습니다.  A 를 사람 혹은  클라이언트 / B 는  사람 혹은 서버라고 상상해 봅니다.
  2)  A 와 B 는 사이좋게 동일한 "대칭키" 라는 열쇠를 가지고있습니다. 
  3)  A 는 어떤 문서를 대칭키로 암호화 하여 B 에게 보내줍니다.
  4)  B 는 문서를 받아서 자신도 가지고있는 "대칭키" 로 복호화 하여 무슨 말이 써있나 읽어봅니다. 
  
  특징 :  동일한 키를 가지고  암호화/복호화를 한다. 
  단점 :  B 가  "대칭키" 를 가지고있지 않을때 A 가 B 에게 대칭키를 보내주다가, 중간에 탈취당하면...?
  기술 :  암호화 하는 기술에는 AES / DES 같은것들이 있습니다. 


- 공개키 기반 암호화 
   1)  A 와 B 가 있습니다.  A 를 사람 혹은  클라이언트 / B 는  사람 혹은 서버라고 상상해 봅니다.
   2)  A 와 B 는 각각  "공개키" 와 "개인키" 를 생성합니다. 
   3)  A 는 자신의 "공개키" 를  B 에게 보내줍니다.
   4)  B 는 자신의 "공개키" 를  A 에게 보내줍니다.
   5)  A 와 B 는 상대방의 "공개키" 로 자신의 문서를 암호화 하여 보내줍니다. 
   6)  A 와 B 는 자신의 "개인키" 로 상대방이 보낸 문서를 복호화 하면서 서로 통신합니다.

   특징 :  2개의 키가 필요하고, "공개키" 로 암호화 하고 "개인" 로 복호화합니다.
   단점 :  암호화 / 복호화 하는데 시간이 오래 걸립니다. 그리고 "공개키" 가 상대방것인지 확인불가
   기술:  RSA , DSA 

- SSL / TLS 암호화 



   1)  클라이언트는 서버에게 보안통신을 하자고 요청합니다. (가능한 암호화알고리즘/해쉬함수를  보냄)
   2)  클라이언트가 보낸 리스트중에 서버는 센놈하나를  선택합니다.
   3)  서버는 자신의 공개키가 포함된 전자서명을 클라이언트에 보냅니다.(전자서명은 CA 의 개인키로함)
   4)  클라이언트는 자신의 컴퓨터나 브라우저에 내장된 CA 의 공개키로 서버가 보낸 전자서명을 검증
   5)  클라이언트는 검증후에 전자서명안의 서버의 공개키를 획득합니다.
   6)  클라이언트는 서버의 공개키로 자신이 만든 "대칭키" 를 암호화하여 서버로 보내줍니다.
   7) 서버는 자신의 개인키로 클라이언트가 보낸 "대칭키" 를 복호화합니다.
   8)  클라이언트와 서버는 이제 서로 동일하게 가지고있는 "대칭키" 로 암호화 통신을 합니다.

   특징 :  공개키 기반 암호화에서 상대방의 공개키를 믿을수있게 하기위해  인증서를 도입함. 
              공개키를 통한 핸드쉐이킹 후에는 "대칭키" 를 통해 통신. 
   응용기술 :  HTTP 프로토콜에 SSL 를 더 한것이  HTTPS 

- 커버로스 
   1)  클라이언트는 KDC 에 사용자이름, 클라이언트 비밀키로 암호화한 타임스탬프,  TGT 요청을 보냄.
   1-1)  위에서 클라이언트 비밀키는 패스워드를 해쉬로 산출된 값이다. 그 값은 당연히  KDC 도 보유.  
   2)  KDC 는 그 메세지를 기반으로 클라이언트가 적절한지를 판단한다.
   2-1)  KDC 가 보유한 해당 클라이언트의 패스워드 해쉬값으로 복호화 되면 적절하다고 판단함.
   3) KDC 는 클라이언트에게 TGT 를 발행합니다. 
   3-1) TGT 는 KDC 자신의 비밀키로 암호화 되어져있다. 
   4) KDC 는 클라이언트에게 TGS 와 통신할때 사용할 둘만의 세션키를 발행한다. 
   4-11) 세션키는 클라이언트의 비밀키로 암호화 되어져있다. (세션키와 TGT 는 같게해서 단순화)  
   5) 클라이언트는 TGT 와 클라이언트 비밀키로 암호화한 사용자정보를 가지고 티켓발급이 필요한 
       서비스요청을 한다.
   6) 요청을 받은 KDC 의 TGS 는 TGT 를 자신의 비밀키로 해독하고 세션키로 암호화된 사용자 정보
       를 해독하여 사용자의 신원/타임스탬프를 확인한다.  
   7) KDC 는 해당 서비스와 통신할수있도록 티켓과 클라이언트와 서비스가 통신할때 사용할 둘만의 
       세션키를 클라이언트에게 발행해준다. 
   7-1) 티켓은 서비스키로 암호화, 둘만의 세션키는 클라이언트 비밀키로 암호화.  
   8) 클라이언트는 서비스에  티켓과  사용자 정보를 세션키로 암호화해서 전달. 사용해도 되냐고 함.
   9) 서비스는 티켓을 서비스키로 검증한후에 접근을 허락해줍니다.


 특징: 커버로스는 인증 시스템 입니다. / Single Sign ON (SSO) 
 장점: 1.네트워크를 통한 패스워드의 전송이 없다. 유효기간이 있는 티켓만이 돌아다님.
          2. 서비스들은 모든 클라이언트에 대한 정보를 각자 가지고있을 필요가 없다. 
 단점 :  커버로스의 KDC 가 멈추면 모든게 멈춥니다. (single point of failure) 
           서버간에 시간설정이 일치해야함.


커버로스 인증에 대하여 그림을 통하여 다시 살펴봅시다. ( Hadoop Security 책에서 발췌) 
위 그림에 대한 한글 설명을 하둡 보안이라는 책에서 가져왔습니다. 저 그림과 이 설명에는 몇가지 오류가 있는데 찾아보세요.

1. 클라이언트는 KDC 에 인증부호를 보내 TGT 을 요청한다.
2.  KDC 는 클라이언트에게 TGT 와 세션키를 제공한다. TGT 는 인증된 사용자에게 제공하는 특별 티켓으로 , 모든 서버의 서비스 티켓 발급에 이용된다. 8~10시간후 만료된다. 세션키는 통신에 참여하는 두 파티를 위한 공통키로, 두 파티간 전송되는 데이터의 암호화에 사용된다.
3. 클라이언트는 TGT 를 이용해 서비스 티켓을 TGS 에 요청한다.
4. KDC 는 서비스 티켓과 타겟 서버로 보내는 데이터의 암호화에 사용할 세션키를 제공한다.
   이때 세션키는 타켓 서버만이 자신의 비밀키를 사용해 세션키를 해독하고 사용자와 통신할수 
   있도록 타겟 서버의 비밀키를 통해 암호화된다.  
5. 클라이언트는 타겟 서버와 연결해 TGS 를 제출한다. 서버는 자신의 비밀키를 이용해  TGS 를 해독
    하고 클라이언트를 인증한다.
6. 서버는 세션키로 암호화된 인증 부호를 제공한다. 이제 클라이언트와 서버는 세션키를 비밀키로 공유하고 이키로 필요한 모든 데이터를 암호화한다.


오류1  : TGS 로 부터 받는 정보는 TGT 가 아니라 TGS / 4, 5 번 그림에서 TGT -> TGS
오류 2 :  클라이언트/서버간에 사용될 세션키가 타겟서버만이 자신의 비밀키로 세션키를 해독한다고 하는데 그럼 클라이언트는 세션키를 어떻게 얻을수 있나?? 저 위의 설명만으로는 알수없음.

- 언제 커버로스를 쓰고 언제 SSL 을 쓰나 ? 

  커버로스는 인증을 위해 사용합니다. 독자분의 서버에 여러가지 서비스 데몬이 있다고 칩시다.
  각각의 서비스를  영철이한테 오픈할것과  미숙이한테 오픈할것이 다를경우, 사용합니다. 
  SSL 은  님과 영숙이간에 서로 전화를 할때, 남이 무슨 얘기를 하는지 알수없게 하기위한 검증된
  암호화가 목적입니다. 전혀 사용하려는 목적 및  방법이 다르다는 말이지요. 비교대상은 아닙니다.

- JCA /  JCE   (Java Cryptograpy Architecture)


   자바 플랫폼 상에서 각종 암호화 관련 기능을 제공한다. 암호화 API , 난수생성, 해쉬함수, MAC, 
    전자 서명, 인증서, 키 생성 및 교환 제공. 
   
- JSSE   (Java Secure Socket Extension

   자바IO/NIO 를 이용한 소켓통신을 할때 Socket 객체를 이용하지 않은가?
   단지 그 객체를 SSLSocket 으로 바꾸어주면 SSL 로 보호되어진 통신을 할수있게 
   된다. (SSLSocketFactory.getDefault 를 이용하여 생성) 

- JAAS (Java Authentication and Authorization Service)

  위의 JCA 나 JSSE 가 암호화된 통신에 목적이 있다면, JAAS 는 인증과 접근허가등에 관련된 
  기능을 제공한다.

- JAVA SASL  / JAVA GSS-API 

 커버로스 프로토콜등 을 내부적으로 이용하여 , 보안 통신에 대한 API 와 구현을 제공한다.
 하둡에서도 커버로스를 사용할때 SASLServer / SASLClient 등을 사용한다.

특징
APIs and implementations for the following standards-based secure communications protocols: Transport Layer Security (TLS), Secure Sockets Layer (SSL), Kerberos (accessible through GSS-API), and the Simple Authentication and Security Layer (SASL). Full support for HTTPS over SSL/TLS is also included.

장점 
Authenticates peers over an untrusted network and protects the integrity and privacy of data transmitted between them.


오픈소스 하둡 과 보안 

대부분의 오픈소스 솔루션들은 처음부터 보안에 대해 염두해두고 개발되지 않습니다. 그 이유는 당연히
시간때문이겠지요. 자신의 유니크한 아이디어를 돌아가는 솔루션으로 만드는것이 1차 목표가 되는것이지 보안은 후속계획쯤으로 여겨지게 됩니다. 따라서 하둡도 마찬가지로 처음에는 보안부분에 크게 신경을 쓰지 않았으며 , 버전업이되면서 보안로직들이 추가되기 시작했습니다. 하둡을 이용하여 회사내의 데이터에 대해사 의미있는 가치를 창출하였을때 그 데이터에 대한 접근이 아무한테나 허용하면 안되겠지요.  하둡은 처음에는 단순히 실수에의한 데이터분실에 촛점을 맞춰서 간단히 보안을 추가했다면, 현재는 하둡 에코 시스템 및 타 시스템들과의 연동등에 복잡한 보안이슈가 발생하고 있습니다. 

컴퓨터의 사용자 정보/그룹 그 자체로 사용자 인증을 하였으며, 커버로스가 지원된 후에는 커버로스를 통해서 인증을 처리합니다.하지만 그것들은 단지 인증만 처리하므로, 사용자가 어떤 데이터를 사용하는것에 관련된 허가정책은 하둡내에서 따로 구현하고 있습니다. 

1. 각종 설정 사항들 

 
     hadoop.security.authentication 
     
    커버로스 인증을 할것이냐, 기본인증 (Simple) 을 할것이냐 구분한다.
    기본값은 운영체제의 사용자 이름을 이용하여 신분을 확인하는  방식이 사용되어야 한다.  
    (whoami 등을 조작가능) 
    
     hadoop.security..authorization 

    서비스 수준의 권한 부여 활성화. 

     hadoop.rpc.protection = privac
   
    하둡 RPC 매커니즘에서 SASL 보안을 지원한다.하둡 클라이언트와 네임노드간의 통신을 보호하고 암호화.

     hadoop-policy.xml 
    
    접근제어목록 (ACL) 을 구성하여 사용자/그룹이 서비스를 사용할수있는지 허가해줌.

     dfs.encrypt.data.transfer = true 

    클라이언트에서 데이터노드로의 데이터 전송에 보안된 데이터 전송을 위해 SASL 래퍼를 사용한다. 
    네임노드와 데이터노드는 비밀키를 공유하는데, 네임노드에서 클라이언트한테 비밀키로 암호화된 세션키
    를 넘겨주어, 그것을 이용하여 인증한다. 


2. 하둡 보안을 위한 설계 요구사항들 

 - 하둡 사용자는 승인받은 데이터에만 접근할수있어야한다.
 - 사용자는 자신의 잡만 읽고, 수정, 중지 
 - 인증한 서비스만이 데이터노드로 등록될수있다.
 - 데이터노드내의 데이터블록접근은 보안이 필요하고, 인증한 사용자만이 하둡 클러스터의 저장된 데이터에 접근할수있어야한다.
 - 하둡 클러스터를 안전하게 보호하기위해 내부 생성토큰을 이용하는데, 데이터접근을 위해서는 블록접근토큰이 필요하다.

3. 하둡 기본 보안 모델 without  커버로스 

하둡은  HDFS에 들어 있는 파일과 디렉터리에 대한 읽기/쓰기 권한을 제한할 수 있다. POSIX 파일 시스템과 유사하게, 사용자와 관리자는 파일/디렉토리에 대한 권한과 소유권 변경을 위해 chmod 와 chown 명령어를 사용할수 있다. 하둡은 사용자 관리 기능을 전혀 제공하지 않는다. 대신 하둡 내의 운영시스템 사용자를 이용한다. 
기본적으로 하둡은 사용자나 하둡 서비스에 대한 어떠한 인증도 지원하지 않는다. 사용자는 로그인 하는 동안 운영체제에만 인증한다. 이후 사용자가 하둡 명령어를 호출할때, 사용자의 ID와 그룹이 whoami / bash -c groups 에 의해 각각 정해진다. 따라서 중간에 스트립트 조작에 의한 권한 강탈이 가능하다.
 


4. 하둡 보안 모델 with 커버로스 
커버로스는 하둡 0.20.20x 버전부터 추가되었다.


1. 모든 하둡 서비스는 KDC 에 자신을 인증한다.

2. 데이터노드를 네임노드에 등록하고. 비밀키를 교환한다. (heartbeat 메세지안에추가)

3. 클라이언트는 KDC 에 자신을 인증하고 TGT 를 요청해서 받는다.

4. 클라이언트는 TGT 를 이용하여 KDC에 서비스 티켓을 요청해서 받는다.
5. 클라이언트는 서비스티켓을 이용하여 네임노드에 파일을 요청한다. 
6. 네임노드 서비스는 클라이언트에 데이터정보(블록ID) 와 블록접근 토큰을 제공한다.
7. 클라이언트는 데이터정보와 블록접근 토큰을 데이터노드에 보내서 실제 데이터를 요청한다.
8. 데이터노드는 블록접근토큰을 통해 인증을 하고 실제 데이터를 찾아서 스트리밍해준다. 





위의 그림은 HDFS 와 함께 맵리듀스에 대한 보안인터렉션을 보여준다.





.... 작성중 ....


순서

1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경


하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. 
Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.


... 작성중 ...

순서

1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경


하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. 
Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.


...작성중..

순서


1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경



하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.



HDFS 라는 분산파일 시스템은 여러대의 컴퓨터에 파일을 분산하여 놓고 읽기/쓰기를 지원해주는 단순한 기능을 합니다.
NameNode 는 분산된 파일의 위치/크기등의 정보를 가지고있으며
DataNode 는 실제 데이터를 가지고있습니다.

하둡 클라이언트에서 파일을 가져오기 위해서는

0. KDC (케버로스센터) 에 네임노드 / 데이터노드 / 클라이언트는 각각 인증합니다.
1. 네임노드와 데이터노드간에 서로 비밀키를 교환합니다.
2. 클라이언트는 KDC 에 TGT 를 요청합니다.
3. 클라이언트는 KDC 에 네임노드 서비스 티켓을 요청합니다.
4. NameNode 에게 내가 원하는 파일의 위치 (블럭ID) 를 알려달라고 요청합니다. (티켓과 함께)
5. 1번에서 획득한 블럭ID  및 블록 접근 토큰을 가지고 해당 DataNode 에게 데이터를 달라고 요청합니다.

위에

5번에서 데이타를 가져올때는 하둡 스트리밍 (TCP/IP) 를 이용합니다.
나머지 0~3번은 보안과 관련된 이슈이므로 다음 연재때 자세히 다뤄보도록 하겠습니다.


하둡 스트리밍을  이해하기위해서는 자바IO/NIO 에 대한 몇가지 선행학습이 필요합니다. 
(하둡은 자바로 만들어짐. NIO2가 안정화되면 코어구현도 바뀌겠지요. 네티가 들어갈수도) 


1. 자바 IO /NIO ( http://javacan.tistory.com/entry/87 

* InputStream / OutputStream
* Selctor
* Channel
* ByteBuffer
* Socket


소켓 통신 일반  




전형적인 소켓 프로그래밍 인터렉션입니다. 서버는 대기하고, 클라이언트가 접속하면 서버는 받아드리고, 

받아드리면서 클라이언트와 통신하기위한 소켓하나 만들어서 쓰레드하나 만들어서

던진후에 그 쓰레드에서 클라이언트와 입/출력.  




자바 NIO 



  자바NIO (New I/O)는 비동기 입출력은 아닙니다.   

  쪼 위에 blocking 되있는걸 보실수있습니다. (타임아웃 가능) 

  Select 모델이라고 하지요. 소켓통신 모델에는 다양한 모델이 있으며, IOCP 모델이 빠르기로 유명함.

  개인적으로 자바 NIO/NIO2 프로그래밍이 C++ 을 이용한 IOCP 프로그래밍 보다는 애매하게 느꼈던것 

  같습니다. (추상화 하다보니 좀 희미하다고 해야하나? 그에 반해 IOCP 모델은 굉장히 클리어 하죠.)


하둡 스트리밍 


사실 하둡 스트리밍에 대한 글을 쓸까 말까 좀 고민을 했었습니다.  왜냐? 할게 없기때문에.. 

그냥 자바 소켓통신입니다. 따라서 자바소켓통신에 대한 소스레벨 분석 글을 보신다고 생각하면 편하실듯합니다. :-)  독자분이 소켓통신을 구현할때 가져다 쓰면 좋겠지요. 구태여 바퀴를 또 만들필요가 없지 않겠습니까?   (이런 의미에서 Netty 를 사용하는게 좋죠. Vert.x / Couchbase / OpenTSDB 등이 사용)




위의 그림은 클라이언트에서 데이타노드로 블록ID 을 보내어 데이터를 달라고 하는 상황에서

그와 관련된 클래스들의 모식도 입니다.

소스 조각을  하나씩 살펴보겠습니다.

DFSClient

 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.

RemoteBlockReader

 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.

NetUtils

public static SocketFactory getDefaultSocketFactory(Configuration conf) {

    String propValue = conf.get("hadoop.rpc.socket.factory.class.default");

    if ((propValue == null) || (propValue.length() == 0))

      return SocketFactory.getDefault();

    return getSocketFactoryFromProperty(conf, propValue);

  }

 하둡 환경설정에 따라 소켓팩토리를 선정합니다. getDefaultSocketFactory 가 

 기본.SocketFactory.getDefault(); 참고로 소켓팩토리를 통해서 SSLSocket 을 사용할수있습니다. 


 public static InputStream getInputStream(Socket socket, long timeout) 
                                           throws IOException {
    return (socket.getChannel() == null) ? 
          socket.getInputStream() : new SocketInputStream(socket, timeout);
  }
소켓의 속성에  채널이 있을때와 없을때 , 그러니깐 IO / NIO 에 따라서  리턴되는스트림객체가 다릅니다. 

public static void connect(Socket socket, 
                             SocketAddress endpoint,
                             SocketAddress localAddr,
                             int timeout) throws IOException {
  
...
  
    SocketChannel ch = socket.getChannel();
    
    if (localAddr != null) {
      socket.bind(localAddr);
    }

    if (ch == null) {
      socket.connect(endpoint, timeout);
    } else {
      SocketIOWithTimeout.connect(ch, endpoint, timeout);
    }
    ......
  }

 역시 소켓의 속성이 IO/ NIO 냐에 따라서 connection 방법이 달라집니다. IO 이면 일반 connect 를 맺고NIO 면 SocketIOWithTimeout 클래스를 사용하네요.


SocketInputStream 


생성자 


public SocketInputStream(Socket socket, long timeout) 

                                         throws IOException {

    this(socket.getChannel(), timeout);

  }


 public SocketInputStream(ReadableByteChannel channel, long timeout)

                                                        throws IOException {

    SocketIOWithTimeout.checkChannelValidity(channel);

    reader = new Reader(channel, timeout);

 }


내부에서 Reader 를 사용합니다. 


public int read(byte[] b, int off, int len) throws IOException {

    return read(ByteBuffer.wrap(b, off, len));

  }


 public int read(ByteBuffer dst) throws IOException {

    return reader.doIO(dst, SelectionKey.OP_READ);

  }


원하는 데이터의 양만큼을 ByteBuffer 클래스를 통해 할당하여 요청합니다.

  


SocketOutputStream


 public SocketOutputStream(WritableByteChannel channel, long timeout) 

                                                         throws IOException {

    SocketIOWithTimeout.checkChannelValidity(channel);

    writer = new Writer(channel, timeout);

  }


 public void write(byte[] b, int off, int len) throws IOException {

    ByteBuffer buf = ByteBuffer.wrap(b, off, len);

    while (buf.hasRemaining()) {

      try {

        if (write(buf) < 0) {

          throw new IOException("The stream is closed");

        }

      } catch (IOException e) {

        if (buf.capacity() > buf.remaining()) {

          writer.close();

        }

        throw e;

      }

    }

  }


ByteBuffer 로 쓸것을 저장한후에 남는게 없을동안 씁니다. write 를 호출한 쓰레드는 계속 기다려야합니다.
IOCP 라는 윈도우OS가 제공하는 비동기 입출력 통신 기술에서는 기다리지 않습니다. C++ 은 네이티브라 
당연히 사용가능하며, 자바도 NIO 는 지원안하지만 지원하는 라이브러리도 있는것으로 알고있습니다.


Reader 

: SocketIOWithTimeout 를 상속받은 클래스 

  int performIO(ByteBuffer buf) throws IOException {

      return channel.read(buf);

  }


 채널에서 버퍼의 크기만큼 읽는다. 

 


Writer 

: SocketIOWithTimeout 를 상속받은 클래스 


public int write(ByteBuffer src) throws IOException {

    return writer.doIO(src, SelectionKey.OP_WRITE);

  }

  

 int performIO(ByteBuffer buf) throws IOException {

      return channel.write(buf);

 }



SocketIOWithTimeout  (이게 핵심) 


private static SelectorPool selector = new SelectorPool();


int doIO(ByteBuffer buf, int ops) throws IOException {

 

    if (!buf.hasRemaining()) {

      throw new IllegalArgumentException("Buffer has no data left.");

    }


    while (buf.hasRemaining()) {

     int n = performIO(buf);

       if (n != 0) {

         return n;

        }

       selector.select(channel, ops, timeout);  

    }

    return 0; // does not reach here.

  }


Buffer 에 담을 수있는 데이터가 있는지 확인한후에 ,  -->  if (!buf.hasRemaining()) {

더이상 담을 공간이 없을때까지 -->  while (buf.hasRemaining()) 

입력/출력을 합니다. --> int n = performIO(buf);

입력/출력한게 있으면 리턴해줍니다. --> return n;

입력/출력한게 없으면 다시 입력/출력하기 위해 대기합니다. -->  selector.select(channel, ops, timeout);  


static void connect(SocketChannel channel, 

                      SocketAddress endpoint, int timeout) throws IOException {

    

    boolean blockingOn = channel.isBlocking();

    if (blockingOn) {

      channel.configureBlocking(false);

    }

    

    try { 

      if (channel.connect(endpoint)) {

        return;

      }


      while (true) {

        

        int ret = selector.select((SelectableChannel)channel, 

                                  SelectionKey.OP_CONNECT, timeoutLeft);

        

         ...

      }

    } catch (IOException e) {

   ...

  }


SelectorPool

 private static class SelectorInfo {
      Selector              selector;
      long                  lastActivityTime;
      LinkedList<SelectorInfo> queue; 
      
      void close() {
        if (selector != null) {
          try {
            selector.close();
          } catch (IOException e) {
            LOG.warn("Unexpected exception while closing selector : " +
                     StringUtils.stringifyException(e));
          }
        }
      }    
    }
    
    private static class ProviderInfo {
      SelectorProvider provider;
      LinkedList<SelectorInfo> queue; // lifo
      ProviderInfo next;
    }

int select(SelectableChannel channel, int ops, long timeout) 
                                                   throws IOException {
     
      SelectorInfo info = get(channel);
      
      SelectionKey key = null;
      int ret = 0;
      
      try {
        while (true) {
          long start = (timeout == 0) ? 0 : System.currentTimeMillis();

          key = channel.register(info.selector, ops);
          ret = info.selector.select(timeout);
          
         ...
        }
      } finally {
      ...
    }


  private synchronized SelectorInfo get(SelectableChannel channel) 

                                                         throws IOException {

      SelectorInfo selInfo = null;

      

      SelectorProvider provider = channel.provider();

      

      // pick the list : rarely there is more than one provider in use.

      ProviderInfo pList = providerList;

      while (pList != null && pList.provider != provider) {

        pList = pList.next;

      }      

      if (pList == null) {

        //LOG.info("Creating new ProviderInfo : " + provider.toString());

        pList = new ProviderInfo();

        pList.provider = provider;

        pList.queue = new LinkedList<SelectorInfo>();

        pList.next = providerList;

        providerList = pList;

      }

      

      LinkedList<SelectorInfo> queue = pList.queue;

      

      if (queue.isEmpty()) {

        Selector selector = provider.openSelector();

        selInfo = new SelectorInfo();

        selInfo.selector = selector;

        selInfo.queue = queue;

      } else {

        selInfo = queue.removeLast();

      }

      

      trimIdleSelectors(System.currentTimeMillis());

      return selInfo;

    }

    

   셀렉터를 하나만 만들면 , 여러 소켓들이 하나의 셀렉터를 통해서 , 신호를 받기때문에 효율적이지 못합니다.

   따라서 각 이벤트들마다 셀렉터를 만들면, 효율적이게 운영할수있는데, 셀렉터를 풀로 만들어서 

   셀렉터 생성에 대한 부담을 줄였습니다. 상황에 따라서 Read / Write / Acceptor 등을 별도의 셀렉터로 

   등록 시키면 좋습니다. 위의 셀렉터풀 말고도 NIO에서는 ByteBuffer  Pool을 만들어서 사용해도 좋습니다.


DataNode 


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


DataXeiverServer


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


DataXceiver


 - 하둡 HDFS  읽기/쓰기 연재에서 설명 예정.


헤즐케스트 소개


해즐케스트의 소켓 통신을 위한 클래스들 


Storm 소개


Storm 의 소켓 통신을 위한 클래스들 



.............. 작성중 ................




이번주 토요일 발표된 하둡 쉘에 대하여 간단히 정리해봤습니다. 지저분한것들이 많지만..
핵심은 저것인데요.

0. 실행은 항상 마스터컴퓨터에서 합니다.
1. 먼저 start-dfs.sh 로 실행합니다.   ( 하둡의 HDFS 를 실행시키기 위함) 
2. start-dfs.sh 은 내부에서 hdadoop-config.sh 을 가져와서 실행합니다. config 에는 별거 없습니다.
3. hadoop-config.sh 내부에서는 다시  hadoop-env.sh 을 가져옵니다. 위에 보시다시피 각종 환경변수들을 설정해줍니다.
4. 마지막으로 start-dfs.sh 은  hadoop-daemon.sh 와 hadoop-daemons.sh 를 실행합니다. 
5. 단수인것은 네임노드만 실행하는것이구요 ( 네임노드는 마스터에 존재)
6. 복수인것은 for 문 돌면서 원격컴퓨터의 hadoop-daemon.sh 을 실행합니다. 
7. 결국 hadoop-daemon.sh 을 실행합니다.
8. hadoop-daemon.sh 은 옵션에 따라서  namenode 와 datanode 를 실행합니다.
9. hadoop --config 컨피그 디렉토리  namenode/datanode    start/stop  이렇게 실행합니다.
10. 마지막으로 hadoop 파일을 보면  하둡 라이브러리들을 클래스패스에 등록한후에  결국 자바클래스를 호출합니다.

name 노드 실행은   exec JAVA  namenode 자바클래스  start 
data  노드 실행은   exec JAVA  datanode 자바클래스 start 
입니다.

HDFS 에 파일을 조작하는 명령어는  hadoop fs 이며 
하둡에서 맵리듀스를 실행하는 명령어는 hadoop jar  입니다.

결국 소스는 위에 있는 클래스소스들부터 분석시작하면 됩니다. 

주요 클래스

CLASS= org.apache.hadoop.hdfs.server.namenode.NameNode
CLASS= org.apache.hadoop.hdfs.server.datanode.DataNode
CLASS=org.apache.hadoop.fs.FsShell
CLASS=org.apache.hadoop.util.RunJar


hadoop-2.2.0 경우는 이클립스에서  maven  으로 import 하면 됩니다만


hadoop-1.2.1 은 이클립스에서 ant 로 import 할 경우 ivy-common (??)  에러나면서 안됩니다.

그럴 경우 아래 방법을  참고하셔서 import 하시면  됩니다.

Basic Environment Description:

     

OS:

     

Ubuntu 12.04

JAVA version:

     

jdk 1.6.0_27

Eclipse version:

     

Service Release 1


  Part 1: Building Hadoop Developing Environment

           

1) download hadoop source code.

  • For Hadoop release, download it here. I used Hadoop 1.2.0;
  • For Newest versions, it can be downloaded from Hadoop Version Control System;
  • Assume HADOOP_HOME is the directory where the source code is.
2) Install tools.
  • Run command: sudo apt-get install ant ivy automake libtool to install "ant ivy automake libtool".
  • Download forrest-0.8 and unzip it. Assume its directory is FORREST_HOME 
    Note that forrest-0.8 is not the latest version, Using the latest version may confront with some problems. In order to run forrest-0.8, we need java-1.5. Because we only need java-1.5 for forrest-0.8, we put java-1.5 in a separate directory. Assume it is JV5_HOME. The version I used is jdk1.5.0_22
3) Change configuration file of Hadoop
  • Open config file: $HADOOP_HOME/src/c++/task-controller/configure.ac 
    Find a line which contains AC_SYS_LARGEFILE, delete it.
  • Open config file: $HADOOP_HOME/build.xml 
    Find a line contains < target name="ant-eclipse-download", there is a line closely below, start with < get src= which indicates the source location of file "ant-eclipse.jar". The location in my version does not exist, change it to https://ant-eclipse.googlecode.com/files/ant-eclipse-1.0.bin.tar.bz2.
4) Build hadoop.
  • cd $HADOOP_HOME
  • ant compile
  • ant clean package -Djava5.home=$JV5_HOME -Dforrest.home=$FORREST_HOME
5) Create eclipse project files.
  • ant eclipse
6) Configure eclipse and import hadoop project.
  • Open eclipse and add class path variable. 
    The operation is "Window->preferences->java->Buiid path-> Classpath Variables" 
    Create new variable ANT_HOME=/user/share/ant
  • Import Hadoop project. 
    The operation is "File->Import->General->Existing Project Into Workspace". Goto $HADOOP_HOME and import the project
  • Configure Hadoop project. 
    Because Hadoop need to find classes for webapp but these .Class files are not included in project configuration, it cause runtime errors. 
    A simple solution is adding path "$HADOOP_HOME/build/classes" to CLASS_PATH of the project. The operation is "File->Properties->Java Build Path->Libraries->Add Class Folder"

7) Test your environment

  • There are many JUnit test examples in the project. You can choose one to test your environment. 
    If debugging Junit testcases add following arguments to your "VM Argument" in your "Debug configuration". 
    -Dhadoop.log.dir=build/test/logs -Dtest.build.data=build/test/data 
    -Dhadoop.log.file=hadoop.log 
    -Djava.net.preferIPv4Stack=true






java 5 sdk 설치방법 (1)

ubuntu의 기본 update repository에서는 sun-1.5-jdk를 제공하지 않으므로, 다음의 방법으로 추가 repository를 추가한다.

sudo add-apt-repository "deb http://us.archive.ubuntu.com/ubuntu/ hardy multiverse"
sudo add-apt-repository "deb http://us.archive.ubuntu.com/ubuntu/ hardy-updates multiverse"

aapt-get update

sudo apt-get install sun-java5-jdk



java 5 sdk 설치방법 (2) 

oracle 에서 5버전 bin 다운받은후 terminal 창에서 
sudo chmod +x jdk-1_5_0_22-linux-amd64.bin   (엔터)
sudo ./jdk-1_5_0_22-linux-amd64.bin   (엔터) 




순서

1) 하둡 RPC
2) 하둡 스트리밍
3) 하둡 & 보안
4) 하둡 HDFS 읽기
5) 하둡 HDFS 쓰기
6) 하둡 IO  (Writable / Avro)
7) 하둡 & 가용성  (Zookeeper) 
8) 하둡 쉘 스크립트 및 환경


하둡은 HDFS 라는 분산파일시스템과 맵리듀스라는 그것을 이용하여 계산을 하는 도구를 가지고있습니다.  (YARN 이전) 계산에는 간단한 배치성 작업이 주를 이루며 다양한 머신러닝 알고리즘 (머하웃 라이브러리) 을 실행할수도 있습니다.계산복잡도,알고리즘형태에 따라서  지라프,하마같은 다른 도구를 사용할수도 있으며 , 하둡 YARN 과 함께 다양한 빅데이터 솔루션들이 하모니를 이루고 있습니다. Storm-yarn 같은 도구를 사용하여 실시간 분석을 용이하게 할수도있으며, 메모리를 적극적으로 활용한 Spark 라는 제품도 각광을 받고 있습니다.



HDFS 라는 분산파일 시스템은 여러대의 컴퓨터에 파일을 분산하여 놓고 읽기/쓰기를 지원해주는 단순한 기능을 합니다.
NameNode 는 분산된 파일의 위치/크기등의 정보를 가지고있으며
DataNode 는 실제 데이터를 가지고있습니다.

하둡 클라이언트에서 파일을 가져오기 위해서는

0. KDC (케버로스센터) 에 네임노드 / 데이터노드 / 클라이언트는 각각 인증합니다.
1. 네임노드와 데이터노드간에 서로 비밀키를 교환합니다.
2. 클라이언트는 KDC 에 TGT 를 요청합니다.
3. 클라이언트는 KDC 에 네임노드 서비스 티켓을 요청합니다.
4. NameNode 에게 내가 원하는 파일의 위치 (블럭ID) 를 알려달라고 요청합니다. (티켓과 함께)
5. 1번에서 획득한 블럭ID  및 블록 접근 토큰을 가지고 해당 DataNode 에게 데이터를 달라고 요청합니다.

위에

4번에서 NameNode 의 특정 함수를 호출하여 원하는바를 이루는데. 그때 하둡RPC 가 사용됩니다.
5번에서 데이타를 가져올때는 하둡 스트리밍 (TCP/IP) 를 이용합니다.
나머지 0~3번은 보안과 관련된 이슈이므로 다음 연재때 자세히 다뤄보도록 하겠습니다.


하둡 RPC 를 이해하기위해서는 자바기술에 대한 몇가지 선행학습이 필요합니다. (하둡은 자바로 만들어짐) 


1. 자바 리플렉션 ( http://socurites.com/60 )

public class TestServer implements RPCProtocol {

Class  cls  = Class.forName(“RPCProtocol“);

Class paramTypes[] = new Class[2];
pramTypes[0] = String.TYPE;
pramTypes[2] = Integer.TYPE;

Method meth = cls.getMethod(“heartbeat”, paramTypes);

Object arglist[] = new Object[2];
arglist[0] = new String(“hello world”)
arglist[1] = new Integer(2);


RPCProtocol rp  =  new  TestServer ();

Object retObj = meth.invoke(rp, arglist);

Integer retval = (Integer) retObj;

중요포인트 ( 필요사항 ) :    실제 객체 /  클래스타입 /  메소드이름 /  인자타입 / 실제 인자 /    

Hadoop  에서 각각에 대한 변수명 

                     instance (실제객체) 
                     protocol   (클래스타입) 
                     Invoocation  ( 메소드이름, 인자클래스, 실제 인자값)


2. 자바 Dynamic Proxy (http://javacan.tistory.com/entry/115 

public class MyDebugInvocationHandler   implements java.lang.reflect.InvocationHandler {    
        private Object target = null;
          public void setTarget(Object target_) {
            this.target = target_;
        }
        
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                System.out.println("Going to execute method : " + method.getName);
                Object retObject = method.invoke(target, args);
                System.out.println("After execute method : " + method.getName());
                return retObject;
        }
    }


-----------


 IMyBusinessObject bo = new MyBusinessObject(); // 실제 비즈니스 객체
  MyDebugInvocationHandler aMyDebugInvocationHandler = new MyDebugInvocationHandler();
  aMyDebugInvocationHandler.setTarget(bo);
    
 IMyBusinessObject proxyObject =    (IMyBusinessObject) Proxy.newProxyInstance  // 프록시 객체 생성
          (IMyBusinessObject.class.getClassLoader(),
          new Class[] { IMyBusinessObject.class }, // 프록시할 인터페이스
          aMyDebugInvocationHandler); // InvocationHandler
    
    // 프록시 객체 사용
    System.out.println(proxyObject.doExecute("Hello World"));


3. 자바 IO /NIO ( http://javacan.tistory.com/entry/87 

* InputStream / OutputStream
* Selctor
* Channel
* ByteBuffer
* Socket


이제 하둡 RPC 코드를 보도록 합시다.

먼저 그림을 통해 NIO 서버 흐름도를 살펴보도록 하겠습니다.




* Listener 를 통해서 대기하고있다가 Reader 쓰레드중 하나가 응답을 받아서 큐에 메세지를 넣음.
* Handler 에서 실제 함수를 리플렉션을 이용해 호출해주고 리턴값을 받아서 Response 함.


다음은 패킷의 구성을 살펴보겠습니다.


*  헤더를 통해서 인가 과정을 거치고 Invocation 객체가 직렬화되어 전달되어 집니다.


클라이언트 와 서버간의 인터렉션을 나열해 보면

클라이언트

1. 호출할 함수를 선언한 NameNode (ClientProtocol)  인터페이스로  다이나믹 프록시를 생성
2. 생성된 프록시로 함수호출 
3. 내부에서 클라이언트 객체 하나를 풀에서 꺼내어 서버와 connection 을 맺음
4. 서버로 인터페이스 타입, 메소드 이름/타입, 메소드인자타입/ 값등 을 보내줌
5. 응답이 올때까지 대기
6. 응답이 오면 connection 을 끊고 리턴값을 완전한 writable 객체로 리턴함.  
7. writable 객체에서 값을 꺼내어 리턴.  끝.


서버 

1. 실제 구현이 있는 함수를 가진 인터페이스를 구현한 NameNode 객체에서 서버객체 생성
2. 서버객체는 Listenning 을 하며 대기.
3. 클라이이언트과 connection  을 맺고 패킷을 받은후 Read 객체는 그것을 큐에 삽
4. Handler 객체는 큐에서 패킷을 꺼내어 자바리플렉션기술을 이용하여 호출할 함수와 
    자신이 가지고있는  NameNode 객체를 이용하여 NameNode 의 함수를 호출함.
5. 함수결과값을 Response 객체를 통해서 클라이언트에 전송  



이제  DFSClient   (클라이언트) ------ RPC ------>  NameNode (서버)  의 코드를 짧게 살펴보겠습니다.


DFSClient (클라이언트 측) 

DFSClient

(ClientProtocol)RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
        NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,..)

 this.namenode = createNamenode(this.rpcNamenode, conf);
 
 namenode.getBlockLocations(src, start, length);

DFSClient 클래스에서 원격에 ClientProtocol  인터페이스를 구현한 객체가 가지고있는 함수 (getBlockLocations) 
하나를  호출하기위해  Proxy 를 만듭니다. 저 Proxy 는 원격함수를 호출하는 공통코드를 가지고있습니다. 
보통 Proxy or Decorator  는 웹개발에서 트랜잭션설정, 로그삽입, 필터기능등을 위하여 사용됩니다. 

RPC

1)
   final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy);
    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker);

2)

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
   
    private Invoker(Class<? extends VersionedProtocol> protocol,
                     InetSocketAddress address, 
                     UserGroupInformation ticket,
                     Configuration conf, 
                     SocketFactory factory,
                     int rpcTimeout, 
                     RetryPolicy connectionRetryPolicy) throws IOException {
   
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, connectionRetryPolicy, conf);
       this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
     
      return value.get();
    }

1) 번은  Dynamic proxy 를 만드는 코드입니다.  

2) 번은 InvocationHandler 구현클래스의 내용입니다. 클라이언트객체풀에서 하나 가져와서 
    클래스 타입, 보안관련, 메소드이름, 인자, 인자타입등을 Invocation 클래스에 담아서 서버로 보내줍니다. 


Client


public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter

 하둡에서는 자바의 직렬화 및 RPC 를 사용하지 않습니다. 범용적으로 구현되있기때문에 성능에
 문제를 가져오기때문입니다.  따라서 Writble 이라는 인터페이스를 통해 아주 간단히 직렬화를 합니다. 
 Invocation 클래스는 Writble 을 상속받았음을 알수있습니다. 




NameNode (서버측) 

NameNode 

public class NameNode implements ClientProtocol, DatanodeProtocol,

   this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

실제 분산노드에 저장되어있는 파일을 관리하고있는 클래스입니다.  ClientProtocol 를 구현했다는것을 알수있습니다.
클라이언트측에서 ClientProtocol  를 이용하여 Proxy 를 만든것을 보았을것입니다.  
NameNode 객체는 RPC 서버를 만드는데 포인트는 첫번째 인자로 자신을 넣어준다는것입니다. 
자바리플렉션에서 함수호출할경우 첫번째 인자로 호출하려면 함수를 가지고있는 객체를 받기때문입니다. 나머지는 소켓서버를 생성하기위한 포트 및 환경설정등입니다) 


RPC

public static Server getServer(...){    
      return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

 public static class Server extends org.apache.hadoop.ipc.Server {

  public Writable call(Class<?> protocol, Writable param, long receivedTime) 
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        Object value = method.invoke(instance, call.getParameters());

추상클래스 Server 를 상속받아서 자신의 Server 클래스를 만듭니다. call 메소드를 오버라이드 한것을 볼수있습니다.
Server 에서  클라이언트 측에서 보낸 Invocation 객체를 해석하고 호출해주는 코드가 담겨져있습니다.

Server
 Server
   listener = new Listener();
  
 Listener 
      void doAccept(SelectionKey key) 

  Reader 
    void doRead(SelectionKey key) 

  Connection 
         public int readAndProcess() 
         private void processData(byte[] buf) 
   
  Call 
  call = new Call(id, param, this); 
  callQueue.put(call); // queue the call; 

  Handler 
    final Call call = callQueue.take(); // pop the queue; 
    value = call(call.connection.protocol, call.param, call.timestamp); 

 Response
서버 클래스입니다.

일반적인 서버측 행위를 수행합니다. 클라이언트를 기다리고 , 받아드리고 , 보낸 데이터를 읽어서 처리합니다. 
processData 함수에서 큐에 넣고 핸들러에서 pop 해서 call 합니다. call 에서는 리플렉션기능

 Invocation call = (Invocation)param;
        
 Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
      
 Object value = method.invoke(instance, call.getParameters());
으로  함수를 호출해주고
리턴객체를 클라이언트로 보내줍니다. 리턴객체또한 Writble 로 직렬화 됩니다.


이상 마치고  다음에는 하둡 스트리밍에 대해서 살펴보겠습니다.
(시간 날때마다 코드조각들을 더 넣고 설명도 더 추가하겠습니다. 조금 짧은감이있군요 -.-a) 



관련 클래스 모음 



+ Recent posts