# -*- coding: utf-8 -*-
"""
Created on Fri Jan 9 13:41:56 2015
@author: steve
"""
import time as _time
import numpy as _np
from mdptoolbox.mdp_base import MDP
class FiniteHorizon(MDP):
"""A MDP solved using the finite-horizon backwards induction algorithm.
Parameters
----------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
N : int
Number of periods. Must be greater than 0.
h : array, optional
Terminal reward. Default: a vector of zeros.
Data Attributes
---------------
V : array
        Optimal value function. Shape = (S, N+1). ``V[:, n]`` = the optimal
        value function at stage ``n``, with stage in {0, 1, ..., N-1}.
        ``V[:, N]`` = the value function for the terminal stage.
    policy : array
        Optimal policy. ``policy[:, n]`` = the optimal policy at stage ``n``,
        with stage in {0, 1, ..., N-1}.
time : float
used CPU time
Notes
-----
In verbose mode, displays the current stage and policy transpose.
Examples
--------
>>> import mdptoolbox, mdptoolbox.example
>>> P, R = mdptoolbox.example.forest()
>>> fh = mdptoolbox.mdp.FiniteHorizon(P, R, 0.9, 3)
>>> fh.run()
>>> fh.V
array([[ 2.6973, 0.81 , 0. , 0. ],
[ 5.9373, 3.24 , 1. , 0. ],
[ 9.9373, 7.24 , 4. , 0. ]])
>>> fh.policy
array([[0, 0, 0],
[0, 0, 1],
[0, 0, 0]])
"""
def __init__(self, transitions, reward, discount, N, h=None):
# Initialise a finite horizon MDP.
self.N = int(N)
assert self.N > 0, "N must be greater than 0."
# Initialise the base class
MDP.__init__(self, transitions, reward, discount, None, None)
# remove the iteration counter, it is not meaningful for backwards
# induction
del self.iter
# There are value vectors for each time step up to the horizon
self.V = _np.zeros((self.S, N + 1))
# There are policy vectors for each time step before the horizon, when
# we reach the horizon we don't need to make decisions anymore.
self.policy = _np.empty((self.S, N), dtype=int)
# Set the reward for the final transition to h, if specified.
if h is not None:
self.V[:, N] = h
def run(self):
# Run the finite horizon algorithm.
self.time = _time.time()
# loop through each time period
for n in range(self.N):
W, X = self._bellmanOperator(self.V[:, self.N - n])
stage = self.N - n - 1
self.V[:, stage] = X
self.policy[:, stage] = W
if self.verbose:
print(("stage: %s, policy: %s") % (
stage, self.policy[:, stage].tolist()))
# update time spent running
self.time = _time.time() - self.time
# After this we could create a tuple of tuples for the values and
# policies.
#self.V = tuple(tuple(self.V[:, n].tolist()) for n in range(self.N))
#self.policy = tuple(tuple(self.policy[:, n].tolist())
# for n in range(self.N))
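# The following is a minimal, self-contained sketch (not part of the toolbox)
# of the backwards induction performed by ``FiniteHorizon.run``, using the
# small two-state example that appears in the other docstrings of this
# package.  All names (``P_demo``, ``R_demo``) are illustrative only.
if __name__ == "__main__":
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])   # shape (A, S, S)
    R_demo = _np.array([[5, 10], [-1, 2]])           # shape (S, A)
    discount, N = 0.9, 3
    S, A = R_demo.shape
    V = _np.zeros((S, N + 1))            # V[:, N] is the terminal reward h
    policy = _np.empty((S, N), dtype=int)
    for stage in range(N - 1, -1, -1):
        # Q[s, a] = immediate reward + discounted expected next-stage value
        Q = R_demo + discount * _np.stack(
            [P_demo[a].dot(V[:, stage + 1]) for a in range(A)], axis=1)
        policy[:, stage] = Q.argmax(axis=1)
        V[:, stage] = Q.max(axis=1)
    print(V)
    print(policy)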
# -*- coding: utf-8 -*-
"""
"""
import time as _time
import numpy as _np
import scipy.sparse as _sp
from mdptoolbox.mdp_base import MDP
class LP(MDP):
"""A discounted MDP soloved using linear programming.
This class requires the Python ``cvxopt`` module to be installed.
Arguments
---------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
h : array, optional
Terminal reward. Default: a vector of zeros.
Data Attributes
---------------
V : tuple
optimal values
policy : tuple
optimal policy
time : float
used CPU time
Examples
--------
>>> import mdptoolbox.example
>>> P, R = mdptoolbox.example.forest()
>>> lp = mdptoolbox.mdp.LP(P, R, 0.9)
>>> lp.run()
>>> import numpy, mdptoolbox
>>> P = numpy.array((((0.5, 0.5), (0.8, 0.2)), ((0, 1), (0.1, 0.9))))
>>> R = numpy.array(((5, 10), (-1, 2)))
>>> lp = mdptoolbox.mdp.LP(P, R, 0.9)
>>> lp.run()
>>> #lp.policy #FIXME: gives (1, 1), should be (1, 0)
"""
def __init__(self, transitions, reward, discount):
# Initialise a linear programming MDP.
# import some functions from cvxopt and set them as object methods
try:
from cvxopt import matrix, solvers
self._linprog = solvers.lp
self._cvxmat = matrix
except ImportError:
raise ImportError("The python module cvxopt is required to use "
"linear programming functionality.")
# initialise the MDP. epsilon and max_iter are not needed
MDP.__init__(self, transitions, reward, discount, None, None)
# Set the cvxopt solver to be quiet by default, but ...
# this doesn't do what I want it to do c.f. issue #3
if not self.verbose:
solvers.options['show_progress'] = False
def run(self):
        # Run the linear programming algorithm.
self.time = _time.time()
        # The objective is to solve:  min V  s.t.  V >= PR + discount*P*V
        # The linprog function from the MATLAB Optimization Toolbox solves:
        #     min f'*x  s.t.  M*x <= b
        # so the objective can be expressed as:
        #     min V  s.t.  (discount*P - I)*V <= -PR
        # To avoid looping over the states, the matrix M is structured in
        # blocks by action, giving M a shape of (A*S, S).
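        # For example, with A = 2 actions the stacked system is
        #     [ discount*P[0] - I ]        [ -R[:, 0] ]
        #     [ discount*P[1] - I ] * V <= [ -R[:, 1] ]
        # i.e. each block of S rows encodes the constraint for one action.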
f = self._cvxmat(_np.ones((self.S, 1)))
h = _np.array(self.R).reshape(self.S * self.A, 1, order="F")
h = self._cvxmat(h, tc='d')
M = _np.zeros((self.A * self.S, self.S))
for aa in range(self.A):
pos = (aa + 1) * self.S
M[(pos - self.S):pos, :] = (
self.discount * self.P[aa] - _sp.eye(self.S, self.S))
M = self._cvxmat(M)
# Using the glpk option will make this behave more like Octave
# (Octave uses glpk) and perhaps Matlab. If solver=None (ie using the
# default cvxopt solver) then V agrees with the Octave equivalent
# only to 10e-8 places. This assumes glpk is installed of course.
self.V = _np.array(self._linprog(f, M, -h)['x']).reshape(self.S)
# apply the Bellman operator
self.policy, self.V = self._bellmanOperator()
# update the time spent solving
self.time = _time.time() - self.time
# store value and policy as tuples
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy.tolist())
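# The following is a minimal, self-contained sketch (not part of the toolbox)
# of the same linear program solved with ``scipy.optimize.linprog`` instead of
# cvxopt, on the small two-state example used in the docstring above.  It is
# illustrative only; the ``LP`` class above is the supported interface.
if __name__ == "__main__":
    from scipy.optimize import linprog as _demo_linprog
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])   # shape (A, S, S)
    R_demo = _np.array([[5, 10], [-1, 2]])           # shape (S, A)
    discount = 0.9
    S, A = R_demo.shape
    # One block of constraints (discount*P[a] - I) V <= -R[:, a] per action.
    M_demo = _np.vstack([discount * P_demo[a] - _np.eye(S) for a in range(A)])
    b_demo = -R_demo.reshape(S * A, order="F")
    res = _demo_linprog(c=_np.ones(S), A_ub=M_demo, b_ub=b_demo,
                        bounds=(None, None))
    print(res.x)   # the optimal value function V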
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 9 13:47:35 2015
@author: steve
"""
import time as _time
import numpy as _np
import mdptoolbox.util as _util
from mdptoolbox.policy_iteration import PolicyIteration
class PolicyIterationModified(PolicyIteration):
"""A discounted MDP solved using a modifified policy iteration algorithm.
Arguments
---------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
epsilon : float, optional
Stopping criterion. See the documentation for the ``MDP`` class for
details. Default: 0.01.
max_iter : int, optional
Maximum number of iterations. See the documentation for the ``MDP``
class for details. Default is 10.
Data Attributes
---------------
V : tuple
value function
policy : tuple
optimal policy
iter : int
        number of iterations taken
time : float
used CPU time
Examples
--------
>>> import mdptoolbox, mdptoolbox.example
>>> P, R = mdptoolbox.example.forest()
>>> pim = mdptoolbox.mdp.PolicyIterationModified(P, R, 0.9)
>>> pim.run()
>>> pim.policy
(0, 0, 0)
>>> expected = (21.81408652334702, 25.054086523347017, 29.054086523347017)
>>> all(expected[k] - pim.V[k] < 1e-12 for k in range(len(expected)))
True
"""
def __init__(self, transitions, reward, discount, epsilon=0.01,
max_iter=10):
# Initialise a (modified) policy iteration MDP.
        # Maybe it's better not to subclass from PolicyIteration, because the
        # initialisations of the two are quite different. E.g. policy0 is
        # calculated here even though it doesn't need to be. The only thing
        # that is needed from the PolicyIteration class is the
        # _evalPolicyIterative function. Perhaps there is a better way to do
        # it?
PolicyIteration.__init__(self, transitions, reward, discount, None,
max_iter, 1)
# PolicyIteration doesn't pass epsilon to MDP.__init__() so we will
# check it here
self.epsilon = float(epsilon)
assert epsilon > 0, "'epsilon' must be greater than 0."
# computation of threshold of variation for V for an epsilon-optimal
# policy
if self.discount != 1:
self.thresh = self.epsilon * (1 - self.discount) / self.discount
else:
self.thresh = self.epsilon
if self.discount == 1:
self.V = _np.zeros(self.S)
else:
Rmin = min(R.min() for R in self.R)
self.V = 1 / (1 - self.discount) * Rmin * _np.ones((self.S,))
def run(self):
# Run the modified policy iteration algorithm.
if self.verbose:
print(' \tIteration\t\tV-variation')
self.time = _time.time()
done = False
while not done:
self.iter += 1
self.policy, Vnext = self._bellmanOperator()
#[Ppolicy, PRpolicy] = mdp_computePpolicyPRpolicy(P, PR, policy);
variation = _util.getSpan(Vnext - self.V)
if self.verbose:
print((" %s\t\t %s" % (self.iter, variation)))
self.V = Vnext
if variation < self.thresh:
done = True
else:
is_verbose = False
if self.verbose:
self.setSilent()
is_verbose = True
self._evalPolicyIterative(self.V, self.epsilon, self.max_iter)
if is_verbose:
self.setVerbose()
self.time = _time.time() - self.time
# store value and policy as tuples
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy.tolist())
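# The following is a minimal, self-contained sketch (not part of the toolbox)
# of modified policy iteration on the small two-state example used elsewhere
# in these docstrings: a greedy (Bellman) step followed by a fixed number of
# sweeps of iterative evaluation of the greedy policy.  The fixed number of
# sweeps stands in for ``_evalPolicyIterative``; all names are illustrative.
if __name__ == "__main__":
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])
    R_demo = _np.array([[5, 10], [-1, 2]])
    discount, epsilon, eval_sweeps = 0.9, 0.01, 10
    S, A = R_demo.shape
    thresh = epsilon * (1 - discount) / discount
    V = _np.full(S, R_demo.min() / (1 - discount))   # pessimistic start
    while True:
        Q = R_demo + discount * _np.stack(
            [P_demo[a].dot(V) for a in range(A)], axis=1)
        policy, Vnext = Q.argmax(axis=1), Q.max(axis=1)
        variation = (Vnext - V).max() - (Vnext - V).min()   # span
        V = Vnext
        if variation < thresh:
            break
        for _ in range(eval_sweeps):   # partial evaluation of the greedy policy
            V = R_demo[_np.arange(S), policy] + discount * _np.array(
                [P_demo[policy[s]][s].dot(V) for s in range(S)])
    print(policy, V)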
# -*- coding: utf-8 -*-
"""
"""
import math as _math
import time as _time
import numpy as _np
import mdptoolbox.util as _util
from mdptoolbox.mdp_base import MDP
class QLearning(MDP):
"""A discounted MDP solved using the Q learning algorithm.
Parameters
----------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
    n_iter : int, optional
        Number of iterations to execute. Must be an integer greater than or
        equal to the default value. Default: 10,000.
Data Attributes
---------------
Q : array
learned Q matrix (SxA)
V : tuple
learned value function (S).
policy : tuple
learned optimal policy (S).
    mean_discrepancy : array
        Vector of mean absolute Q variations, one entry per 100 iterations.
        For the default value of ``n_iter`` its length is 100 (``n_iter``/100).
Examples
---------
>>> # These examples are reproducible only if random seed is set to 0 in
>>> # both the random and numpy.random modules.
>>> import numpy as np
>>> import mdptoolbox, mdptoolbox.example
>>> np.random.seed(0)
>>> P, R = mdptoolbox.example.forest()
>>> ql = mdptoolbox.mdp.QLearning(P, R, 0.96)
>>> ql.run()
>>> ql.Q
array([[ 11.198909 , 10.34652034],
[ 10.74229967, 11.74105792],
[ 2.86980001, 12.25973286]])
>>> expected = (11.198908998901134, 11.741057920409865, 12.259732864170232)
>>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected)))
True
>>> ql.policy
(0, 1, 1)
>>> import mdptoolbox
>>> import numpy as np
>>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]])
>>> R = np.array([[5, 10], [-1, 2]])
>>> np.random.seed(0)
>>> ql = mdptoolbox.mdp.QLearning(P, R, 0.9)
>>> ql.run()
>>> ql.Q
array([[ 33.33010866, 40.82109565],
[ 34.37431041, 29.67236845]])
>>> expected = (40.82109564847122, 34.37431040682546)
>>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected)))
True
>>> ql.policy
(1, 0)
"""
def __init__(self, transitions, reward, discount, n_iter=10000):
# Initialise a Q-learning MDP.
# The following check won't be done in MDP()'s initialisation, so let's
# do it here
self.max_iter = int(n_iter)
        assert self.max_iter >= 10000, "'n_iter' should be at least 10000."
# We don't want to send this to MDP because _computePR should not be
# run on it, so check that it defines an MDP
_util.check(transitions, reward)
# Store P, S, and A
self._computeP(transitions)
self.R = reward
self.discount = discount
# Initialisations
self.Q = _np.zeros((self.S, self.A))
self.mean_discrepancy = []
def run(self):
        # Run the Q-learning algorithm.
discrepancy = []
self.time = _time.time()
# initial state choice
s = _np.random.randint(0, self.S)
for n in range(1, self.max_iter + 1):
# Reinitialisation of trajectories every 100 transitions
if (n % 100) == 0:
s = _np.random.randint(0, self.S)
# Action choice : greedy with increasing probability
# probability 1-(1/log(n+2)) can be changed
pn = _np.random.random()
if pn < (1 - (1 / _math.log(n + 2))):
# optimal_action = self.Q[s, :].max()
a = self.Q[s, :].argmax()
else:
a = _np.random.randint(0, self.A)
# Simulating next state s_new and reward associated to <s,s_new,a>
p_s_new = _np.random.random()
p = 0
s_new = -1
while (p < p_s_new) and (s_new < (self.S - 1)):
s_new = s_new + 1
p = p + self.P[a][s, s_new]
try:
r = self.R[a][s, s_new]
except IndexError:
try:
r = self.R[s, a]
except IndexError:
r = self.R[s]
# Updating the value of Q
# Decaying update coefficient (1/sqrt(n+2)) can be changed
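            # i.e. Q(s, a) <- Q(s, a)
            #          + (1/sqrt(n+2)) * (r + discount * max_a' Q(s_new, a') - Q(s, a))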
delta = r + self.discount * self.Q[s_new, :].max() - self.Q[s, a]
dQ = (1 / _math.sqrt(n + 2)) * delta
self.Q[s, a] = self.Q[s, a] + dQ
# current state is updated
s = s_new
            # Record the magnitude of the Q update
            discrepancy.append(_np.absolute(dQ))
            # Every 100 iterations, store the mean of the recorded Q
            # variations and reset the record
if len(discrepancy) == 100:
self.mean_discrepancy.append(_np.mean(discrepancy))
discrepancy = []
# compute the value function and the policy
self.V = self.Q.max(axis=1)
self.policy = self.Q.argmax(axis=1)
self.time = _time.time() - self.time
# convert V and policy to tuples
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy.tolist())
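# The following is a minimal, self-contained sketch (not part of the toolbox)
# of a single Q-learning update, matching the rule used in ``QLearning.run``
# above.  The transition and reward below are made up for illustration only.
if __name__ == "__main__":
    Q_demo = _np.zeros((2, 2))          # 2 states, 2 actions
    discount, n = 0.9, 1
    s, a, s_new, r = 0, 1, 1, 10.0      # one observed transition and reward
    alpha = 1 / _math.sqrt(n + 2)       # decaying learning rate
    delta = r + discount * Q_demo[s_new, :].max() - Q_demo[s, a]
    Q_demo[s, a] += alpha * delta
    print(Q_demo)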
# -*- coding: utf-8 -*-
"""
"""
import time as _time
import numpy as _np
import mdptoolbox.util as _util
from mdptoolbox.mdp_base import MDP
class RelativeValueIteration(MDP):
"""A MDP solved using the relative value iteration algorithm.
Arguments
---------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
epsilon : float, optional
Stopping criterion. See the documentation for the ``MDP`` class for
details. Default: 0.01.
max_iter : int, optional
Maximum number of iterations. See the documentation for the ``MDP``
class for details. Default: 1000.
Data Attributes
---------------
policy : tuple
epsilon-optimal policy
    average_reward : float
        average reward of the optimal policy
    time : float
        used CPU time
Notes
-----
    In verbose mode, at each iteration, displays the span of the U variation
    and the condition which stopped the iterations: epsilon-optimal policy
    found or maximum number of iterations reached.
Examples
--------
>>> import mdptoolbox, mdptoolbox.example
>>> P, R = mdptoolbox.example.forest()
>>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R)
>>> rvi.run()
>>> rvi.average_reward
3.2399999999999993
>>> rvi.policy
(0, 0, 0)
>>> rvi.iter
4
>>> import mdptoolbox
>>> import numpy as np
>>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]])
>>> R = np.array([[5, 10], [-1, 2]])
>>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R)
>>> rvi.run()
>>> expected = (10.0, 3.885235246411831)
>>> all(expected[k] - rvi.V[k] < 1e-12 for k in range(len(expected)))
True
>>> rvi.average_reward
3.8852352464118312
>>> rvi.policy
(1, 0)
>>> rvi.iter
29
"""
def __init__(self, transitions, reward, epsilon=0.01, max_iter=1000):
# Initialise a relative value iteration MDP.
MDP.__init__(self, transitions, reward, None, epsilon, max_iter)
self.epsilon = epsilon
self.discount = 1
self.V = _np.zeros(self.S)
self.gain = 0 # self.U[self.S]
self.average_reward = None
def run(self):
# Run the relative value iteration algorithm.
done = False
if self.verbose:
print(' Iteration\t\tU variation')
self.time = _time.time()
while not done:
self.iter += 1
self.policy, Vnext = self._bellmanOperator()
Vnext = Vnext - self.gain
variation = _util.getSpan(Vnext - self.V)
if self.verbose:
print((" %s\t\t %s" % (self.iter, variation)))
if variation < self.epsilon:
done = True
self.average_reward = self.gain + (Vnext - self.V).min()
if self.verbose:
print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY)
elif self.iter == self.max_iter:
done = True
self.average_reward = self.gain + (Vnext - self.V).min()
if self.verbose:
print(_util._MSG_STOP_MAX_ITER)
self.V = Vnext
self.gain = float(self.V[self.S - 1])
self.time = _time.time() - self.time
# store value and policy as tuples
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy.tolist())
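# The following is a minimal, self-contained sketch (not part of the toolbox)
# of relative value iteration on the small two-state example from the
# docstring above: the value of a reference state (the last one) is subtracted
# after every sweep, and iteration stops when the span of the change in V
# falls below epsilon.  All names are illustrative only.
if __name__ == "__main__":
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])
    R_demo = _np.array([[5, 10], [-1, 2]])
    epsilon = 0.01
    S, A = R_demo.shape
    V, gain = _np.zeros(S), 0.0
    while True:
        Q = R_demo + _np.stack([P_demo[a].dot(V) for a in range(A)], axis=1)
        policy, Vnext = Q.argmax(axis=1), Q.max(axis=1) - gain
        diff = Vnext - V
        if diff.max() - diff.min() < epsilon:       # span of the variation
            average_reward = gain + diff.min()
            break
        V, gain = Vnext, float(Vnext[S - 1])
    print(policy, average_reward)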
@@ -56,6 +56,14 @@ import numpy as _np
import mdptoolbox.error as _error
_MSG_STOP_MAX_ITER = "Iterating stopped due to maximum number of iterations " \
"condition."
_MSG_STOP_EPSILON_OPTIMAL_POLICY = "Iterating stopped, epsilon-optimal " \
"policy found."
_MSG_STOP_EPSILON_OPTIMAL_VALUE = "Iterating stopped, epsilon-optimal value " \
"function found."
_MSG_STOP_UNCHANGING_POLICY = "Iterating stopped, unchanging policy found."
_MDPERR = {
"mat_nonneg" :
"Transition probabilities must be non-negative.",
# -*- coding: utf-8 -*-
"""
"""
import math as _math
import time as _time
import numpy as _np
import mdptoolbox.util as _util
from mdptoolbox.mdp_base import MDP
class ValueIteration(MDP):
"""A discounted MDP solved using the value iteration algorithm.
Description
-----------
ValueIteration applies the value iteration algorithm to solve a
discounted MDP. The algorithm consists of solving Bellman's equation
iteratively.
Iteration is stopped when an epsilon-optimal policy is found or after a
specified number (``max_iter``) of iterations.
    This class supports verbose and silent modes. In verbose mode, it displays
    the variation of ``V`` (the value function) for each iteration and the
    condition which stopped the iteration: epsilon-optimal policy found or
    maximum number of iterations reached.
Parameters
----------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
epsilon : float, optional
Stopping criterion. See the documentation for the ``MDP`` class for
details. Default: 0.01.
max_iter : int, optional
Maximum number of iterations. If the value given is greater than a
computed bound, a warning informs that the computed bound will be used
instead. By default, if ``discount`` is not equal to 1, a bound for
``max_iter`` is computed, otherwise ``max_iter`` = 1000. See the
documentation for the ``MDP`` class for further details.
initial_value : array, optional
The starting value function. Default: a vector of zeros.
Data Attributes
---------------
V : tuple
The optimal value function.
policy : tuple
The optimal policy function. Each element is an integer corresponding
to an action which maximises the value function in that state.
iter : int
The number of iterations taken to complete the computation.
time : float
The amount of CPU time used to run the algorithm.
Methods
-------
run()
Do the algorithm iteration.
setSilent()
Sets the instance to silent mode.
setVerbose()
Sets the instance to verbose mode.
Notes
-----
In verbose mode, at each iteration, displays the variation of V
and the condition which stopped iterations: epsilon-optimum policy found
or maximum number of iterations reached.
Examples
--------
>>> import mdptoolbox, mdptoolbox.example
>>> P, R = mdptoolbox.example.forest()
>>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.96)
>>> vi.verbose
False
>>> vi.run()
>>> expected = (5.93215488, 9.38815488, 13.38815488)
>>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
True
>>> vi.policy
(0, 0, 0)
>>> vi.iter
4
>>> import mdptoolbox
>>> import numpy as np
>>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]])
>>> R = np.array([[5, 10], [-1, 2]])
>>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9)
>>> vi.run()
>>> expected = (40.048625392716815, 33.65371175967546)
>>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
True
>>> vi.policy
(1, 0)
>>> vi.iter
26
>>> import mdptoolbox
>>> import numpy as np
>>> from scipy.sparse import csr_matrix as sparse
>>> P = [None] * 2
>>> P[0] = sparse([[0.5, 0.5],[0.8, 0.2]])
>>> P[1] = sparse([[0, 1],[0.1, 0.9]])
>>> R = np.array([[5, 10], [-1, 2]])
>>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9)
>>> vi.run()
>>> expected = (40.048625392716815, 33.65371175967546)
>>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
True
>>> vi.policy
(1, 0)
"""
def __init__(self, transitions, reward, discount, epsilon=0.01,
max_iter=1000, initial_value=0):
# Initialise a value iteration MDP.
MDP.__init__(self, transitions, reward, discount, epsilon, max_iter)
# initialization of optional arguments
if initial_value == 0:
self.V = _np.zeros(self.S)
else:
assert len(initial_value) == self.S, "The initial value must be " \
"a vector of length S."
self.V = _np.array(initial_value).reshape(self.S)
if self.discount < 1:
# compute a bound for the number of iterations and update the
# stored value of self.max_iter
self._boundIter(epsilon)
# computation of threshold of variation for V for an epsilon-
# optimal policy
self.thresh = epsilon * (1 - self.discount) / self.discount
else: # discount == 1
# threshold of variation for V for an epsilon-optimal policy
self.thresh = epsilon
def _boundIter(self, epsilon):
# Compute a bound for the number of iterations.
#
# for the value iteration
# algorithm to find an epsilon-optimal policy with use of span for the
# stopping criterion
#
# Arguments -----------------------------------------------------------
# Let S = number of states, A = number of actions
        #   epsilon = |V - V*| < epsilon, greater than 0,
        #       optional (default: 0.01)
# Evaluation ----------------------------------------------------------
# max_iter = bound of the number of iterations for the value
# iteration algorithm to find an epsilon-optimal policy with use of
# span for the stopping criterion
# cpu_time = used CPU time
#
# See Markov Decision Processes, M. L. Puterman,
# Wiley-Interscience Publication, 1994
# p 202, Theorem 6.6.6
        #     k = max_{s,a,s',a'} [ 1 - sum_j min( P(j|s,a), P(j|s',a') ) ]
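        # The bound computed at the end of this method is then
        #     max_iter = log( (epsilon*(1-discount)/discount) / span(V1 - V0) )
        #                / log(discount * k)
        # where V1 is one Bellman backup of the initial value function V0.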
k = 0
h = _np.zeros(self.S)
for ss in range(self.S):
PP = _np.zeros((self.A, self.S))
for aa in range(self.A):
try:
PP[aa] = self.P[aa][:, ss]
except ValueError:
PP[aa] = self.P[aa][:, ss].todense().A1
# minimum of the entire array.
h[ss] = PP.min()
k = 1 - h.sum()
Vprev = self.V
null, value = self._bellmanOperator()
# p 201, Proposition 6.6.5
span = _util.getSpan(value - Vprev)
max_iter = (_math.log((epsilon * (1 - self.discount) / self.discount) /
span ) / _math.log(self.discount * k))
#self.V = Vprev
self.max_iter = int(_math.ceil(max_iter))
def run(self):
# Run the value iteration algorithm.
if self.verbose:
print(' Iteration\t\tV-variation')
self.time = _time.time()
while True:
self.iter += 1
Vprev = self.V.copy()
# Bellman Operator: compute policy and value functions
self.policy, self.V = self._bellmanOperator()
            # self.V holds, for each state, the maximum of Q over the actions,
            # and self.policy holds the corresponding argmax (the maximising
            # action).
variation = _util.getSpan(self.V - Vprev)
if self.verbose:
print((" %s\t\t %s" % (self.iter, variation)))
if variation < self.thresh:
if self.verbose:
print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY)
break
elif self.iter == self.max_iter:
if self.verbose:
print(_util._MSG_STOP_MAX_ITER)
break
# store value and policy as tuples
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy.tolist())
self.time = _time.time() - self.time
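# The following is a minimal, self-contained sketch (not part of the toolbox)
# of value iteration with the span-based stopping rule used above, on the
# small two-state example from the docstring.  All names are illustrative.
if __name__ == "__main__":
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])
    R_demo = _np.array([[5, 10], [-1, 2]])
    discount, epsilon = 0.9, 0.01
    S, A = R_demo.shape
    thresh = epsilon * (1 - discount) / discount
    V, n_iter = _np.zeros(S), 0
    while True:
        n_iter += 1
        Q = R_demo + discount * _np.stack(
            [P_demo[a].dot(V) for a in range(A)], axis=1)
        policy, Vnext = Q.argmax(axis=1), Q.max(axis=1)
        variation = (Vnext - V).max() - (Vnext - V).min()   # span
        V = Vnext
        if variation < thresh:
            break
    print(n_iter, policy, V)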
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 9 13:51:08 2015
@author: steve
"""
import time as _time
import numpy as _np
import mdptoolbox.util as _util
from mdptoolbox.value_iteration import MDP, ValueIteration
class ValueIterationGS(ValueIteration):
"""
A discounted MDP solved using the value iteration Gauss-Seidel algorithm.
Parameters
----------
transitions : array
Transition probability matrices. See the documentation for the ``MDP``
class for details.
reward : array
Reward matrices or vectors. See the documentation for the ``MDP`` class
for details.
discount : float
Discount factor. See the documentation for the ``MDP`` class for
details.
epsilon : float, optional
Stopping criterion. See the documentation for the ``MDP`` class for
details. Default: 0.01.
max_iter : int, optional
Maximum number of iterations. See the documentation for the ``MDP``
and ``ValueIteration`` classes for details. Default: computed.
initial_value : array, optional
The starting value function. Default: a vector of zeros.
    Data Attributes
    ---------------
policy : tuple
epsilon-optimal policy
iter : int
        number of iterations taken
time : float
used CPU time
Notes
-----
In verbose mode, at each iteration, displays the variation of V
and the condition which stopped iterations: epsilon-optimum policy found
or maximum number of iterations reached.
Examples
--------
>>> import mdptoolbox.example, numpy as np
>>> P, R = mdptoolbox.example.forest()
>>> vigs = mdptoolbox.mdp.ValueIterationGS(P, R, 0.9)
>>> vigs.run()
>>> expected = (25.5833879767579, 28.830654635546928, 32.83065463554693)
>>> all(expected[k] - vigs.V[k] < 1e-12 for k in range(len(expected)))
True
>>> vigs.policy
(0, 0, 0)
"""
def __init__(self, transitions, reward, discount, epsilon=0.01,
max_iter=10, initial_value=0):
# Initialise a value iteration Gauss-Seidel MDP.
MDP.__init__(self, transitions, reward, discount, epsilon, max_iter)
# initialization of optional arguments
if initial_value == 0:
self.V = _np.zeros(self.S)
else:
if len(initial_value) != self.S:
raise ValueError("The initial value must be a vector of "
"length S.")
            else:
                self.V = _np.array(initial_value).reshape(self.S)
if self.discount < 1:
# compute a bound for the number of iterations and update the
# stored value of self.max_iter
self._boundIter(epsilon)
# computation of threshold of variation for V for an epsilon-
# optimal policy
self.thresh = epsilon * (1 - self.discount) / self.discount
else: # discount == 1
# threshold of variation for V for an epsilon-optimal policy
self.thresh = epsilon
def run(self):
# Run the value iteration Gauss-Seidel algorithm.
done = False
if self.verbose:
print(' Iteration\t\tV-variation')
self.time = _time.time()
while not done:
self.iter += 1
Vprev = self.V.copy()
for s in range(self.S):
Q = [float(self.R[a][s]+
self.discount * self.P[a][s, :].dot(self.V))
for a in range(self.A)]
self.V[s] = max(Q)
variation = _util.getSpan(self.V - Vprev)
if self.verbose:
print((" %s\t\t %s" % (self.iter, variation)))
if variation < self.thresh:
done = True
if self.verbose:
print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY)
elif self.iter == self.max_iter:
done = True
if self.verbose:
print(_util._MSG_STOP_MAX_ITER)
self.policy = []
for s in range(self.S):
Q = _np.zeros(self.A)
for a in range(self.A):
Q[a] = self.R[a][s] + self.discount * self.P[a][s, :].dot(self.V)
self.V[s] = Q.max()
self.policy.append(int(Q.argmax()))
self.time = _time.time() - self.time
self.V = tuple(self.V.tolist())
self.policy = tuple(self.policy)
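# The following is a minimal, self-contained sketch (not part of the toolbox)
# of the Gauss-Seidel sweep used above: V is updated in place, state by state,
# so later states in the same sweep already see the new values.  All names
# are illustrative only.
if __name__ == "__main__":
    P_demo = _np.array([[[0.5, 0.5], [0.8, 0.2]],
                        [[0.0, 1.0], [0.1, 0.9]]])
    R_demo = _np.array([[5, 10], [-1, 2]])
    discount, epsilon = 0.9, 0.01
    S, A = R_demo.shape
    thresh = epsilon * (1 - discount) / discount
    V = _np.zeros(S)
    while True:
        Vprev = V.copy()
        for s in range(S):
            V[s] = max(R_demo[s, a] + discount * P_demo[a][s].dot(V)
                       for a in range(A))
        variation = (V - Vprev).max() - (V - Vprev).min()   # span
        if variation < thresh:
            break
    policy = [int(_np.argmax([R_demo[s, a] + discount * P_demo[a][s].dot(V)
                              for a in range(A)])) for s in range(S)]
    print(policy, V)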