일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- 파이썬 데이터분석
- 스칼라 강좌
- Actor
- Adapter 패턴
- akka 강좌
- CORDA
- 블록체인
- 스칼라
- 스위프트
- Akka
- 파이썬 강좌
- Play2
- hyperledger fabric
- Hyperledger fabric gossip protocol
- 파이썬 동시성
- 안드로이드 웹뷰
- play 강좌
- 이더리움
- 엔터프라이즈 블록체인
- 스칼라 동시성
- Golang
- 하이퍼레저 패브릭
- play2 강좌
- 주키퍼
- 하이브리드앱
- Play2 로 웹 개발
- 그라파나
- 파이썬 머신러닝
- 파이썬
- 플레이프레임워크
- Today
- Total
HAMA 블로그
[ 하둡 인사이드 ] 2. Hadoop Streamming 본문
* InputStream / OutputStream
* Selctor
* Channel
* ByteBuffer
* Socket
소켓 통신 일반
전형적인 소켓 프로그래밍 인터렉션입니다. 서버는 대기하고, 클라이언트가 접속하면 서버는 받아드리고,
받아드리면서 클라이언트와 통신하기위한 소켓하나 만들어서 쓰레드하나 만들어서
던진후에 그 쓰레드에서 클라이언트와 입/출력.
자바 NIO
자바NIO (New I/O)는 비동기 입출력은 아닙니다.
쪼 위에 blocking 되있는걸 보실수있습니다. (타임아웃 가능)
Select 모델이라고 하지요. 소켓통신 모델에는 다양한 모델이 있으며, IOCP 모델이 빠르기로 유명함.
개인적으로 자바 NIO/NIO2 프로그래밍이 C++ 을 이용한 IOCP 프로그래밍 보다는 애매하게 느꼈던것
같습니다. (추상화 하다보니 좀 희미하다고 해야하나? 그에 반해 IOCP 모델은 굉장히 클리어 하죠.)
하둡 스트리밍
사실 하둡 스트리밍에 대한 글을 쓸까 말까 좀 고민을 했었습니다. 왜냐? 할게 없기때문에..
그냥 자바 소켓통신입니다. 따라서 자바소켓통신에 대한 소스레벨 분석 글을 보신다고 생각하면 편하실듯합니다. :-) 독자분이 소켓통신을 구현할때 가져다 쓰면 좋겠지요. 구태여 바퀴를 또 만들필요가 없지 않겠습니까? (이런 의미에서 Netty 를 사용하는게 좋죠. Vert.x / Couchbase / OpenTSDB 등이 사용)
위의 그림은 클라이언트에서 데이타노드로 블록ID 을 보내어 데이터를 달라고 하는 상황에서
그와 관련된 클래스들의 모식도 입니다.
소스 조각을 하나씩 살펴보겠습니다.
DFSClient
- 하둡 HDFS 읽기/쓰기 연재에서 설명 예정.
RemoteBlockReader
- 하둡 HDFS 읽기/쓰기 연재에서 설명 예정.
NetUtils
public static SocketFactory getDefaultSocketFactory(Configuration conf) {
String propValue = conf.get("hadoop.rpc.socket.factory.class.default");
if ((propValue == null) || (propValue.length() == 0))
return SocketFactory.getDefault();
return getSocketFactoryFromProperty(conf, propValue);
}
하둡 환경설정에 따라 소켓팩토리를 선정합니다. getDefaultSocketFactory 가
기본.SocketFactory.getDefault(); 참고로 소켓팩토리를 통해서 SSLSocket 을 사용할수있습니다.
SocketInputStream
생성자
public SocketInputStream(Socket socket, long timeout)
throws IOException {
this(socket.getChannel(), timeout);
}
public SocketInputStream(ReadableByteChannel channel, long timeout)
throws IOException {
SocketIOWithTimeout.checkChannelValidity(channel);
reader = new Reader(channel, timeout);
}
내부에서 Reader 를 사용합니다.
public int read(byte[] b, int off, int len) throws IOException {
return read(ByteBuffer.wrap(b, off, len));
}
public int read(ByteBuffer dst) throws IOException {
return reader.doIO(dst, SelectionKey.OP_READ);
}
원하는 데이터의 양만큼을 ByteBuffer 클래스를 통해 할당하여 요청합니다.
SocketOutputStream
public SocketOutputStream(WritableByteChannel channel, long timeout)
throws IOException {
SocketIOWithTimeout.checkChannelValidity(channel);
writer = new Writer(channel, timeout);
}
public void write(byte[] b, int off, int len) throws IOException {
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
while (buf.hasRemaining()) {
try {
if (write(buf) < 0) {
throw new IOException("The stream is closed");
}
} catch (IOException e) {
if (buf.capacity() > buf.remaining()) {
writer.close();
}
throw e;
}
}
}
Reader
: SocketIOWithTimeout 를 상속받은 클래스
int performIO(ByteBuffer buf) throws IOException {
return channel.read(buf);
}
채널에서 버퍼의 크기만큼 읽는다.
Writer
: SocketIOWithTimeout 를 상속받은 클래스
public int write(ByteBuffer src) throws IOException {
return writer.doIO(src, SelectionKey.OP_WRITE);
}
int performIO(ByteBuffer buf) throws IOException {
return channel.write(buf);
}
SocketIOWithTimeout (이게 핵심)
private static SelectorPool selector = new SelectorPool();
int doIO(ByteBuffer buf, int ops) throws IOException {
if (!buf.hasRemaining()) {
throw new IllegalArgumentException("Buffer has no data left.");
}
while (buf.hasRemaining()) {
int n = performIO(buf);
if (n != 0) {
return n;
}
selector.select(channel, ops, timeout);
}
return 0; // does not reach here.
}
Buffer 에 담을 수있는 데이터가 있는지 확인한후에 , --> if (!buf.hasRemaining()) {
더이상 담을 공간이 없을때까지 --> while (buf.hasRemaining())
입력/출력을 합니다. --> int n = performIO(buf);
입력/출력한게 있으면 리턴해줍니다. --> return n;
입력/출력한게 없으면 다시 입력/출력하기 위해 대기합니다. --> selector.select(channel, ops, timeout);
static void connect(SocketChannel channel,
SocketAddress endpoint, int timeout) throws IOException {
boolean blockingOn = channel.isBlocking();
if (blockingOn) {
channel.configureBlocking(false);
}
try {
if (channel.connect(endpoint)) {
return;
}
while (true) {
int ret = selector.select((SelectableChannel)channel,
SelectionKey.OP_CONNECT, timeoutLeft);
...
}
} catch (IOException e) {
...
}
private synchronized SelectorInfo get(SelectableChannel channel)
throws IOException {
SelectorInfo selInfo = null;
SelectorProvider provider = channel.provider();
// pick the list : rarely there is more than one provider in use.
ProviderInfo pList = providerList;
while (pList != null && pList.provider != provider) {
pList = pList.next;
}
if (pList == null) {
//LOG.info("Creating new ProviderInfo : " + provider.toString());
pList = new ProviderInfo();
pList.provider = provider;
pList.queue = new LinkedList<SelectorInfo>();
pList.next = providerList;
providerList = pList;
}
LinkedList<SelectorInfo> queue = pList.queue;
if (queue.isEmpty()) {
Selector selector = provider.openSelector();
selInfo = new SelectorInfo();
selInfo.selector = selector;
selInfo.queue = queue;
} else {
selInfo = queue.removeLast();
}
trimIdleSelectors(System.currentTimeMillis());
return selInfo;
}
셀렉터를 하나만 만들면 , 여러 소켓들이 하나의 셀렉터를 통해서 , 신호를 받기때문에 효율적이지 못합니다.
따라서 각 이벤트들마다 셀렉터를 만들면, 효율적이게 운영할수있는데, 셀렉터를 풀로 만들어서
셀렉터 생성에 대한 부담을 줄였습니다. 상황에 따라서 Read / Write / Acceptor 등을 별도의 셀렉터로
등록 시키면 좋습니다. 위의 셀렉터풀 말고도 NIO에서는 ByteBuffer Pool을 만들어서 사용해도 좋습니다.
DataNode
- 하둡 HDFS 읽기/쓰기 연재에서 설명 예정.
DataXeiverServer
- 하둡 HDFS 읽기/쓰기 연재에서 설명 예정.
DataXceiver
- 하둡 HDFS 읽기/쓰기 연재에서 설명 예정.
헤즐케스트 소개
해즐케스트의 소켓 통신을 위한 클래스들
Storm 소개
Storm 의 소켓 통신을 위한 클래스들
.............. 작성중 ................
'Hadoop' 카테고리의 다른 글
[ 하둡 인사이드 ] 5. 하둡 HDFS 쓰기 (0) | 2015.05.02 |
---|---|
[ 하둡 인사이드 ] 4. HDFS 읽기 (0) | 2015.05.02 |
하둡 쉘 스크립트 실행 순서도 | (0) | 2015.04.27 |
hadoop-1.2.1 이클랩스 import 하기 (0) | 2015.04.27 |
[ 하둡 인사이드 ] 1. Hadoop RPC (0) | 2015.04.27 |