관리 메뉴

HAMA 블로그

[이더리움] RLPX - 메세지 read/write 본문

블록체인

[이더리움] RLPX - 메세지 read/write

[하마] 이승현 (wowlsh93@gmail.com) 2019. 1. 21. 18:56

이더리움의 P2P에서 리모트 피어와 메세지를 읽고/쓸때에는 위의 그림처럼 peer ( 이더리움에서 peer객체는p2p 와 eth에 각각있으며, eth의 peer 는 위 그림의 peer 와 protoRW를 포함한다) 를 통하는데, peer객체는 읽고/쓰기를 rlpxFrameRW를 통해서 한다. 이 글에서는 rlpx의 transport부분은 빼고 rlpxFrameRW를 살펴 볼 것이다.

func (pm *ProtocolManager) handleMsg(p *peer) error {
msg, err := p.rw.ReadMsg()
switch {
case msg.Code == GetBlockHeadersMsg:
...
case msg.Code == BlockHeadersMsg:
...
case msg.Code == GetBlockBodiesMsg:

먼저 eth서비스에서는 handleMsg로 메세지를 주기적으로 가져온다. 가져온 메세지의 코드에 따른 로직이 실행될 것이다. 참고로 이더리움엔 여러개의 서비스를 가질 수 있게 유연하게 설계되어 있으며 각각이 고유의 프로토콜을 가질 수 있다.  p.rw.ReadMsg()를 따라가보자. 여기서 p는 위 그림에서 peer이며 rw는 protoRW이다.


type protoRW struct {
Protocol
in chan Msg // receives read messages
closed <-chan struct{} // receives when peer is shutting down
wstart <-chan struct{} // receives when write may start
werr chan<- error // for write results
offset uint64
w MsgWriter
}

func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length {
return newPeerError(errInvalidMsgCode, "not handled")
}
msg.Code += rw.offset
select {
case <-rw.wstart:
err = rw.w.WriteMsg(msg)
// Report write status back to Peer.run. It will initiate
// shutdown if the error is non-nil and unblock the next write
// otherwise. The calling protocol code should exit for errors
// as well but we don't want to rely on that.
rw.werr <- err
case <-rw.closed:
err = ErrShuttingDown
}
return err
}

func (rw *protoRW) ReadMsg() (Msg, error) {
select {
case msg := <-rw.in:
msg.Code -= rw.offset
return msg, nil
case <-rw.closed:
return Msg{}, io.EOF
}
}

 ReadMsg는 rw.in채널에서 메세지가 들어오길 기다리다가 들어오면 메세지코드msg를 리턴해준다. Go에 익숙하지 않다면 헷갈릴수 있는데, 위의 코드는 case 문에서 무엇인가 실행되기 전 까지는 블록된다. 만약 default 문을 추가한다면 default문을 실행하고 바로 리턴 될 것이다. 그럼 이제 rw.in 채널에 메세지를 넣어주는 부분을 찾아보자.

func (p *Peer) handle(msg Msg) error {
switch {
case msg.Code == pingMsg:
...
default:
proto, err := p.getProto(msg.Code)
if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code)
}
select {
case proto.in <- msg:
return nil
case <-p.closed:
return io.EOF
}
}
return nil
}

p2p.peer 의 handle 메소드에서 msg 를 받은 후에 code가 어떤 프로토콜에 해당하는지  확인 후 해당 프로토콜에 proto.in <-msg: 를 통해서 메세지를 전달 해 준다. 그럼 매개변수 msg 는 어디서 왔을까?


func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
if err != nil {
errc <- err
return
}
msg.ReceivedAt = time.Now()
if err = p.handle(msg); err != nil {
errc <- err
return
}
}
}

다른 사람들의 이더리움에 해당되는 각각의 peer 안에는 고루프로 실행된 readLoop가 있어서 p.rw.ReadMsg()를 통해 메세지를 받아오고 있었다.  p.rw.ReadMsg()를 추적해보자.

func (t *rlpx) ReadMsg() (Msg, error) {
t.rmu.Lock()
defer t.rmu.Unlock()
t.fd.SetReadDeadline(time.Now().Add(frameReadTimeout))
return t.rw.ReadMsg()
}

rlpx가 나왔다. t.fd.SetReadDeadline 옵션값을 적당히 주고 t.rw.ReadMsg()를 통해 메세지를 읽기 시작한다.


func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
// read the header
headbuf := make([]byte, 32)
if _, err := io.ReadFull(rw.conn, headbuf); err != nil {
return msg, err
}
// verify header mac
shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16])
if !hmac.Equal(shouldMAC, headbuf[16:]) {
return msg, errors.New("bad header MAC")
}
rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
fsize := readInt24(headbuf)
// ignore protocol type for now

// read the frame content
var rsize = fsize // frame size rounded up to 16 byte boundary
if padding := fsize % 16; padding > 0 {
rsize += 16 - padding
}
framebuf := make([]byte, rsize)
if _, err := io.ReadFull(rw.conn, framebuf); err != nil {
return msg, err
}

// read and validate frame MAC. we can re-use headbuf for that.
rw.ingressMAC.Write(framebuf)
fmacseed := rw.ingressMAC.Sum(nil)
if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil {
return msg, err
}
shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed)
if !hmac.Equal(shouldMAC, headbuf[:16]) {
return msg, errors.New("bad frame MAC")
}

// decrypt frame content
rw.dec.XORKeyStream(framebuf, framebuf)

// decode message code
content := bytes.NewReader(framebuf[:fsize])
if err := rlp.Decode(content, &msg.Code); err != nil {
return msg, err
}
msg.Size = uint32(content.Len())
msg.Payload = content

// if snappy is enabled, verify and decompress message
if rw.snappy {
payload, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return msg, err
}
size, err := snappy.DecodedLen(payload)
if err != nil {
return msg, err
}
if size > int(maxUint24) {
return msg, errPlainMessageTooLarge
}
payload, err = snappy.Decode(nil, payload)
if err != nil {
return msg, err
}
msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload)
}
return msg, nil
}

드디어 오늘의 주인공 rlpxFrameRW 객체가 나왔으며, 이 Read/Write 함수를 하나씩 뜯어 보자. 

그 전에 선두지식이 필요한데, 여기서 Read/Write 하기전에 peer 끼리 커넥션이 맺어질때 RLPX은 먼저 상대방과 암호화 통신을 하기 위한 암호화 키들을 교환하는데, 그렇게 해서 만들어진 암호화 재료가 read/write에 사용된다 그 부분에 대해 좀 더 자세히 알아보려면 이 글을 필히 먼저 읽어보자. [이더리움] RLPX - Encryption handshake 

복잡해보이지만 여기서 필요한 내용은 아래와 같다.

- 고정적인 NodeID(public key) 와 노드 고유의 Private 키를 가지고 random pri-key / pub-key를 상호 생성/교환 
- 랜덤으로 생성한 pub/pri 키를 통해 secrets (대칭키AES와 HMAC) 생성이다.

func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
... s := secrets{
RemoteID: h.remoteID,
AES: aesSecret,
MAC: crypto.Keccak256(ecdheSecret, aesSecret),
}

...
if h.initiator {
s.EgressMAC, s.IngressMAC = mac1, mac2
} else {
s.EgressMAC, s.IngressMAC = mac2, mac1
}

return s, nil
}

이렇게 생성된 AEC/MAC 로는 암호화/복호화를 하고, Egress/IngressMAC으로는 메세지 authentication을 하게 된다. 

func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
macc, err := aes.NewCipher(s.MAC)
if err != nil {
panic("invalid MAC secret: " + err.Error())
}
encc, err := aes.NewCipher(s.AES)
if err != nil {
panic("invalid AES secret: " + err.Error())
}
// we use an all-zeroes IV for AES because the key used
// for encryption is ephemeral.
iv := make([]byte, encc.BlockSize())
return &rlpxFrameRW{
conn: conn,
enc: cipher.NewCTR(encc, iv),
dec: cipher.NewCTR(encc, iv),
macCipher: macc,
egressMAC: s.EgressMAC,
ingressMAC: s.IngressMAC,
}
}

secrets (AES,MAC,IngressMAC,EgressMAC) 는 rlpxFrameRW속성에 대입된다.

 // read the header
headbuf := make([]byte, 32)
if _, err := io.ReadFull(rw.conn, headbuf); err != nil {
return msg, err
}

1. 헤더버퍼사이즈만큼 패킷읽음

 // verify header mac
shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16])
if !hmac.Equal(shouldMAC, headbuf[16:]) {
return msg, errors.New("bad header MAC")
}
rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
fsize := readInt24(headbuf)
2. 헤더를 macCipher로 복호화하고 ingressMAC으로 해싱한후에, 받은 헤더의 16바이트 이후의 값과 대조하여 동일하면 문제가 없다고 판단하여 다음으로 진행. 그리고 MAC 은 항상 업데이트되서 보안이 더 강화된다. 업데이트를 위한 시드는 headbuf[:16]을 사용하였다.
 // read the frame content
var rsize = fsize // frame size rounded up to 16 byte boundary
if padding := fsize % 16; padding > 0 {
rsize += 16 - padding
}
framebuf := make([]byte, rsize)
if _, err := io.ReadFull(rw.conn, framebuf); err != nil {
return msg, err
}

3. 패딩이 추가된 framebuf 사이즈만큼 데이터를 읽는다. 

// read and validate frame MAC. we can re-use headbuf for that.
rw.ingressMAC.Write(framebuf)
fmacseed := rw.ingressMAC.Sum(nil)
if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil {
return msg, err
}
shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed)
if !hmac.Equal(shouldMAC, headbuf[:16]) {
return msg, errors.New("bad frame MAC")
}

4.들어온 frame 데이터를 ingressMAC.Write와 Sum 업데이트를 위한 fmacseed를 만들어서 해싱하여 프레임버퍼 이후에 붙어있는16바이트를 받아와 인증/검증확인한 후 이상없으면 진행한다.

// decrypt frame content
rw.dec.XORKeyStream(framebuf, framebuf)

5. 들어온 frame데이터를 secrets.AES로 만들어진 dec에 의해 복호화 하여 저장

// decode message code
content := bytes.NewReader(framebuf[:fsize])
if err := rlp.Decode(content, &msg.Code); err != nil {
return msg, err
}
msg.Size = uint32(content.Len())
msg.Payload = content

6. rlp 디코딩하여 msg.Payload에 저장

// if snappy is enabled, verify and decompress message
if rw.snappy {
payload, err := ioutil.ReadAll(msg.Payload)
if err != nil {
return msg, err
}
size, err := snappy.DecodedLen(payload)
if err != nil {
return msg, err
}
if size > int(maxUint24) {
return msg, errPlainMessageTooLarge
}
payload, err = snappy.Decode(nil, payload)
if err != nil {
return msg, err
}
msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload)
}
return msg, nil


7. snappy가 사용가능하면 압축을 풀어서 최종완성된 msg 를 리턴해준다.

Comments