관리 메뉴

HAMA 블로그

[이더리움에서 배우는 Go언어] 1급함수활용과 채널 본문

Go

[이더리움에서 배우는 Go언어] 1급함수활용과 채널

[하마] 이승현 (wowlsh93@gmail.com) 2018. 12. 10. 11:53



1. [이더리움에서 배우는 Go언어]   chan chan 이란?
2. [이더리움에서 배우는 Go언어]   1급함수활용과 채널
3. 
[이더리움에서 배우는 Go언어]  nat 옵션 이야기 - (1)



 1급함수활용과 채널

지난 글에서는 채널에 채널을 보내는 chan chan 에 대해서 배워 보았는데요, 기억이 안난다면 이야기가 이렇습니다. 내가 (소비자) 먹을 준비됬으면 나의 주소와 함께 알려드릴께요. 당신은 (생산자) 만들면 내 주소로 피자를 보내주세요.
즉 생산자 주도적이 아니라, 소비자 주도적이다. 

이번 글에서도 채널과 관련된 이야기를 해 보겠습니다. 이번에는 채널에 함수를 보내는 방식입니다. 이야기는 이렇습니다.
내가 피자만드는 방식(함수)를 알려줄께요. 그 방식대로 피자를 만들어서 보내주세요. 주소는 피자만드는 방식(함수) 끝에 적혀져있답니다.

먼가 감이 잡히시나요? 안잡혀도 괜찮습니다. 이제부터 글보다 익숙한 코드로 배워 볼 것이니까요. 혹시나 채널과 고루틴 자체를 모르신다면 위에 링크 글이나 구글에서 검색해서 알아보시길 바랍니다. 약간의 이해는 필요합니다.

먼저 이더리움에서 어떻게 사용되었는지 코드를 보고 진도를 나가겠습니다.(쌍괄식?)

type peerOpFunc func(map[discover.NodeID]*Peer) .... srv.peerOp = make(chan peerOpFunc) .... // PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
var count int
select {
case srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }:
<-srv.peerOpDone
case <-srv.quit:
}
return count
}

간단히 소스를 설명하자면 먼저 NodeID와 Peer포인터를 쌍으로 갖는 맵을 매개변수로 하는 함수를 하나의 타입으로 선언했으며(peerOpFunc), 그것에 대한 채널(srv.peerOp)을 만들었습니다. 그리고 Peer의 갯수를 세는 함수에서 해당 채널에 직접 구현된 함수 func(ps map[discover.NodeID]*Peer) { count = len(ps) }:를 넘기고 있습니다. 

case op := <-srv.peerOp:
op(peers)
srv.peerOpDone <- struct{}{}

넘겨진 함수는 run이라는 함수내에서 받아서 위와 같이 처리(보내진 함수를 호출) 합니다. 이런 핑퐁관계는 파이썬,자바스크립트나 최근 코틀린에서도 지원하고 있는 코루틴과 비슷하며, 함수내에서 무한loop를 돌면서 다양한 이벤트를 기다렸다가 해당되는 이벤트에 맞게 처리되는 모습은 일견 액터패턴도 유추시킵니다. 액터패턴은 하나의 큐에서 넘어오는 일련의 명령(+데이터)들을 기다렸다가 처리한다 치면 위의 코드에서는 select를 통해서 펼쳐놓은 모습입니다. 어쨋거나 둘 다 직접 락을 거는 Mutex 식의 사용을 지양하고 쓰레드들의 동기 처리를 보다 분명하게 에러가 없게 처리하기 위한 방식인거죠. 

자 그럼 왜 이렇게 함수를 넘겨서 처리 하는 걸까요? 이제 부터 살펴보겠습니다. 


const (
OP_ADD = 1 << iota
OP_SUB
OP_MUL
)

type Calculator struct {
acc float64
}


func (c *Calculator) Do(op int, v float64) float64 {
switch op {
case OP_ADD:
c.acc += v
case OP_SUB:
c.acc -= v
case OP_MUL:
c.acc *= v
default:
panic("unhandled operation")
}
return c.acc
}

func main() {
var c Calculator
fmt.Println(c.Do(OP_ADD, 100)) // 100
fmt.Println(c.Do(OP_SUB, 50)) // 50
fmt.Println(c.Do(OP_MUL, 2)) // 100
}

위에 간단한 계산기의 코드가 있습니다.
메소드 Do에는 더하기,빼기,곱하기가 모두 정의 되어 있구요. 너무 많은 것을 하는것 처럼 보입니다.
추가적으로 제곱하기,평균내기 등이 들어간다면 더욱 더 하나의 함수가 복잡해 질 거 같습니다. 

개인적으로 코딩 할 때 간단히 2가지 법칙을 염두해 두는데요. ( SOLID,DRY법칙처럼 내가 만들었습니다 ㅎ)
1. 간단 책임의 원칙 :  함수(객체)는 최대한 심플하게 하자. 책임을 적게 지자.
2. 책임 전가의 원칙:  책임은 최대한 외부에 전가시키자. 주입받도록 하자.

근데 솔직히 자신이 모두 다 짜고,개발하며,한번 만든거 이상의 추가 업데이트가 있을 가능성이 적다? 하면 앞으로 나올 내용 처럼 생쇼 안부리고 직관적이게 코딩하는게 더 좋습니다. 마술을 부리거나 지나친 패턴사용으로 오버엔지니어링 되는게 더 문제겠지요.

Do는 무엇을 받아서 실행만 시키게 하면 안될 까요? 즉 책임을 적게 가지고 가자는 겁니다. 
Go에서의 방식은 아래와 같은데요. 


type Calculator struct {
acc float64
}

type opfunc func(float64, float64) float64

func (c *Calculator) Do(op opfunc, v float64) float64 {
c.acc = op(c.acc, v)
return c.acc
}

func Add(a, b float64) float64 { return a + b }
func Sub(a, b float64) float64 { return a - b }
func Mul(a, b float64) float64 { return a * b }

func main() {
var c Calculator
fmt.Println(c.Do(Add, 5)) // 5
fmt.Println(c.Do(Sub, 3)) // 2
fmt.Println(c.Do(Mul, 8)) // 16
}

이렇게 어떤 역할을 하는 함수 자체를 Do의 매개변수로 넣는 방식입니다.  즉 매개변수 2개를 받아서 float64타입을 리턴하는  함수타입이면 모두 ㅇㅋ 입니다.  이제 Do는 그냥 실행만 하면 땡이네요. 굉장히 간단해 졌으며, 추가적인 계산방식이 생기면 외부에서 만들어서 주입해주면 됩니다. 좀 더 유연해 졌다고 볼 수 있습니다. 


func Sqrt(n, _ float64) float64 {
return math.Sqrt(n)
}

func main() {
var c Calculator
c.Do(Add, 16)
c.Do(Sqrt, 0) // operand ignored
}

제곱(sqrt) 로직 추가시에는  쓸때없는 매개변수를 하나 더 넣어 줘야하는 상황이 됩니다. 이런 경우는 


func Add(n float64) func(float64) float64 {
return func(acc float64) float64 {
return acc + n
}
}

func Sqrt() func(float64) float64 {
return func(n float64) float64 {
return math.Sqrt(n)
}
}

func (c *Calculator) Do(op func(float64) float64) float64 {
c.acc = op(c.acc)
return c.acc
}
func main() {
var c Calculator
c.Do(Add(2))
c.Do(Sqrt()) // 1.41421356237
}

이런식으로 커링(부분함수)식으로 처리하면 됩니다. 공통 타입의 함수타입을 만들기 위해서 겉에 하나의 함수를 더 감싸는거죠. 겉 함수에서는 리턴 될 내부 함수에서 사용 될 매개변수를 미리 넘겨주게 됩니다. 
자 이제 1급함수의 역할을 2가지로 맛보기 했는데요 (매개변수로써의 함수, 리턴값으로써의 함수) 


이제 1급함수를 채널에서 사용해 보겠습니다.
먼저 채널을 어떤 경우에 사용하는지 잠시 살펴보겠습니다. 


type Mux struct {
mu sync.Mutex
conns map[net.Addr]net.Conn
}

func (m *Mux) Add(conn net.Conn) {
m.mu.Lock()
defer m.mu.Unlock()
m.conns[conn.RemoteAddr()] = conn
}

func (m *Mux) Remove(addr net.Addr) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.conns, addr)
}
}

conns 맵이 여러 쓰레드에서 한번에 사용되면, 접근 경쟁에 의한 버그가 발생할 위험이 생기겠지요?
따라서 소스에서 처럼 Mutex로 진입시 락을 걸어서 혼자 만 사용하겠다고 알려줘야 합니다.
이렇게 직접 락을 걸면서 코딩을 하면 동시성 프로그래밍에서 알기 힘든 버그를 만들수 있다고 구루들이 경고하잖아요.
그래서 다양한 동시성패턴들이 생겨났는데, Go언어에서는 자신(코루틴) 이 건드리고, 남(다른 코루틴)에게 패스하고 그 패스를 받은 사람이 또 건드리고, 패스하고 이런 전략을 사용합니다.직접 메모리를 공유하지말고 서로 패스를  통해서 공유하자는 겁니다. (무엇인가 액션을 던진다고 생각하면 액터패턴과도 비슷합니다. 간단히 말해 그걸 넣고 받는게 큐면 액터가 되는 것이고, 채널이면 고루틴이 되는 것이죠.)

아래 소스를 보겠습니다.


type Mux struct {
add chan net.Conn
remove chan net.Addr
sendMsg chan string
}

func (m *Mux) Add(conn net.Conn) {
m.add <- conn
}

func (m *Mux) Remove(addr net.Addr) {
m.remove <- addr
}

func (m *Mux) SendMsg(msg string) error {
m.sendMsg <- msg
return nil
}

func (m *Mux) loop() {
conns := make(map[net.Addr]net.Conn)
for {
select {
case conn := <-m.add:
m.conns[conn.RemoteAddr()] = conn
case addr := <-m.remove:
delete(m.conns, addr)
case msg := <-m.sendMsg:
for _, conn := range m.conns {
io.WriteString(conn, msg)
}
}
}
}

Add,Remove,SendMsg 각각의 채널을 만들고, 해당 함수에서는 해당 로직을 수행하길 바란다고 이벤트(+데이터) 를 패스하기만 합니다. 그러면 패스를 받은 loop메소드에서는 (이게 액터역할을 하지요) 어떤 후보 이벤트가 발생할 때까지 select 에서 대기하고 있다가 해당 이벤트가 발생하면 그것을 처리해 줍니다.

m.add <- conn

이것은 m.add채널에 conn이라는 이벤트(+데이터)를 패스하는 것이고

case conn := <-m.add:

이것은 m.add채널에서 conn이라는 이벤트(+데이터)를 받는 것이겠지요.

근데 처음에 Calculator 코드를 보면 Do안에서 모든것을 다 처리해서 Do는 그냥 로직을 외부에서 주입받는 방식으로 바꾼거 기억하시나요? 기억안나면 위로 올라가서 다시 확인하세요.
여기도 마찬가지로 loop함수에서 모든것을 다 하는게 싫습니다. loop함수도 그냥 주입받은 로직을 처리하고 싶어지는데요
이때 채널에 함수를 패스하게 됩니다. 아래 처럼 말이죠.

func (m *Mux) Add(conn net.Conn) {
m.ops <- func(m map[net.Addr]net.Conn) {
m[conn.RemoteAddr()] = conn
}
}

자 이제 왜 이더리움에서 저런식의 코드를 짯는지 이해가 되시나요? (올라가지 마시라고 아래 다시 적어봤습니다) 

// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
var count int
select {
case srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }:
<-srv.peerOpDone
case <-srv.quit:
}
return count
}


func (srv *Server) run(dialstate dialer) {

    running:
for {
...
select {
    case <-srv.quit:
     break running
    case n := <-srv.addstatic:
    ...
     case n := <-srv.removestatic:
     ...
     case op := <-srv.peerOp:
     // This channel is used by Peers and PeerCount.
     op(peers)
    srv.peerOpDone <- struct{}{}
    ...
    }

}

위의 run 함수에서 모든것을 처리하기 싫어서 입니다. 로직은 외부에서 주입받고 싶은것이죠.
소스에 대한 설명을 하자면 PeerCount 함수에서 피어의 갯수를 세는 로직을 만들어서 다른 쓰레드에게  srv.peerOp <- func(ps map[discover.NodeID]*Peer) { count = len(ps) }: 이렇게 보내주고 기다리면, 다른 쓰레드에서는 op(peers)처럼 실행만 한뒤에 다 했다고 peerOpDone <- struct{}{} 이렇게 익명구조체 생성하여 채널로 보내서 알려줍니다. 그러면 PeerCount에서는 <-srv.peerOpDone 이렇게 기다리다가 종료하고 있습니다 . 

근데 안타깝게도 실제 이더리움소스에서 모든것을 주입받는 것은 아닙니다. run안에서는 그냥 이벤트만 호출받고 그에 해당하는 로직을 처리하거나 위임 처리 한 후에 또 다른 채널에 패스를 하기도 합니다. 실전은 항상 단순하진 않지요.

마지막으로 해당 run전체 함수를 보여드리면서 글을 끝마치겠습니다.


func (srv *Server) run(dialstate dialer) {
defer srv.loopWG.Done()
var (
peers = make(map[discover.NodeID]*Peer)
inboundCount = 0
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
// modified while the server is running.
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}

// removes t from runningTasks
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
// starts until max number of active tasks is satisfied
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
scheduleTasks := func() {
// Start from queue first.
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}

running:
for {
scheduleTasks()

select {
case <-srv.quit:
// The server was stopped. Run the cleanup logic.
break running
case n := <-srv.addstatic:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
srv.peerOpDone <- struct{}{}
case t := <-taskdone:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
if trusted[c.id] {
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
case <-srv.quit:
break running
}
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
if pd.Inbound() {
inboundCount--
}
}
}

srv.log.Trace("P2P networking is spinning down")

// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
srv.ntab.Close()
}
if srv.DiscV5 != nil {
srv.DiscV5.Close()
}
// Disconnect all peers.
for _, p := range peers {
p.Disconnect(DiscQuitting)
}
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
// is closed.
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks))
delete(peers, p.ID())
}
}

레퍼런스:
https://dave.cheney.net/2016/11/13/do-not-fear-first-class-functions
https://github.com/ethereum/go-ethereum/blob/master/p2p/server.go

Comments