관리 메뉴

HAMA 블로그

파이썬 코딩으로 말하는 데이터 분석 - 8. HMM 학습문제 (Baum-Welch 알고리즘) 본문

통계 & 머신러닝 & 딥러닝

파이썬 코딩으로 말하는 데이터 분석 - 8. HMM 학습문제 (Baum-Welch 알고리즘)

[하마] 이승현 (wowlsh93@gmail.com) 2017. 4. 13. 19:00


순서 

1. 통계 - 카운팅,min,max,평균,중앙값,산포도,분산,편차,공분산,상관관계 

2. 가설과 추론 (베이지언 - 사후확률,우도) 

3. 군집화 (K-Means)

4. 연관 (Apriori)

5. 함수형으로 데이터 다루기 

6.경사하강법

7. 회귀분석

8. 은닉 마코프법 (HMM) 

9. k-NN

10. DTW 


HMM  (은닉마코프법) 

이번 글은  HMM에 대해서 대략 알고 있는 상태에서 , HMM 의 3가지 문제중 가장 고난이도인 학습문제에 대해 코드로 알아보고자 한다. 

HMM 자체에 대해서 알고 싶은 분은 , 오일석 저자의 "패턴인식" 을 추천한다. 이 책은 진짜 설명을 해주려고 애쓰는 모습이 느껴지는 책이고,그 책으로 어느정도 이해한 후에 그에 파생되는 연구(MHMM, FHMM, CFHSMM 등) 에 대한 논문들을 살펴보면 될거 같다.

아래 링크의 글 또한 매우 자세히 HMM 에 대해서 설명하는 글이니 참고하시라~

http://article.sciencepublishinggroup.com/html/10.11648.j.acm.s.2017060401.12.html#paper-content-4 


HMM learning  by code

N (상태후보개수) 과 M (관측후보개수) 과  T 개의 관측열은 고정 되있다고 가정한다.

O = (O0, O1, . . . , OT −1)


1. 초기화

행렬 A,B,π  에 대해서 각각 초기화 한다. π 는 1 × N 의 갯수를 가지고,  A = {aij}

는  N × N 그리고  B = {bj (k)} 는 N × M 을 가진다. 모든 행렬은 row-stochastic 이다.  

참고로 

π  : 초기상태확률에 대한 행렬 (어떤 상태에서 시작될 것인가?) 
A  : 상태전이확률에 대한 행렬 (a 상태에서 b 상태로 이동하는 확률은?)
B  : 관측확률에 대한 행렬 ( a 상태에서 o1,o2,o3 각각이 관측될 확률은?) 


πi ≈ 1/N   즉 초기값 하나하나들은 대략 1/N 의 확률을 갖게 된다. 

aij ≈ 1/N   즉 상태전이확률을 대략 1/N 의 확률을 갖게 된다. 상태가 N 개니깐, N 개의 상태로 이동한다는 말.

bj (k) ≈ 1/M. 즉 관측확률은 대략 1/M 이된다.  M 은 관측후보개수


2. 슈도코드

2. The α-pass
# α0(i) : 첫번째 관측 (모든 상태)에 대한 전방계산. # c0 : 모든 상태에 대한 첫번째 전방변수(확률들의 합)
c0 = 0
for i = 0 to N − 1 # 모든 상태에 대해서
    α0(i) = πibi(O0) # 초기상태확률 * 초기상태에서 첫번째 관측이 나올 확률
    c0 = c0 + α0(i) # 모든 α0(0),α0(1) 의 합

next i
# α0(i) 에 대한 스케일 조정
c0 = 1/c0
for i = 0 to N − 1
    α0(i) = c0α0(i)
next i
// 각 관측 αt(i)에서의 전방계산

for t = 1 to T − 1 # 모든 관측열에 대해서
    ct = 0 # 해당 시간t에서의 전방변수(확률들의 합)
    for i = 0 to N − 1 # 모든 상태에 대해서
        αt(i) = 0 # t번째 관측에 대해서 전방계산
        for j = 0 to N − 1 # 모든 상태에 대해서
            αt(i) = αt(i) + αt−1(j)aji # 이전전방확률 * j -> i 상태로 전이확률의 합
        next j
        αt(i) = αt(i)bi(Ot) #t(t에서i상태)까지의 전방확률 * t시간의 i상태에서 관측된O의 확률
        ct = ct + αt(i) # 현재 시간(t) 에서의 전방변수 (모든 상태에서의 확률의 합)

    next i
    // αt(i) 에 대한 스케일 조정
    ct = 1/ct
    for i = 0 to N − 1
        αt(i) = ctαt(i)
    next i
next t
3. The β-pass
// Let βT −1(i) = 1, scaled by cT −1
for i = 0 to N − 1 # 모든 상태에 대해서
    βT −1(i) = cT −1
next i
// β-pass
for t = T − 2 to 0 by − 1 # 끝에서 2번째 시간(관측) ~ 첫시간(관측) 까지 하나씩 감소
    for i = 0 to N − 1 # 모든 상태에 대해서
        βt(i) = 0 # t 시간에서 i 상태인 경우에 대한 b-pass변수
        for j = 0 to N − 1 # 모든 상태에 대해서 (상태전이가 N*N 만큼일어나니깐~i*j하는것)
            # 현재t(i)에서의 b-pass 는 상태전이*다음시간관측확률*다음시간이후의b-pass             βt(i) = βt(i) + aij bj (Ot+1)βt+1(j)
        next j
        // scale βt(i) with same scale factor as αt(i)
        βt(i) = ctβt(i)
    next i
next t
# α-pass 와 β-pass 를 구했고 아래에서 그 둘을 이용해서 hidden 변수를 구한다. 4. Compute γt(i, j) and γt(i)
for t = 0 to T − 2
# 모든 시간(관측열)에 대해서 각각 계산한다. (마지막 제외)
    denom = 0
    for i = 0 to N − 1
# 모든 상태에 대해서
        for j = 0 to N − 1
# 모든 상태에 대해서 (N*N)             # t에서의 전방변수(i이전)*전이확률(i*j)*다음상태(j)에서 O관측확률*후방변수(j이후)

            denom = denom + αt(i)aij bj (Ot+1)βt+1(j)
        next j
    next i
    # demon 은 특정시간 t에서의 모든 전,후방확률들의 합     for i = 0 to N − 1
# 모든 상태에 대해서
        γt(i) = 0
        for j = 0 to N − 1
# 모든 상태에 대해서             # 모든 상황(demon) 으로 나누어줌, 즉 i->j 로만 가는 확률
            γt(i, j) = (αt(i)aij bj (Ot+1)βt+1(j))/denom
            # γt(i) 는 결국 i 상태에서 변화되는 모든 γt(i, j) 확률들의 합이다.
            γt(i) = γt(i) + γt(i, j)

        next j
    next i
next t

// Special case for γT −1(i)
denom = 0
for i = 0 to N − 1
    denom = denom + αT −1(i)
next i
for i = 0 to N − 1
    γT −1(i) = αT −1(i)/denom
next i
# γt(i) 는 t시간에 i상태에서 변화되는 모든 확률의 합이다. i->0, i->1, i->j ..i->N


# hidden 변수 γt(i, j) and γt(i) 를 이용하여 (A,B,π) 를 재추정한다.

5. Re-estimate A, B and π
# π 를 재추정한다.
for i = 0 to N − 1 # 모든 상태의 개수만큼 재추정, 즉 어떤 상태로 초기화가 되겠냐?
    πi = γ0(i) # 0시간(첫번째관측)의 i상태에 있을 확률
next i
# A (상태전이확률행렬) 재추정 (i 에서 출발하는 모든 확률로 i->특정j 확률을 나누어서 구함)
for i = 0 to N − 1 # 모든 상태의 개수만큼 재추정
    for j = 0 to N − 1 # 모든 상태의 개수만큼 재추정 (상태변이행렬은 N*N이다.)
        numer = 0
        denom = 0
        for t = 0 to T − 2 # 모든 시간(관측열)에 대해서 계산한다.
            numer = numer + γt(i, j) # 특정 시간t에서 i->j 경우의 γ 를 분자로

            denom = denom + γt(i) # 특정 시간t에서 i 상태인 경우의 γ 를 분모로

        next t
        aij = numer/denom # i->j 경우 / i 에서 이동하는 모든 경우

    next j
next i
// B(관측확률행렬) 재추정 (특정 상태에서 관측될 모든 확률 구함, N*M 이 되겠지)
for i = 0 to N − 1 # 모든 상태의 개수만큼 재추정
    for j = 0 to M − 1 #모든 관측의 개수만큼 재추정
        numer = 0
        denom = 0
        for t = 0 to T − 1 # 모든 시간(관측열)에 대해서 계산한다.
            if(Ot == j) then
                numer = numer + γt(i) #i상태에서 관측열에서의 O가 관측될 확률

            end if

            denom = denom + γt(i) #i상태에서 관측되는 모든것들 합
        next t
        bi(j) = numer/denom #i상태에서 특정O가 관측될 확률/i상태에서 아무거나 관측될 확률

    next j
next i # 새로 평가된 상태전이확률,관측확률,초기확률을 가지고 관측열을 로그우도로 평가한다.
6. Compute log[P(O | λ)]
logProb = 0
for i = 0 to T − 1
    logProb = logProb + log(ci)
next i logProb = −logProb
# 평가결과를 보고 멈출지 계속 순회할지 결정한다.
7. To iterate or not to iterate, that is the question. . .
iters = iters + 1
if (iters < maxIters and logProb > oldLogProb) then
    oldLogProb = logProb
    goto 2
else
    output λ = (π, A, B)
end if


3. 수학식으로 정리 (오일석저 '패턴인식' 에서 발췌) 

a-pass 와 b-pass  :  이전 a-pass * 현재상태로전이 * 현재상태에서 관측확률 로 이루어진다.


감마(i)   :  a-pass  와 b-pass 를 이용해 간단히 만들어진다. k-means 의 경우 E 구간에서 포인트들이 이동하는 모습들이 눈에 보이기 때문에 유추하기 쉽지만, HMM 에서 이걸 어떻게 생각했는지 참 대단하다는 생각이 든다.


감마(i,j) :  역시 a-pass  와 b-pass 를 이용해 간단히 만들어진다. 상태변이확률이 더해졌다.


new A (상태전이확률) :  감마(i) 와 감마(i,j) 로 만들어진다. 여기서는 k(i,j) 가 감마(i,j) 이다.

new B (관측확률) :  감마(i) 로만들어진다.


4. Python 코드 

# HMM의 3가지 문제 ('평가','디코딩','학습') 중 '평가'

def evaluate(self, sequence):
length = len(sequence)
if length == 0:
return 0

prob = 0
alpha = self._forward(sequence)
for state in alpha[length - 1]:
prob += alpha[length - 1][state] # 전방계산에서 마지막 시간에서 모든 상태확률의 합

return prob

# 전방변수 구하기 α-pass
def _forward(self, sequence):
sequence_length = len(sequence) #관측열의 갯수
if sequence_length == 0:
return []

alpha = [{}] # αt(i) ,a-pass 에서 마지막 시간의 상태별 확률들의 합은 평가의 값이다.
    # 초기확률 구하기     for state in self._states:      # 초기상태확률 * 각 상태에서의 첫번째 관측에 대한 관측확률
alpha[0][state] = self.start_prob(state) * self.emit_prob(state, sequence[0])
  # 이 후의 모든 확률 구하기
for index in xrange(1, sequence_length): # 모든 관측열(시간T) 에 대해서 분석
alpha.append({})
for state_to in self._states: # 모든 상태로 도착
prob = 0
for state_from in self._states: # 모든 상태에서 시작           # 이전의 a-pass 과 to상태로의 상태전이확률의 합이 prob 이 된다.
prob += alpha[index - 1][state_from] * self.trans_prob(state_from, state_to)          # 현재 상태에 대한 a-pass 값은 prob * to상태에서의 관측확률

alpha[index][state_to] = prob * self.emit_prob(state_to, sequence[index])

return alpha

# 후방변수 구하기 β-pass
def _backward(self, sequence):
sequence_length = len(sequence)
if sequence_length == 0:
return []

beta = [{}]
for state in self._states:
beta[0][state] = 1

for index in xrange(sequence_length - 1, 0, -1):
beta.insert(0, {})
for state_from in self._states:
prob = 0
for state_to in self._states:
prob += beta[1][state_to] * \
self.trans_prob(state_from, state_to) * \
self.emit_prob(state_to, sequence[index])
beta[0][state_from] = prob

return beta

"""
주어진 `sequence` 로 적절한 상태전이행렬과 관측행렬을 구한다.
`smoothing`은 기본적으로 0이며 관련내용은은 아래 링크에서 확인하라.
`additive smoothing <http://en.wikipedia.org/wiki/Additive_smoothing>`
""" # 학습 시작
def learn(self, sequence, smoothing=0):

length = len(sequence)
alpha = self._forward(sequence) # 전방 a-pass 구하기
beta = self._backward(sequence) # 후방 b-pass 구하기

  # γt(i) 를 구한다. 간단히 말해 (a-pass * b-pass ) / sum
gamma = []
for index in xrange(length): # 모든 관측열(시간T) 에 대해서 계산해준다.
prob_sum = 0
gamma.append({})
for state in self._states: # 모든 관측열(시간T)의 모든 상태에 대해서 계산해준다.        # 특정 시간/상태에서의 a-pass * 특정 시간/상태에서의 b-pass
prob = alpha[index][state] * beta[index][state]
gamma[index][state] = prob
prob_sum += prob

if prob_sum == 0:
continue

for state in self._states: # 모든 상태에 대해서
gamma[index][state] /= prob_sum # 특정시간/상태에 대한 전체로 나눈 감마 확률을 구한다.

xi = []
for index in xrange(length - 1):
prob_sum = 0
xi.append({})
for state_from in self._states:
xi[index][state_from] = {}
for state_to in self._states:
prob = alpha[index][state_from] * beta[index + 1][state_to] * \
self.trans_prob(state_from, state_to) * \
self.emit_prob(state_to, sequence[index + 1])
xi[index][state_from][state_to] = prob
prob_sum += prob

if prob_sum == 0:
continue

for state_from in self._states:
for state_to in self._states:
xi[index][state_from][state_to] /= prob_sum

states_number = len(self._states)
symbols_number = len(self._symbols)
for state in self._states:
# update start probability
self._start_prob[state] = \
(smoothing + gamma[0][state]) / (1 + states_number * smoothing)

# update transition probability
gamma_sum = 0
for index in xrange(length - 1):
gamma_sum += gamma[index][state]

if gamma_sum > 0:
denominator = gamma_sum + states_number * smoothing
for state_to in self._states:
xi_sum = 0
for index in xrange(length - 1):
xi_sum += xi[index][state][state_to]
self._trans_prob[state][state_to] = (smoothing + xi_sum) / denominator
else:
for state_to in self._states:
self._trans_prob[state][state_to] = 0

# update emission probability
gamma_sum += gamma[length - 1][state]
emit_gamma_sum = {}
for symbol in self._symbols:
emit_gamma_sum[symbol] = 0

for index in xrange(length):
emit_gamma_sum[sequence[index]] += gamma[index][state]

if gamma_sum > 0:
denominator = gamma_sum + symbols_number * smoothing
for symbol in self._symbols:
self._emit_prob[state][symbol] = \
(smoothing + emit_gamma_sum[symbol]) / denominator
else:
for symbol in self._symbols:
self._emit_prob[state][symbol] = 0

전체소스: https://github.com/jason2506/PythonHMM/blob/master/hmm.py 





레퍼런스:

https://www.cs.sjsu.edu/~stamp/RUA/HMM.pdf
http://article.sciencepublishinggroup.com/html/10.11648.j.acm.s.2017060401.12.html#paper-content-1
https://github.com/nilmtk/nilmtkhttps://github.com/jason2506/PythonHMM/blob/master/hmm.py 

Comments