In [2]:
import numpy as np
import scipy.io

Tutorial 7 - Markov Chains

Load the assignment data.

In [3]:
vit = scipy.io.loadmat('./a4/viterbiData.mat')
p0 = vit['p0']
pT_long = vit['pT_long']
pT_short1 = vit['pT_short1']
pT_short2 = vit['pT_short2']

The data gives the transition matrices for an inhomogenous markov chain so for each time step $t$, we have a different probability. In particular, we have four transition probabilities for each time step: $p(x_t = 1|x_{t-1} = 1)$, $p(x_t = 2|x_{t-1} = 1)$, $p(x_t = 1|x_{t-1} = 2)$, $p(x_t = 2|x_{t-1} = 2)$, so our transition probabilities are stored in a 3D tensor with shape $(2, 2, T)$ where $T$ is the total number of time steps. Here's the short example chain:

In [4]:
print "Transition probability tensor shape: ", pT_short1.shape
print pT_short1
print "p_0", p0
Transition probability tensor shape:  (2, 2, 3)
[[[ 0.41702895  0.94793312  0.10570943]
  [ 0.58297105  0.05206688  0.89429057]]

 [[ 0.20597552  0.08207121  0.14204112]
  [ 0.79402448  0.91792879  0.85795888]]]
p_0 [[ 0.6  0.4]]
In [5]:
pT_long.shape
Out[5]:
(2, 2, 30)

So, for example, at time $t=2$, we have the follow transition probabilities:

In [6]:
pT_short1[:,:,2]
Out[6]:
array([[ 0.10570943,  0.89429057],
       [ 0.14204112,  0.85795888]])

which says: $$p(x_3=0|x_2=0) = 0.10570943,$$ $$p(x_3=1|x_2=0) = 0.89429057,$$ $$p(x_3=0|x_2=1) = 0.14204112,$$ $$p(x_3=1|x_2=1) = 0.85795888.$$

Aside on sampling from a binomial

you can sample by drawing uniforms and testing if they're less than your parameter $p$. e.g. to get 10 biased coin flips which come up heads 75% of the time:

In [34]:
1*(np.random.rand(10) < 0.75) # the 1* is a quick hack to convert from bool to int...
Out[34]:
array([1, 1, 0, 1, 1, 0, 0, 0, 1, 0])
In [36]:
(np.random.rand(100000) < 0.75).mean() # mean of 100000 samples
Out[36]:
0.75061999999999995

Alternatively you can use numpy's binomial function:

In [37]:
np.random.binomial(1, 0.75, size=10)
Out[37]:
array([1, 1, 1, 0, 1, 1, 1, 1, 0, 1])

It also takes vectors of probabilities as input so, for example, to flip a fair coin then a biased coin, you could do:

In [39]:
np.random.binomial(1, [0.5,0.75])
Out[39]:
array([1, 1])
In [41]:
np.random.binomial(1, [0.5,0.75], size=(10, 2)) # 10 samples, fair coin in the first column, biased in the second
Out[41]:
array([[0, 1],
       [1, 1],
       [0, 1],
       [1, 1],
       [0, 0],
       [1, 1],
       [1, 0],
       [0, 1],
       [1, 0],
       [1, 1]])

Ancestral sampling

Let's start by sampling from $p_0$ to get our start state:

In [8]:
n = 10 # number of samples
# Python indexes from 0 so we'll use 0 to represent state 0 and 1 to represent state 1.
x_0 = 1 - ( np.random.rand(n) < p0[0,0]) # The 1 - () ensures that all the state 0 samples = 0 and state 1 samples = 1
(x_0 == 0).mean() # this computes the probability that we're in state 0
Out[8]:
0.69999999999999996

Now, for every on of those samples, we want to sample a value for $x_1$ from $p(x_1 | x_0)$. For pT_short1, we can do this as follows:

In [9]:
x_1 = np.zeros_like(x_0) # empty array to store our samples
for i in x_0:
    print 'x_0 = %d, p(x_1 = 0 | x_0 = %d) = %f' % (i, i, pT_short1[i, 0, 0])
    x_1 = 1 - ( np.random.rand(n) < pT_short1[i, 0, 0])
print x_1
x_0 = 1, p(x_1 = 0 | x_0 = 1) = 0.205976
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 1, p(x_1 = 0 | x_0 = 1) = 0.205976
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
x_0 = 1, p(x_1 = 0 | x_0 = 1) = 0.205976
x_0 = 0, p(x_1 = 0 | x_0 = 0) = 0.417029
[1 1 0 0 1 0 1 1 1 0]

We can generalize this to deal with the full set of transition probabilities:

In [10]:
def sample_ancestral(p0, pT, n=10000):
    '''
    p0: (states_0 vector) starting probabilities.
    pT: (states_in x states_out x T tensor) transition probabilities.
    n: (int) number of samples to draw.
    '''
    T = pT.shape[2] +1
    X_samples = np.zeros((n,T), dtype='int')
    for i in xrange(n):
        if np.random.rand() < p0[0][0]:
            X_samples[i, 0] = 0
        else:
            X_samples[i, 0] = 1
        for j in xrange(1, T):
            if np.random.rand() < pT[X_samples[i,j-1], 0, j-1]:
                X_samples[i,j] = 0
            else:
                X_samples[i,j] = 1
    return X_samples
            
In [11]:
X_samples = sample_ancestral(p0, pT_short1, 100000)
(X_samples==0).mean(axis=0)
Out[11]:
array([ 0.59984,  0.33261,  0.37136,  0.12801])

Aside: vectorized code

Python (and Matlab) tend to have very slow for loops... it's a good habit to learn to vectorize your code. Here's the how we'd do it for the sampling code:

In [42]:
def efficient_sample_ancestral(p0, pT, n=10000):
    '''
    p0: starting probabilities.
    pT: transition probabilities.
    n: number of samples to draw.
    '''
    d = pT.shape[2] +1
    X_samples = np.zeros((n,d), dtype='int')
    X_samples[:, 0] = 1-np.random.binomial(1, p0[0,0], X_samples.shape[0])
    for j in xrange(1, d):
        pxt_xt1 = pT[X_samples[:, j-1],0,j-1] # here I'm indexing with the X_samples vector to get a vector of probabilities
        X_samples[:, j] = 1-np.random.binomial(1,pxt_xt1)
    return X_samples
In [13]:
X_samples = efficient_sample_ancestral(p0, pT_short1, 100000)
(X_samples==0).mean(axis=0)
Out[13]:
array([ 0.59851,  0.33208,  0.37076,  0.12951])

Computing marginals with the Chapman-Kolmogorov equations

In the above example, we computed the marginals using sampling, but we can do it exactly using the Chapman-Kolmogorov equations: $$ p(x_t) = \sum_{x_{t-1}} p(x_t | x_{t-1})p( x_{t-1}) $$ Now, we have p(x_0 = 0) from p0.

In [14]:
print 'p(x_0 = 0) = %f' % p0[0,0]
p_0 = p0[0,0]
p(x_0 = 0) = 0.600000

So, to get $p(x_1 = 0)$ we compute, $p(x_1 = 0) = p(x_1 = 0 | x_0=0)p(x_0=0) + p(x_1 = 0 | x_0=1)p(x_0=1)$

In [15]:
p_x_1 = pT_short1[0, 0, 0] * p_0 + pT_short1[1, 0, 0] * (1 - p_0)
p_x_1
Out[15]:
0.33260757719862893

In general for pT_short1:

In [16]:
d = pT_short1.shape[2] + 1
p = np.zeros(d)
p[0] = p0[0,0]
for i in xrange(1,d):
    p[i] = pT_short1[0, 0, i-1] * p[i-1] + pT_short1[1, 0, i-1] * (1 - p[i-1])
print p
[ 0.6         0.33260758  0.37006344  0.12859609]
In [17]:
(efficient_sample_ancestral(p0, pT_short1, 100000)==0).mean(axis=0)
Out[17]:
array([ 0.59906,  0.33123,  0.37021,  0.12851])

The above code was correct for our purposes, but only kept track of $p(x_t = 0)$. We can get $p(x_t = s)$ for all states in a simmilar fashion as follows:

In [18]:
def compute_marginals(p0, pT):
    '''
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    M = np.zeros((2,d))
    M[:,0] = p0[0]
    for i in xrange(1,d):
        M[:,i] = np.dot(M[:,i-1] , pT[:,:,i-1])
    return M
M = compute_marginals(p0, pT_short1)
In [19]:
M[0,:]
Out[19]:
array([ 0.6       ,  0.33260758,  0.37006344,  0.12859609])
In [20]:
M[1,:]
Out[20]:
array([ 0.4       ,  0.66739242,  0.62993656,  0.87140391])
In [21]:
M[0,:] + M[1,:]
Out[21]:
array([ 1.,  1.,  1.,  1.])

Maximizing the marginal probability

Now, we'd like to find the most likely sequence... first let's code up a quick function for telling us the probability of a particular sequence. In your assignment, this is done for you in decodeProb.m, but let's rewrite it here...

In [22]:
def joint_prob(y, p0, pT):
    '''
    y: the sequence to evaluate
    p0: starting probabilities.
    pT: transition probabilities.
    '''
    d = pT.shape[2] + 1
    prob = p0[0, y[0]]
    for i in xrange(1,d):
        prob *= pT[y[i-1], y[i], i-1]
    return prob

The brute force way of finding the most likely sequence is to do the following (exactDecode.m does the same thing with a more elegent way of representing the nested for loops).

In [23]:
y = [1, 0, 0, 1]
joint_prob(y, p0, pT_short1)
Out[23]:
0.06984445627515376
In [24]:
y = [1, 0, 0, 1]
max_prob = -0.
for y1 in [0, 1]:
    for y2 in [0, 1]:
        for y3 in [0,1]:
            for y4 in [0,1]:
                y = [y1, y2, y3, y4]
                p = joint_prob(y, p0, pT_short1)
                if p > max_prob:
                    y_best = y
                    max_prob = p
print 'Most likely sequence: ', y_best
Most likely sequence:  [0, 1, 1, 1]

The first thing you might try is to maximize the marginal probability.

In [25]:
M = compute_marginals(p0,pT_short1)
np.argmax(M,axis=0)
Out[25]:
array([0, 1, 1, 1])

In this case the sequence that maximised our joint probability happend to be the one that maximised the marginal proabability, but in general that won't be true... to find the sequence that maximises the joint probability, we need to use Viterbi decoding below...

Viterbi Decoding

Let's do this more efficiently...

Recall from the lectures, that we're going to memoize the solution. Because each time step has the same number of states, we can use a matrix for this of size (num states, num time steps).

In [26]:
M = np.zeros((2, pT_short1.shape[2] + 1))
T = np.zeros((2,d),dtype='int') # where we'll keep our pointers indicating which state lead us where...

$M_0(x_0) = p(x_0)$ so the first state is trivial

In [27]:
M[:,0] = p0[0]
# we don't update T for time 0 because we just started our chain at 0 so we don't have a pointer to a preceeding state

Now, to get $M_1(x_1)$ we have the following: $M_1(x_1) = \max_{x_0} p(x_1 | x_0) M_0(x_0)$ Which essentially says, "given that I end up at $x_1$, what is the probability of the most likely step that I took to get there". We can do this because from the markov property, we know that given $x_0$, nothing that happened before $x_0$ effects the probability of being in a particular state at time 1 (obviously that's trivally true in this case because $x_0$ is our start state, but this holds more generally), so because we're trying to find the most likely sequence, it suffices to just think about all the ways that I could have gotten to a particular $x_1$ value and pick the most likely of them. For example:

In [28]:
# M[0,1] gives us the probability of the most likely sequence that ends in state 0 at time 1.
ways_to_get_to_state_0 = []
ways_to_get_to_state_0.append(M[0,0] * pT_short1[0, 0, 0]) # start at state 0 at time 0 and go to state 0 at time 1
ways_to_get_to_state_0.append(M[1,0] * pT_short1[1, 0, 0]) # start at state 1 at time 0 and go to state 0 at time 1
M[0,1] = np.max(ways_to_get_to_state_0)
T[0,1] = np.argmax(ways_to_get_to_state_0) # keep track of which of the two states the most likely path came from...

ways_to_get_to_state_1 = []
ways_to_get_to_state_1.append(M[0,0] * pT_short1[0, 1, 0]) # start at state 0 at time 0 and go to state 0 at time 1
ways_to_get_to_state_1.append(M[1,0] * pT_short1[1, 1, 0]) # start at state 1 at time 0 and go to state 0 at time 1
M[1,1] = np.max(ways_to_get_to_state_1)
T[1,1] = np.argmax(ways_to_get_to_state_1) # keep track of which of the two states the most likely path came from...

In general, this is what you'd do:

In [29]:
def viterbi_decode(p0, pT):
    d = pT.shape[2] + 1
    M = np.zeros((2,d))
    T = np.zeros((2,d),dtype='int')
    M[:,0] = p0[0]
    
    for i in xrange(1,d):
        for s in [0, 1]:
            m = [M[0,i-1] * pT[0,s,i-1], M[1,i-1] * pT[1,s,i-1]]
            M[s,i] = np.max(m)
            T[s,i] = np.argmax(m)
    
    y_best = np.zeros(d, dtype='int')
    y_best[-1] = np.argmax(M[:,-1])
    for i in xrange(d-1, 1, -1):
        y_best[i-1] = T[y_best[i], i]
    return y_best, M, T
In [30]:
y_best, M, T = viterbi_decode(p0, pT_short1)
y_best
Out[30]:
array([0, 1, 1, 1])
In [31]:
print M[y_best[-1],-1] == joint_prob(y_best, p0, pT_short1) # check that we haven't made a mistake
True
In [ ]: