일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 플레이프레임워크
- 그라파나
- 스칼라 강좌
- 파이썬 머신러닝
- Akka
- Golang
- 스칼라 동시성
- Play2 로 웹 개발
- 이더리움
- Hyperledger fabric gossip protocol
- hyperledger fabric
- 파이썬
- play 강좌
- CORDA
- akka 강좌
- 하이브리드앱
- 엔터프라이즈 블록체인
- 하이퍼레저 패브릭
- 블록체인
- 스위프트
- 파이썬 동시성
- 안드로이드 웹뷰
- 스칼라
- Actor
- play2 강좌
- 주키퍼
- 파이썬 강좌
- Play2
- Adapter 패턴
- 파이썬 데이터분석
- Today
- Total
HAMA 블로그
[이더리움] RLPX - 메세지 read/write 본문
이더리움의 P2P에서 리모트 피어와 메세지를 읽고/쓸때에는 위의 그림처럼 peer ( 이더리움에서 peer객체는p2p 와 eth에 각각있으며, eth의 peer 는 위 그림의 peer 와 protoRW를 포함한다) 를 통하는데, peer객체는 읽고/쓰기를 rlpxFrameRW를 통해서 한다. 이 글에서는 rlpx의 transport부분은 빼고 rlpxFrameRW를 살펴 볼 것이다.
func (pm *ProtocolManager) handleMsg(p *peer) error {
msg, err := p.rw.ReadMsg()
switch {
case msg.Code == GetBlockHeadersMsg:
...
case msg.Code == BlockHeadersMsg:
...
case msg.Code == GetBlockBodiesMsg:
먼저 eth서비스에서는 handleMsg로 메세지를 주기적으로 가져온다. 가져온 메세지의 코드에 따른 로직이 실행될 것이다. 참고로 이더리움엔 여러개의 서비스를 가질 수 있게 유연하게 설계되어 있으며 각각이 고유의 프로토콜을 가질 수 있다. p.rw.ReadMsg()를 따라가보자. 여기서 p는 위 그림에서 peer이며 rw는 protoRW이다.
type protoRW struct {
Protocol
in chan Msg // receives read messages
closed <-chan struct{} // receives when peer is shutting down
wstart <-chan struct{} // receives when write may start
werr chan<- error // for write results
offset uint64
w MsgWriter
}
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length {
return newPeerError(errInvalidMsgCode, "not handled")
}
msg.Code += rw.offset
select {
case <-rw.wstart:
err = rw.w.WriteMsg(msg)
// Report write status back to Peer.run. It will initiate
// shutdown if the error is non-nil and unblock the next write
// otherwise. The calling protocol code should exit for errors
// as well but we don't want to rely on that.
rw.werr <- err
case <-rw.closed:
err = ErrShuttingDown
}
return err
}
func (rw *protoRW) ReadMsg() (Msg, error) {
select {
case msg := <-rw.in:
msg.Code -= rw.offset
return msg, nil
case <-rw.closed:
return Msg{}, io.EOF
}
}
ReadMsg는 rw.in채널에서 메세지가 들어오길 기다리다가 들어오면 메세지코드msg를 리턴해준다. Go에 익숙하지 않다면 헷갈릴수 있는데, 위의 코드는 case 문에서 무엇인가 실행되기 전 까지는 블록된다. 만약 default 문을 추가한다면 default문을 실행하고 바로 리턴 될 것이다. 그럼 이제 rw.in 채널에 메세지를 넣어주는 부분을 찾아보자.
func (p *Peer) handle(msg Msg) error {
switch {
case msg.Code == pingMsg:
...
default:
proto, err := p.getProto(msg.Code)
if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code)
}
select {
case proto.in <- msg:
return nil
case <-p.closed:
return io.EOF
}
}
return nil
}
p2p.peer 의 handle 메소드에서 msg 를 받은 후에 code가 어떤 프로토콜에 해당하는지 확인 후 해당 프로토콜에 proto.in <-msg: 를 통해서 메세지를 전달 해 준다. 그럼 매개변수 msg 는 어디서 왔을까?
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
if err != nil {
errc <- err
return
}
msg.ReceivedAt = time.Now()
if err = p.handle(msg); err != nil {
errc <- err
return
}
}
}
다른 사람들의 이더리움에 해당되는 각각의 peer 안에는 고루프로 실행된 readLoop가 있어서 p.rw.ReadMsg()를 통해 메세지를 받아오고 있었다. p.rw.ReadMsg()를 추적해보자.
func (t *rlpx) ReadMsg() (Msg, error) {
t.rmu.Lock()
defer t.rmu.Unlock()
t.fd.SetReadDeadline(time.Now().Add(frameReadTimeout))
return t.rw.ReadMsg()
}
rlpx가 나왔다. t.fd.SetReadDeadline 옵션값을 적당히 주고 t.rw.ReadMsg()를 통해 메세지를 읽기 시작한다.
func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
// read the header
headbuf := make([]byte, 32)
if _, err := io.ReadFull(rw.conn, headbuf); err != nil {
return msg, err
}
// verify header mac
shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16])
if !hmac.Equal(shouldMAC, headbuf[16:]) {
return msg, errors.New("bad header MAC")
}
rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
fsize := readInt24(headbuf)
// ignore protocol type for now
// read the frame content
var rsize = fsize // frame size rounded up to 16 byte boundary
if padding := fsize % 16; padding > 0 {
rsize += 16 - padding
}
framebuf := make([]byte, rsize)
if _, err := io.ReadFull(rw.conn, framebuf); err != nil {
return msg, err
}
// read and validate frame MAC. we can re-use headbuf for that.
rw.ingressMAC.Write(framebuf)
fmacseed := rw.ingressMAC.Sum(nil)
if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil {
return msg, err
}
shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed)
if !hmac.Equal(shouldMAC, headbuf[:16]) {
return msg, errors.New("bad frame MAC")
}
// decrypt frame content
rw.dec.XORKeyStream(framebuf, framebuf)
// decode message code
content := bytes.NewReader(framebuf[:fsize])
if err := rlp.Decode(content, &msg.Code); err != nil {
return msg, err
}
msg.Size = uint32(content.Len())
msg.Payload = content
// if snappy is enabled, verify and decompress message
if rw.snappy {
payload, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return msg, err
}
size, err := snappy.DecodedLen(payload)
if err != nil {
return msg, err
}
if size > int(maxUint24) {
return msg, errPlainMessageTooLarge
}
payload, err = snappy.Decode(nil, payload)
if err != nil {
return msg, err
}
msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload)
}
return msg, nil
}
드디어 오늘의 주인공 rlpxFrameRW 객체가 나왔으며, 이 Read/Write 함수를 하나씩 뜯어 보자.
그 전에 선두지식이 필요한데, 여기서 Read/Write 하기전에 peer 끼리 커넥션이 맺어질때 RLPX은 먼저 상대방과 암호화 통신을 하기 위한 암호화 키들을 교환하는데, 그렇게 해서 만들어진 암호화 재료가 read/write에 사용된다 그 부분에 대해 좀 더 자세히 알아보려면 이 글을 필히 먼저 읽어보자. [이더리움] RLPX - Encryption handshake
복잡해보이지만 여기서 필요한 내용은 아래와 같다.
- 고정적인 NodeID(public key) 와 노드 고유의 Private 키를 가지고 random pri-key / pub-key를 상호 생성/교환
- 랜덤으로 생성한 pub/pri 키를 통해 secrets (대칭키AES와 HMAC) 생성이다.
func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
... s := secrets{
RemoteID: h.remoteID,
AES: aesSecret,
MAC: crypto.Keccak256(ecdheSecret, aesSecret),
}
...
if h.initiator {
s.EgressMAC, s.IngressMAC = mac1, mac2
} else {
s.EgressMAC, s.IngressMAC = mac2, mac1
}
return s, nil
}
이렇게 생성된 AEC/MAC 로는 암호화/복호화를 하고, Egress/IngressMAC으로는 메세지 authentication을 하게 된다.
func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
macc, err := aes.NewCipher(s.MAC)
if err != nil {
panic("invalid MAC secret: " + err.Error())
}
encc, err := aes.NewCipher(s.AES)
if err != nil {
panic("invalid AES secret: " + err.Error())
}
// we use an all-zeroes IV for AES because the key used
// for encryption is ephemeral.
iv := make([]byte, encc.BlockSize())
return &rlpxFrameRW{
conn: conn,
enc: cipher.NewCTR(encc, iv),
dec: cipher.NewCTR(encc, iv),
macCipher: macc,
egressMAC: s.EgressMAC,
ingressMAC: s.IngressMAC,
}
}
secrets (AES,MAC,IngressMAC,EgressMAC) 는 rlpxFrameRW속성에 대입된다.
// read the header
headbuf := make([]byte, 32)
if _, err := io.ReadFull(rw.conn, headbuf); err != nil {
return msg, err
}
1. 헤더버퍼사이즈만큼 패킷읽음
// verify header mac
shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16])
if !hmac.Equal(shouldMAC, headbuf[16:]) {
return msg, errors.New("bad header MAC")
}
rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
fsize := readInt24(headbuf)
// read the frame content
var rsize = fsize // frame size rounded up to 16 byte boundary
if padding := fsize % 16; padding > 0 {
rsize += 16 - padding
}
framebuf := make([]byte, rsize)
if _, err := io.ReadFull(rw.conn, framebuf); err != nil {
return msg, err
}
3. 패딩이 추가된 framebuf 사이즈만큼 데이터를 읽는다.
// read and validate frame MAC. we can re-use headbuf for that.
rw.ingressMAC.Write(framebuf)
fmacseed := rw.ingressMAC.Sum(nil)
if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil {
return msg, err
}
shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed)
if !hmac.Equal(shouldMAC, headbuf[:16]) {
return msg, errors.New("bad frame MAC")
}
4.들어온 frame 데이터를 ingressMAC.Write와 Sum 업데이트를 위한 fmacseed를 만들어서 해싱하여 프레임버퍼 이후에 붙어있는16바이트를 받아와 인증/검증확인한 후 이상없으면 진행한다.
// decrypt frame content
rw.dec.XORKeyStream(framebuf, framebuf)
5. 들어온 frame데이터를 secrets.AES로 만들어진 dec에 의해 복호화 하여 저장
// decode message code
content := bytes.NewReader(framebuf[:fsize])
if err := rlp.Decode(content, &msg.Code); err != nil {
return msg, err
}
msg.Size = uint32(content.Len())
msg.Payload = content
6. rlp 디코딩하여 msg.Payload에 저장
// if snappy is enabled, verify and decompress message
if rw.snappy {
payload, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return msg, err
}
size, err := snappy.DecodedLen(payload)
if err != nil {
return msg, err
}
if size > int(maxUint24) {
return msg, errPlainMessageTooLarge
}
payload, err = snappy.Decode(nil, payload)
if err != nil {
return msg, err
}
msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload)
}
return msg, nil
7. snappy가 사용가능하면 압축을 풀어서 최종완성된 msg 를 리턴해준다.
'블록체인' 카테고리의 다른 글
400라인의 go코드로 구현한 하이퍼레저 패브릭 [2]- 블록전파/Gossip Protocol (0) | 2019.01.28 |
---|---|
400라인의 go코드로 구현한 하이퍼레저 패브릭 [1] - 전체조망 (1) | 2019.01.24 |
[이더리움] RLPX - Encryption handshake (0) | 2019.01.21 |
[하이퍼레저 패브릭] 합의 알고리즘 (consensus algorithm) (0) | 2019.01.18 |
[비트코인] 트랜잭션이 만들어 지는 19단계 (0) | 2019.01.16 |