관리 메뉴

HAMA 블로그

[이더리움 코어] DevP2P 소스코드 분석 (feat. golang) 본문

블록체인

[이더리움 코어] DevP2P 소스코드 분석 (feat. golang)

[하마] 이승현 (wowlsh93@gmail.com) 2018.07.11 13:34


서론

이전 글에서는 파이썬 기반으로 분석해 보았는데 이번에는 go-ethereum의 devp2p 를 대상으로 한다. 

이름이 나타내는 것처럼 go-ethereum은 구글에서 만든 go언어 기반인데, 

고 언어.....고 언어.... 코드리딩... 

계획대로 생활하며 모든것에 주기가 붙어 있는 군인의 삶을 쫒는게 쉬울까? 자유롭게 여행다니는 사람의 행적을 쫒는게 쉬울까?
golang 의 장점은 
컴파일속도,실행속도가 빠르고, 멀티코어를 활용하는데 있어서 언어자체적으로 쉽게 사용하도록 지원하는 것이 큰 장점이며, 로직을 구성하는데 있어서의 구현의 자유로움과 단순함(키워드가 별로 없다는 것을 장점으로 내세운다)인데, 역으로 그것이 코드리딩에 있어서는 오히려 굉장히 어려운 요소로 작동한다. 고 언어로 짜여진 코드는 객체지향언어 (자바,파이썬,C++) 로 짜여진 코드에 비해서 경계가 모호하며, 고 언어의 꽃인 고루틴,채널의 범벅으로 인한 이벤트 기반 코드이기에, 시리얼하게 코드를 읽는 습관을 지닌 사람에게는 맨붕 그 자체일 것이다.  (여담으로 자바스크립트가 두세수 위.) 

두가지 예를 들어보면 


첫째연관 짓는것의 단순함&자유로움 (설명에서 구조체,타입,객체를 정확히 구분 하지 않고 혼용하였습니다.)


// conn wraps a network connection with information gathered
// during the two handshakes.
type conn struct {
fd net.Conn (리모트 연결 파일 디스크립터)
transport
flags connFlag
cont chan error // The run loop uses cont to signal errors to SetupConn.
id discover.NodeID // valid after the encryption handshake
caps []Cap // valid after the protocol handshake (리모트가 가진 프로토콜 정보)
name string // valid after the protocol handshake
}

1. 이 코드를 보고 conn 이라는 타입이 어떤 인터페이스와 연관되어 있는지 알 수 없다. 알기 위해서는 이 타입이 구현하고 있는 메소드들을 찾아서 그 메소드 중에 혹시 어떤 인터페이스가 선언 해 둔 메소드가 있는지 확인 해야한다. 즉 자바가 implements 키워드를 사용하는 것 처럼 눈에 띄게 붙어있지 않다. go 는 덕타이핑으로 매우 유연하게 폴리모피즘을 지원 하지만 코드리딩엔 불리하다. 

2. 이 코드를 보고 conn 이라는 객체의 변수로 rlpx 객체의 변수와 메소드를 가지고 있으리 라는 것을 알 수 없다. 여기서 transport 는 rlpx 의 부모 인터페이스 라고 볼 수 있는데, 객체지향도 부모만 보고 자식을 바로 알 수 없는 것은 마찬가지긴 하지만, (즉 자식한테 가봐야 어떤 부모를 가졌는지 알 수 있다.) 문제는 고 언어에서는 자식인 rlpx 구조체로 가 봐도, 그게 transport 인터페이스 상속받은 객체라는 것을 바로 알 수 없다. 1번 처럼 확인을 해봐야 한다.

즉 어떤 타입(객체)가 이 놈도 될 수 있고, 저 놈도 될 수 있는 능력을 자유롭게 해 주고 있으니, 코드를 짤 때는 편할지라도, 읽을 경우는 여기 저기 다 살펴봐야한다. 

둘째쓰레드 생성과 이벤트 기반 코드를 작성하는 것의 단순함&자유로움 


// Server manages all peer connections.
type Server struct {
// Config fields may not be modified while the server is running.
Config

...

// These are for Peers, PeerCount (and nothing else).
peerOp chan peerOpFunc
peerOpDone chan struct{}

quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
log log.Logger
}

for {
scheduleTasks()

select {
case <-srv.quit:
...
case n := <-srv.addstatic:
...
case n := <-srv.removestatic:
...
case op := <-srv.peerOp:
...
case t := <-taskdone:
...
case c := <-srv.posthandshake:
...
case c := <-srv.addpeer:
...
case pd := <-srv.delpeer:
...
}
}

타입(객체) 하나에 채널이 수도 없이 많다. 즉 다른 어떤 경량쓰레드(고루틴)에서 어떤 이벤트가 벌어지는 경우에 한해서 행동하는 방식으로 대부분의 코드가 짜여져 있기 때문에, 거리가 먼 행위들을 연관 지어서 상상 할 수 있는 능력이 더 필요해 진다. 고전적인 C,C++,JAVA 의 경우 코드를 이해하려면 비교적 이어지는 주변의 코드만 집중하면 되었다면, golang 의 경우 전체적으로 조망하는 능력이 더 필요하다고 볼 수 있다. 위에서 switch 처럼 생긴 select 문은 이벤트가 발생하길 기다리는 녀석이다. 이 처럼 go-ethereum의 코드는 생산자(이벤트발생)-소비자(이벤트소비)가 무지하게 복잡하게 얽혀있는 구조를 가진다. (그나마 다행인것은 언어 자체적으로 지원하는 chan 이라는 키워드 덕분에 비동기& 멀티 쓰레딩 코드이긴 해도 순서 관계가 명확하게 보이고 있다. 즉 Mutex 를 사용하지 않고 액터패턴처럼 구현되기 때문에  안정적이게 된다.  C++에 없던 interface 때문에 자바의 객체지향 설계가 더 명확해 지는 것처럼)

사전지식

소스를 온전히 이해하기 위한 많은 사전 지식이 있는데 나열해 보면 아래와 같다.

1. 고언어 (기본 문법/라이브러리 + 고루틴,채널에 대한 체득) 
2. 소켓통신 및 비동기 I/O , Multiplexing  개념
3. ECC 기술들(ECDSA,ECDH),대칭키,공개키,암호화해싱,서명 같은 암호화 기본 
4. Kademlia DHT 
5. RLP 인코딩/디코딩
6. NAT,홀펀칭,UPNP 개념

소스 분석 시작 

1. 조망 - 큰 그림으로 보기 


시작하면 노드를 만들고, 노드 안에서 p2p 패키지의 서버가 돌아간다. devp2p의 영역은 위의 그림에서 p2p 박스에 해당되며, 그 안에 노드 디스커버리와 Rlpx가 있다. 노드 디스커버리는 UDP 프로토콜을 이용하여 노드탐색에 이용되며, 탐색된 노드를 이용하여 실제 TCP 커넥션을 맺고, Encrypt 핸드쉐이크와 프로토콜 핸드쉐이크(위의 eth 프로토콜에 대한 정보 교환) 를 담당하며, 이후의 데이터 교환을 책임지는 역할은 Rlpx 박스에서 한다. 오른쪽의 eth 박스 부분에서는 실제 블록체인에서 하는 일에 대한 데이터 교환에 대한 로직을 담당하고 그것의 입력,출력을 왼쪽의 p2p를 이용하여 처리하게 된다. 지난 글에서 노드 디스커버리와 Rlpx에 대해서 대략 살펴봤기 때문에 이번 글에서는 주로 응용프로토콜과의 인터페이싱 부분(위 그림의 노란 화살표)에 대해서 살펴 볼 것이다.

 p2p 코어계층과 ethereum응용계층과의 관계를 조금 더 명확히 해보면, 이더리움이 시작되면 Node 객체가 생성되는데 

1. 생성되는 과정이 오른쪽인데 노드 내부에 이더리움 서비스가 생성된다. (이 말은 다른 서비스가 생성 될 수도 있다는 의미이다.) 이더리움 서비스는 내부에  프로토콜 매니저를 운용하는데 서브프로토콜을 가지고 있고, 각각 프로토콜의 규칙을 따라서 외부와 통신하기 위한 eth.Peer 객체가 생성 될 준비를 한다. 이 eth.Peer 객체는 아래 2번에서 외부peer와 연결이 되면 Run메소드로 만들어지며 기능을 하기 시작한다. * 이 라인으로는  Protocol 객체를 만든다고 생각하자. (상호 통신 위의  응용 메소드 규약)
  
2. 왼쪽의 p2p 코어층에서는 다른 소켓프로그램들이 그렇듯이 다른 peer 의 connection을 리스닝하고 있다가, conn 객체 (이것은 io 및 rlpx인코딩/디코딩을 책임진다) 와 protocols 인터페이스를 구현한 객체를 만들고 이 둘을 매개변수로 갖는 p2p.Peer 객체를 만든후에 최종적으로 위의 1번에서 eth.Peer 객체를 만들라고 요청하게 되고 서로간의 인터페이싱이 시작된다. 
* 이 라인의 포인트는  conn 객체를 만드는 것으로 생각하자. (상호통신 그 자체) 

2번 라인에서는 최종적으로 protoRW객체가 만들어지는데, 이 객체 안에는 1번 라인의 최종인 Protocol 객체와 2번라인의 최종인 conn객체를 가지고 있다. 이 protoRW는 Peer 객체 안에서 상호통신 작업을 대리한다.

2.  p2p 코어 계층 살펴보기

이번에는 eth 와 인터페이싱 될 p2p 코어 부분을 먼저 살펴보자. (1번 그림에서 Rlpx)

p2p.Peer 는  conn 과 protoRW 를 가지고 있는데. 각자 내부적으로 다른 구조체를 포함하며, 인터페이스를 상속받고 있다. 나중에 응용쪽에서도 peer 객체가 생성되는데, 그 객체는 내부적으로 p2p.peer 를 가지고 통신한다.

conn
- conn 는 순수한 TCP 통신 및 핸드쉐이킹에 촛점이 맞춰져 있고(최종적으로 모든 소켓쓰기,읽기는 이걸 통한다)
- conn 은 리모트 노드와 접속 시작되면 바로 rlpx 를 이용해서 doEnc..(),doProto..() 핸드쉐이킹을 먼저 한 후에, rlpxFrameRW 를 통해 read, writed 를 전담하게 된다. 참고로 devp2p 는 이전 글에서 살펴본 pydevp2p과 다르게 프로토콜 별 공평하게 분배하기 위한 framing 부분이 생략되있다.이것은 뒤에서 나오겠지만 프로토콜 매니저를 외부가 아니라 gth 패키지에서 관리하고, 서브프로토콜이 gth의 호환목적으로만 구성되는 것만 봐서도 왜 생략되었는지 유추 할 수 있다.

conn 객체가 생성되는 코드는 아래와 같다. 참고로 이후의 모든 소스는 주요 부분위주로 편집되었다.(p2p/server.go)

func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
...
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)

return err
}

fd 는 소켓 파일디스크립터이고, newTransprot 는 rlpx 객체이다. 즉 conn 은 rlpx를 이용한다.
SetupConn은 리모트와 접속이 되면 만들어지기 시작되며, srv.setupConn(...) 을 통해서 본격적인 핸드쉐이킹이 시작된다.
여기서 리모트와의 접속은 접속을 할 때(bootnodes 혹은 discovery알고리즘에 따라 발견된 node를 dial을 통해 접속)와 접속을 받을 때(llistenloop 를 통한) 2가지가 있다. 

protoRW
- protoRW 는 프로토콜 정보에 촛점이 맞춰져 있다.
- 응용쪽의 실제 프로토콜 로직에서 데이터를 쓸 경우, 이 protoRW 를 사용한다. 하지만 위에 말했듯이 이 protoRW 도 결국 내부적으로는 conn 을 가지고 있으며 (위 그림을 보면  공통적으로 MsgReadWrite 인터페이스를 상속받고 있는 것을 알 수 있다) 그것을 이용해 최종적인 소켓 입,출력을 하게 된다. 

protoRW 가 conn 을 소유하게 되는 코드는 아래와 같다. (p2p/peer.go)

func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
....

outer:
for _, cap := range caps {
for _, proto := range protocols {
if proto.Name == cap.Name && proto.Version == cap.Version {
...
result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
...
}
}
}
return result
}

rw 매개변수는 conn 이며, protoRW 객체가 생성 될 때, 마지막에 w 에 할당되는 것을 볼 수 있다.
이 코드는 프로토타입 핸드쉐이크 과정에서 상대 프로토콜(코드에서 caps) 과 내가 가지고 있는 프로토콜(코드에서 protocols)의 이름과 버전을 맞춰보고서, 일치하면 protoRW를 만들어주는 로직이다. 

p2p.Peer 생성

func (srv *Server) listenLoop() {
...

for {
// Wait for a handshake slot before accepting.

fd, err = srv.listener.Accept()

p2p.Server 에서 외부노드에대해 Listening / Accept 를 하고 있다가

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
...
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
...
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
...
c.caps, c.name = phs.Caps, phs.Name
err = srv.checkpoint(c, srv.addpeer)
//
}

접속이 되면 conn 을 셋업하는데, Enc/Proto 핸드쉐이킹을 하여 서로간에 기본 정보를 교환한다.
모든게 잘되면 srv.addpeer 채널을 통해 conn 객체를 보낸다.ㄹㅈㄹ

func (srv *Server) run(dialstate dialer) {
...
running:
for {
scheduleTasks()

select {
case <-srv.quit:
...
case c := <-srv.posthandshake:
...
case c := <-srv.addpeer:
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {

p := newPeer(c, srv.Protocols)
...
go srv.runPeer(p)

}

 case c:=<-srv.addpeer: 를 통해 conn 객체를 받은 후에 newPeer(c, svr.Protocols) 로 객체를 만들고 내부적으로 peer.run() 을 실행 하는 srv.runPeer(p) 를 호출.  아래 코드가 peer.run() 이다. 


func (p *Peer) run() (remoteRequested bool, err error) {

go p.readLoop(readErr)
go p.pingLoop()

// Start all protocol handlers.
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)
...
}

위 소스 중  startProtocols  내부에서 eth 응용계층의 eth.Peer 객체를 생성하는 proto.Run(p,rw)를 하는 모습을 볼수 있다.

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
for _, proto := range p.running {
...
go func() {
err := proto.Run(p, rw)
...
}()
}
}

3.  eth 응용 계층 살펴보기

위에서는 네트워킹의 기본이 되는, 소켓을 열고 핸드쉐이킹을 하고, 큰 맥락에서의 프로토콜을 매칭하였다. (큰 맥락이란 이름과 버전을 말한다. 예를들어 eth / 62). 근데 2번에서의 프로토콜을 매칭하기 위해 사용된 자신의 프로토콜 정보는 어디서 나왔을까? 그렇다 그게 여기 3번에서 살펴 볼 내용 중 하나이다. 이제 "자신이 소유한 프로토콜 을 생성하는 부분" "실제 로직에서 그 (eth) 프로토콜이 어떻게 p2p 패키지를 활용" 를 어떻게 하는지 살펴보자.

먼저 eth 객체가 생성되는 모습을 보자. 이것은 p2p.server 가 시작되기 이전에 실행 된다. 즉 다른 노드와 연결되기 이전에 이미 eth 객체가 생성되고, 자신이 가지고 있는 프로토콜 정보를 정리한다는 뜻이다. (app.go)

func RegisterEthService(stack *node.Node, cfg *eth.Config) {
...
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
....
}

노드에 서비스들을 등록 해주는데, eth 서비스를 등록해 주며 eth.New 생성함수를 통해 Ethereum 객체가 생성된다. (app.go)

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
// 체인 디비를 만들고
chainDb, err := CreateDB(ctx, config, "chaindata") // 제네시스 블록을 세팅합니다.
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
// 이더리움 객체 생성.
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
shutdownChan: make(chan bool),
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}

// 새로운 블록체인을 만들고
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)

// 블룸인덱서를 시작합니다.
eth.bloomIndexer.Start(eth.blockchain)

// 트랜잭션 풀을 만들고
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

// 프로토콜 매니저를 생성합니다. !! 여기가 우리가 살펴볼 지점입니다. if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
} // 채굴 객체도 생성해 줍니다.
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
...

return eth, nil
}

Ethereum 객체를 생성해주는 생성자 함수이며, 매우 많은 것들이 여기서 시작됨을 알 수 있다. 그 많은 것들의 결과(트랜잭션,블록등)는 결국 p2p.peer를 통해서 외부와 소통 할 것입니다. (정확히는 p2p.peer가 가지고 있는 protoRW -> conn(rlpx)를 통해서) 

이제 우리가 포커싱을 맞춰야 하는 부분은 NewProtocolManager 이다. (
eth/handler.go)
 "자신이 소유한 프로토콜 을 생성하는 부분"<-- 이것이 바로 여기서 이루어 진다.

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// 프로토콜 매니저를 만들어 준다.
manager := &ProtocolManager{
networkId: networkId,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}

// 서브 프로토콜을 할당하기 위한 배열을 초기화 한다.(현재 eth의 프로토콜 버전은 2개이다. 62,63)
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
....
// 서브 프로토콜(eth62,eth63)을 초기화 해서 배열에 추가한다.
manager.SubProtocols = append(manager.SubProtocols, <--- 요기에 서브프로토콜이 들어 간다. ---> )

}

... // fetcher 객체를 만든다.
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

return manager, nil
}

위에서 <---- 요기에 서브프로토콜이 들어 간다 --> 에 해당되는 코드가 아래에 있다.

// 프로토콜 매니저가 관리 할 프로토콜 객체를 만든다. 리모트 노드의 프로토콜 정보와 매칭 될 정보이다. p2p.Protocol{
Name: ProtocolName, // 비교될 프로토콜 이름. eth
Version: version, // 비교될 프로토콜 버전. 63,62
Length: ProtocolLengths[i], // 구현된 메세지의 숫자이다. 참고로 63는 17개, 62은 8개 // 나중에 핸드쉐이크가 끝나고, p2p.Peer 와 p2p.protoRW 객체가 매개변수로 들어 와 Run이 호출 되면서 // eth 쪽에 새로운 peer 객체에 포함되며 결합된다. 즉 eth 쪽 Peer 에서 p2p 코어쪽을 활용하게 된다.

Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer) // <-- 이 코드!!!! (마지막으로 설명할 코드이다.)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
}

위에서 프로토콜 길이를 보면 63버전은 18개고 62버전은 8개라고 나오는데, 블록헤더/블록바디/트랜잭션 관련된 데이터에 대한 요청과 처리에 관한 내용이다. 각각의 프로토콜은 아래와 같다. 63버전에서는 Receipts 등에 대한 요청이 추가 되었다.

// Protocol messages belonging to eth/62
StatusMsg = 0x00
NewBlockHashesMsg = 0x01
TxMsg = 0x02
GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05
BlockBodiesMsg = 0x06
NewBlockMsg = 0x07

// Protocol messages belonging to eth/63
GetNodeDataMsg = 0x0d
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10

이제 마지막으로 "실제 로직에서 eth 프로토콜이 어떻게 p2p 패키지를 활용" 부분에 대해서 살펴보자.

위에서 p2p.Protocol 객체가 만들어 지면서, 응용계층에 새로운 peer 객체를 만드는데, peer 객체가 만들어 졌다는 이벤트가 채널을 통해서 날라오면 (case manager.newPeerCh <- peer) 프로토콜 매니저는 그 peer 객체를 핸들링하기 시작한다.


return manager.handle(peer) //<-- 이 코드!!!! (마지막으로 설명할 코드이다.)

구체적인 코드로 step in ~

// eth 피어의 라이프 사이클을 관리하기 위한 콜백 함수를 핸들링 한다.
func (pm *ProtocolManager) handle(p *peer) error {

// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
) // eth 정보(버전넘버,네트웤ID,Difficulties, head,genesis 블록에 관련된 정보)에 대한 핸드쉐이킹을 한다.
p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
// p2p 코어(protoRW) 에 자신의 버전을 할당 해 둔다.
rw.Init(p.version)
// 특정 리모트와 연결된 peer 를 등록 해 둔다.
pm.peers.Register(p);
// 다운로드에도 등록 해 둔다.

pm.downloader.RegisterPeer(p.id, p.version, p); // 새로 만들어진 peer 객체에 대해 트랜잭션 동기화를 진행한다. (트랜잭션 전파등) // 여기서 트랜잭션에 관련된 데이터 가 만들어지면, 채널을 통해 전송 할 것이며, // 해당 채널에 대한 이벤트가 일어나길 기다리는 고루틴에서는 p2p 로 전송 할 것이다. (다음 소스 참고)
pm.syncTransactions(p)


// 들어오는 메세지에 대해서 핸들링 할 메인 루프
for {
pm.handleMsg(p)
}
}

위의 syncTranaction 과 같이 어떤 로직에 의해 데이터가 완성되면 채널을 통해 알려 주는데, 해당 채널에서 데이터가 오길 기다리는 소스는 아래와 같다. 아래 소스는 특히 트랜잭션 동기화를 위한 일을 전담하는 고루틴이다. (eth/sync.go, eth/peer.go)

// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
....

// 패킷을 만들고 p2p 를 통해 전송 하는 로직
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}

s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }() // 전송
}

for {
select {
case s := <-pm.txsyncCh: // 새로운 트랜잭션이 만들어지면
pending[s.p.ID()] = s
if !sending {
send(s) // 패킷을 만들어서 p2p 를 통해 전송한다.
}
...
}
}

func (p *peer) SendTransactions(txs types.Transactions) error {
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
return p2p.Send(p.rw, TxMsg, txs) // p2p 를 통한 전송 !!!
}

마지막으로 아래 코드는 들어오는 데이터에 대한 메세지 핸들링 코드이다. (eth/handle.go)
프로토타입 메세지에 따라서 분기되어 처리되고 있다. 이 부분은P2P랑은 상관없는 부분이니 대충 보자.


// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()

// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")

// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
// Decode the complex header query
var query getBlockHeadersData
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)

// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
if first {
first = false
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
headers = append(headers, origin)
bytes += estHeaderRlpSize

// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
// Hash based traversal towards the genesis block
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse:
// Number based traversal towards the genesis block
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}

case !query.Reverse:
// Number based traversal towards the leaf block
query.Origin.Number += query.Skip + 1
}
}
return p.SendBlockHeaders(headers)

case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// If no headers were received, but we're expending a DAO fork check, maybe it's that
if len(headers) == 0 && p.forkDrop != nil {
// Possibly an empty reply to the fork header checks, sanity check TDs
verifyDAO := true

// If we already have a DAO header, we can check the peer's TD against it. If
// the peer's ahead of this, it too must have a reply to the DAO check
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
verifyDAO = false
}
}
// If we're seemingly on the same chain, disable the drop timer
if verifyDAO {
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
}
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
// If it's a potential DAO fork check, validate against the rules
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
// Disable the fork drop timer
p.forkDrop.Stop()
p.forkDrop = nil

// Validate the header and either drop the peer or continue
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
}
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}

case msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
bytes int
bodies []rlp.RawValue
)
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block body, stopping if enough was found
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
}
}
return p.SendBlockBodiesRLP(bodies)

case msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver them all to the downloader for queuing
transactions := make([][]*types.Transaction, len(request))
uncles := make([][]*types.Header, len(request))

for i, body := range request {
transactions[i] = body.Transactions
uncles[i] = body.Uncles
}
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(transactions) > 0 || len(uncles) > 0
if filter {
transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
}
if len(transactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
if err != nil {
log.Debug("Failed to deliver bodies", "err", err)
}
}

case p.version >= eth63 && msg.Code == GetNodeDataMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather state data until the fetch or network limits is reached
var (
hash common.Hash
bytes int
data [][]byte
)
for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
// Retrieve the hash of the next state entry
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested state entry, stopping if enough was found
if entry, err := pm.blockchain.TrieNode(hash); err == nil {
data = append(data, entry)
bytes += len(entry)
}
}
return p.SendNodeData(data)

case p.version >= eth63 && msg.Code == NodeDataMsg:
// A batch of node state data arrived to one of our previous requests
var data [][]byte
if err := msg.Decode(&data); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver all to the downloader
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
log.Debug("Failed to deliver node state data", "err", err)
}

case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather state data until the fetch or network limits is reached
var (
hash common.Hash
bytes int
receipts []rlp.RawValue
)
for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block's receipts, skipping if unknown to us
results := pm.blockchain.GetReceiptsByHash(hash)
if results == nil {
if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error("Failed to encode receipt", "err", err)
} else {
receipts = append(receipts, encoded)
bytes += len(encoded)
}
}
return p.SendReceiptsRLP(receipts)

case p.version >= eth63 && msg.Code == ReceiptsMsg:
// A batch of receipts arrived to one of our previous requests
var receipts [][]*types.Receipt
if err := msg.Decode(&receipts); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver all to the downloader
if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
log.Debug("Failed to deliver receipts", "err", err)
}

case msg.Code == NewBlockHashesMsg:
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
// Mark the hashes as present at the remote node
for _, block := range announces {
p.MarkBlock(block.Hash)
}
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
for _, block := range unknown {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}

case msg.Code == NewBlockMsg:
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
request.Block.ReceivedAt = msg.ReceivedAt
request.Block.ReceivedFrom = p

// Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)

// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
var (
trueHead = request.Block.ParentHash()
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
)
// Update the peers total difficulty if better than the previous
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)

// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
// a singe block (as the true TD is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentBlock := pm.blockchain.CurrentBlock()
if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
go pm.synchronise(p)
}
}

case msg.Code == TxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
}
pm.txpool.AddRemotes(txs)

default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}

지난 "DevP2P 소스코드 분석  (feat. python)" 글 에서는 노드 디스커버리와 Rlpx 에 대해서 주로 살펴봤다면. 이번 글에서는 eth 응용단 프로토콜과 Rlpx간의 인터페이싱에 대해서 알아 보았습니다. 아마도 여유가 생긴다면 다음 글에서는 블록전송/싱크에 관한 로직을 살펴보고 마지막으로는 SWARM 등의 또 다른 서비스 프로토콜에 대한 로직을 살펴보는 순서대로 글을 작성 할 거 같네요. 

코드가 포함된 긴 글 읽으시느라고 고생하셨습니다.



0 Comments
댓글쓰기 폼