...

Commits (2)
 # -*- coding: utf-8 -*- """ Created on Fri Jan 9 13:41:56 2015 @author: steve """ import time as _time import numpy as _np from mdptoolbox.mdp_base import MDP class FiniteHorizon(MDP): """A MDP solved using the finite-horizon backwards induction algorithm. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. N : int Number of periods. Must be greater than 0. h : array, optional Terminal reward. Default: a vector of zeros. Data Attributes --------------- V : array Optimal value function. Shape = (S, N+1). ``V[:, n]`` = optimal value function at stage ``n`` with stage in {0, 1...N-1}. ``V[:, N]`` value function for terminal stage. policy : array Optimal policy. ``policy[:, n]`` = optimal policy at stage ``n`` with stage in {0, 1...N}. ``policy[:, N]`` = policy for stage ``N``. time : float used CPU time Notes ----- In verbose mode, displays the current stage and policy transpose. Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> fh = mdptoolbox.mdp.FiniteHorizon(P, R, 0.9, 3) >>> fh.run() >>> fh.V array([[ 2.6973, 0.81 , 0. , 0. ], [ 5.9373, 3.24 , 1. , 0. ], [ 9.9373, 7.24 , 4. , 0. ]]) >>> fh.policy array([[0, 0, 0], [0, 0, 1], [0, 0, 0]]) """ def __init__(self, transitions, reward, discount, N, h=None): # Initialise a finite horizon MDP. self.N = int(N) assert self.N > 0, "N must be greater than 0." # Initialise the base class MDP.__init__(self, transitions, reward, discount, None, None) # remove the iteration counter, it is not meaningful for backwards # induction del self.iter # There are value vectors for each time step up to the horizon self.V = _np.zeros((self.S, N + 1)) # There are policy vectors for each time step before the horizon, when # we reach the horizon we don't need to make decisions anymore. self.policy = _np.empty((self.S, N), dtype=int) # Set the reward for the final transition to h, if specified. if h is not None: self.V[:, N] = h def run(self): # Run the finite horizon algorithm. self.time = _time.time() # loop through each time period for n in range(self.N): W, X = self._bellmanOperator(self.V[:, self.N - n]) stage = self.N - n - 1 self.V[:, stage] = X self.policy[:, stage] = W if self.verbose: print(("stage: %s, policy: %s") % ( stage, self.policy[:, stage].tolist())) # update time spent running self.time = _time.time() - self.time # After this we could create a tuple of tuples for the values and # policies. #self.V = tuple(tuple(self.V[:, n].tolist()) for n in range(self.N)) #self.policy = tuple(tuple(self.policy[:, n].tolist()) # for n in range(self.N))
 # -*- coding: utf-8 -*- """ """ import time as _time import numpy as _np import scipy.sparse as _sp from mdptoolbox.mdp_base import MDP class LP(MDP): """A discounted MDP soloved using linear programming. This class requires the Python ``cvxopt`` module to be installed. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. h : array, optional Terminal reward. Default: a vector of zeros. Data Attributes --------------- V : tuple optimal values policy : tuple optimal policy time : float used CPU time Examples -------- >>> import mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> lp = mdptoolbox.mdp.LP(P, R, 0.9) >>> lp.run() >>> import numpy, mdptoolbox >>> P = numpy.array((((0.5, 0.5), (0.8, 0.2)), ((0, 1), (0.1, 0.9)))) >>> R = numpy.array(((5, 10), (-1, 2))) >>> lp = mdptoolbox.mdp.LP(P, R, 0.9) >>> lp.run() >>> #lp.policy #FIXME: gives (1, 1), should be (1, 0) """ def __init__(self, transitions, reward, discount): # Initialise a linear programming MDP. # import some functions from cvxopt and set them as object methods try: from cvxopt import matrix, solvers self._linprog = solvers.lp self._cvxmat = matrix except ImportError: raise ImportError("The python module cvxopt is required to use " "linear programming functionality.") # initialise the MDP. epsilon and max_iter are not needed MDP.__init__(self, transitions, reward, discount, None, None) # Set the cvxopt solver to be quiet by default, but ... # this doesn't do what I want it to do c.f. issue #3 if not self.verbose: solvers.options['show_progress'] = False def run(self): #Run the linear programming algorithm. self.time = _time.time() # The objective is to resolve : min V / V >= PR + discount*P*V # The function linprog of the optimisation Toolbox of Mathworks # resolves : # min f'* x / M * x <= b # So the objective could be expressed as : # min V / (discount*P-I) * V <= - PR # To avoid loop on states, the matrix M is structured following actions # M(A*S,S) f = self._cvxmat(_np.ones((self.S, 1))) h = _np.array(self.R).reshape(self.S * self.A, 1, order="F") h = self._cvxmat(h, tc='d') M = _np.zeros((self.A * self.S, self.S)) for aa in range(self.A): pos = (aa + 1) * self.S M[(pos - self.S):pos, :] = ( self.discount * self.P[aa] - _sp.eye(self.S, self.S)) M = self._cvxmat(M) # Using the glpk option will make this behave more like Octave # (Octave uses glpk) and perhaps Matlab. If solver=None (ie using the # default cvxopt solver) then V agrees with the Octave equivalent # only to 10e-8 places. This assumes glpk is installed of course. self.V = _np.array(self._linprog(f, M, -h)['x']).reshape(self.S) # apply the Bellman operator self.policy, self.V = self._bellmanOperator() # update the time spent solving self.time = _time.time() - self.time # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
 # -*- coding: utf-8 -*- """ Created on Fri Jan 9 13:47:35 2015 @author: steve """ import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.policy_iteration import PolicyIteration class PolicyIterationModified(PolicyIteration): """A discounted MDP solved using a modifified policy iteration algorithm. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. See the documentation for the ``MDP`` class for details. Default is 10. Data Attributes --------------- V : tuple value function policy : tuple optimal policy iter : int number of done iterations time : float used CPU time Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> pim = mdptoolbox.mdp.PolicyIterationModified(P, R, 0.9) >>> pim.run() >>> pim.policy (0, 0, 0) >>> expected = (21.81408652334702, 25.054086523347017, 29.054086523347017) >>> all(expected[k] - pim.V[k] < 1e-12 for k in range(len(expected))) True """ def __init__(self, transitions, reward, discount, epsilon=0.01, max_iter=10): # Initialise a (modified) policy iteration MDP. # Maybe its better not to subclass from PolicyIteration, because the # initialisation of the two are quite different. eg there is policy0 # being calculated here which doesn't need to be. The only thing that # is needed from the PolicyIteration class is the _evalPolicyIterative # function. Perhaps there is a better way to do it? PolicyIteration.__init__(self, transitions, reward, discount, None, max_iter, 1) # PolicyIteration doesn't pass epsilon to MDP.__init__() so we will # check it here self.epsilon = float(epsilon) assert epsilon > 0, "'epsilon' must be greater than 0." # computation of threshold of variation for V for an epsilon-optimal # policy if self.discount != 1: self.thresh = self.epsilon * (1 - self.discount) / self.discount else: self.thresh = self.epsilon if self.discount == 1: self.V = _np.zeros(self.S) else: Rmin = min(R.min() for R in self.R) self.V = 1 / (1 - self.discount) * Rmin * _np.ones((self.S,)) def run(self): # Run the modified policy iteration algorithm. if self.verbose: print(' \tIteration\t\tV-variation') self.time = _time.time() done = False while not done: self.iter += 1 self.policy, Vnext = self._bellmanOperator() #[Ppolicy, PRpolicy] = mdp_computePpolicyPRpolicy(P, PR, policy); variation = _util.getSpan(Vnext - self.V) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) self.V = Vnext if variation < self.thresh: done = True else: is_verbose = False if self.verbose: self.setSilent() is_verbose = True self._evalPolicyIterative(self.V, self.epsilon, self.max_iter) if is_verbose: self.setVerbose() self.time = _time.time() - self.time # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
 # -*- coding: utf-8 -*- """ """ import math as _math import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.mdp_base import MDP class QLearning(MDP): """A discounted MDP solved using the Q learning algorithm. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. n_iter : int, optional Number of iterations to execute. This is ignored unless it is an integer greater than the default value. Defaut: 10,000. Data Attributes --------------- Q : array learned Q matrix (SxA) V : tuple learned value function (S). policy : tuple learned optimal policy (S). mean_discrepancy : array Vector of V discrepancy mean over 100 iterations. Then the length of this vector for the default value of N is 100 (N/100). Examples --------- >>> # These examples are reproducible only if random seed is set to 0 in >>> # both the random and numpy.random modules. >>> import numpy as np >>> import mdptoolbox, mdptoolbox.example >>> np.random.seed(0) >>> P, R = mdptoolbox.example.forest() >>> ql = mdptoolbox.mdp.QLearning(P, R, 0.96) >>> ql.run() >>> ql.Q array([[ 11.198909 , 10.34652034], [ 10.74229967, 11.74105792], [ 2.86980001, 12.25973286]]) >>> expected = (11.198908998901134, 11.741057920409865, 12.259732864170232) >>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected))) True >>> ql.policy (0, 1, 1) >>> import mdptoolbox >>> import numpy as np >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> np.random.seed(0) >>> ql = mdptoolbox.mdp.QLearning(P, R, 0.9) >>> ql.run() >>> ql.Q array([[ 33.33010866, 40.82109565], [ 34.37431041, 29.67236845]]) >>> expected = (40.82109564847122, 34.37431040682546) >>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected))) True >>> ql.policy (1, 0) """ def __init__(self, transitions, reward, discount, n_iter=10000): # Initialise a Q-learning MDP. # The following check won't be done in MDP()'s initialisation, so let's # do it here self.max_iter = int(n_iter) assert self.max_iter >= 10000, "'n_iter' should be greater than 10000." # We don't want to send this to MDP because _computePR should not be # run on it, so check that it defines an MDP _util.check(transitions, reward) # Store P, S, and A self._computeP(transitions) self.R = reward self.discount = discount # Initialisations self.Q = _np.zeros((self.S, self.A)) self.mean_discrepancy = [] def run(self): # Run the Q-learning algoritm. discrepancy = [] self.time = _time.time() # initial state choice s = _np.random.randint(0, self.S) for n in range(1, self.max_iter + 1): # Reinitialisation of trajectories every 100 transitions if (n % 100) == 0: s = _np.random.randint(0, self.S) # Action choice : greedy with increasing probability # probability 1-(1/log(n+2)) can be changed pn = _np.random.random() if pn < (1 - (1 / _math.log(n + 2))): # optimal_action = self.Q[s, :].max() a = self.Q[s, :].argmax() else: a = _np.random.randint(0, self.A) # Simulating next state s_new and reward associated to p_s_new = _np.random.random() p = 0 s_new = -1 while (p < p_s_new) and (s_new < (self.S - 1)): s_new = s_new + 1 p = p + self.P[a][s, s_new] try: r = self.R[a][s, s_new] except IndexError: try: r = self.R[s, a] except IndexError: r = self.R[s] # Updating the value of Q # Decaying update coefficient (1/sqrt(n+2)) can be changed delta = r + self.discount * self.Q[s_new, :].max() - self.Q[s, a] dQ = (1 / _math.sqrt(n + 2)) * delta self.Q[s, a] = self.Q[s, a] + dQ # current state is updated s = s_new # Computing and saving maximal values of the Q variation discrepancy.append(_np.absolute(dQ)) # Computing means all over maximal Q variations values if len(discrepancy) == 100: self.mean_discrepancy.append(_np.mean(discrepancy)) discrepancy = [] # compute the value function and the policy self.V = self.Q.max(axis=1) self.policy = self.Q.argmax(axis=1) self.time = _time.time() - self.time # convert V and policy to tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
 # -*- coding: utf-8 -*- """ """ import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.mdp_base import MDP class RelativeValueIteration(MDP): """A MDP solved using the relative value iteration algorithm. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. See the documentation for the ``MDP`` class for details. Default: 1000. Data Attributes --------------- policy : tuple epsilon-optimal policy average_reward : tuple average reward of the optimal policy cpu_time : float used CPU time Notes ----- In verbose mode, at each iteration, displays the span of U variation and the condition which stopped iterations : epsilon-optimum policy found or maximum number of iterations reached. Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R) >>> rvi.run() >>> rvi.average_reward 3.2399999999999993 >>> rvi.policy (0, 0, 0) >>> rvi.iter 4 >>> import mdptoolbox >>> import numpy as np >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R) >>> rvi.run() >>> expected = (10.0, 3.885235246411831) >>> all(expected[k] - rvi.V[k] < 1e-12 for k in range(len(expected))) True >>> rvi.average_reward 3.8852352464118312 >>> rvi.policy (1, 0) >>> rvi.iter 29 """ def __init__(self, transitions, reward, epsilon=0.01, max_iter=1000): # Initialise a relative value iteration MDP. MDP.__init__(self, transitions, reward, None, epsilon, max_iter) self.epsilon = epsilon self.discount = 1 self.V = _np.zeros(self.S) self.gain = 0 # self.U[self.S] self.average_reward = None def run(self): # Run the relative value iteration algorithm. done = False if self.verbose: print(' Iteration\t\tU variation') self.time = _time.time() while not done: self.iter += 1 self.policy, Vnext = self._bellmanOperator() Vnext = Vnext - self.gain variation = _util.getSpan(Vnext - self.V) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) if variation < self.epsilon: done = True self.average_reward = self.gain + (Vnext - self.V).min() if self.verbose: print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY) elif self.iter == self.max_iter: done = True self.average_reward = self.gain + (Vnext - self.V).min() if self.verbose: print(_util._MSG_STOP_MAX_ITER) self.V = Vnext self.gain = float(self.V[self.S - 1]) self.time = _time.time() - self.time # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
 ... ... @@ -56,6 +56,14 @@ import numpy as _np import mdptoolbox.error as _error _MSG_STOP_MAX_ITER = "Iterating stopped due to maximum number of iterations " \ "condition." _MSG_STOP_EPSILON_OPTIMAL_POLICY = "Iterating stopped, epsilon-optimal " \ "policy found." _MSG_STOP_EPSILON_OPTIMAL_VALUE = "Iterating stopped, epsilon-optimal value " \ "function found." _MSG_STOP_UNCHANGING_POLICY = "Iterating stopped, unchanging policy found." _MDPERR = { "mat_nonneg" : "Transition probabilities must be non-negative.", ... ...
 # -*- coding: utf-8 -*- """ """ import math as _math import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.mdp_base import MDP class ValueIteration(MDP): """A discounted MDP solved using the value iteration algorithm. Description ----------- ValueIteration applies the value iteration algorithm to solve a discounted MDP. The algorithm consists of solving Bellman's equation iteratively. Iteration is stopped when an epsilon-optimal policy is found or after a specified number (``max_iter``) of iterations. This function uses verbose and silent modes. In verbose mode, the function displays the variation of ``V`` (the value function) for each iteration and the condition which stopped the iteration: epsilon-policy found or maximum number of iterations reached. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. If the value given is greater than a computed bound, a warning informs that the computed bound will be used instead. By default, if ``discount`` is not equal to 1, a bound for ``max_iter`` is computed, otherwise ``max_iter`` = 1000. See the documentation for the ``MDP`` class for further details. initial_value : array, optional The starting value function. Default: a vector of zeros. Data Attributes --------------- V : tuple The optimal value function. policy : tuple The optimal policy function. Each element is an integer corresponding to an action which maximises the value function in that state. iter : int The number of iterations taken to complete the computation. time : float The amount of CPU time used to run the algorithm. Methods ------- run() Do the algorithm iteration. setSilent() Sets the instance to silent mode. setVerbose() Sets the instance to verbose mode. Notes ----- In verbose mode, at each iteration, displays the variation of V and the condition which stopped iterations: epsilon-optimum policy found or maximum number of iterations reached. Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.96) >>> vi.verbose False >>> vi.run() >>> expected = (5.93215488, 9.38815488, 13.38815488) >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected))) True >>> vi.policy (0, 0, 0) >>> vi.iter 4 >>> import mdptoolbox >>> import numpy as np >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9) >>> vi.run() >>> expected = (40.048625392716815, 33.65371175967546) >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected))) True >>> vi.policy (1, 0) >>> vi.iter 26 >>> import mdptoolbox >>> import numpy as np >>> from scipy.sparse import csr_matrix as sparse >>> P = [None] * 2 >>> P[0] = sparse([[0.5, 0.5],[0.8, 0.2]]) >>> P[1] = sparse([[0, 1],[0.1, 0.9]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9) >>> vi.run() >>> expected = (40.048625392716815, 33.65371175967546) >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected))) True >>> vi.policy (1, 0) """ def __init__(self, transitions, reward, discount, epsilon=0.01, max_iter=1000, initial_value=0): # Initialise a value iteration MDP. MDP.__init__(self, transitions, reward, discount, epsilon, max_iter) # initialization of optional arguments if initial_value == 0: self.V = _np.zeros(self.S) else: assert len(initial_value) == self.S, "The initial value must be " \ "a vector of length S." self.V = _np.array(initial_value).reshape(self.S) if self.discount < 1: # compute a bound for the number of iterations and update the # stored value of self.max_iter self._boundIter(epsilon) # computation of threshold of variation for V for an epsilon- # optimal policy self.thresh = epsilon * (1 - self.discount) / self.discount else: # discount == 1 # threshold of variation for V for an epsilon-optimal policy self.thresh = epsilon def _boundIter(self, epsilon): # Compute a bound for the number of iterations. # # for the value iteration # algorithm to find an epsilon-optimal policy with use of span for the # stopping criterion # # Arguments ----------------------------------------------------------- # Let S = number of states, A = number of actions # epsilon = |V - V*| < epsilon, upper than 0, # optional (default : 0.01) # Evaluation ---------------------------------------------------------- # max_iter = bound of the number of iterations for the value # iteration algorithm to find an epsilon-optimal policy with use of # span for the stopping criterion # cpu_time = used CPU time # # See Markov Decision Processes, M. L. Puterman, # Wiley-Interscience Publication, 1994 # p 202, Theorem 6.6.6 # k = max [1 - S min[ P(j|s,a), p(j|s',a')] ] # s,a,s',a' j k = 0 h = _np.zeros(self.S) for ss in range(self.S): PP = _np.zeros((self.A, self.S)) for aa in range(self.A): try: PP[aa] = self.P[aa][:, ss] except ValueError: PP[aa] = self.P[aa][:, ss].todense().A1 # minimum of the entire array. h[ss] = PP.min() k = 1 - h.sum() Vprev = self.V null, value = self._bellmanOperator() # p 201, Proposition 6.6.5 span = _util.getSpan(value - Vprev) max_iter = (_math.log((epsilon * (1 - self.discount) / self.discount) / span ) / _math.log(self.discount * k)) #self.V = Vprev self.max_iter = int(_math.ceil(max_iter)) def run(self): # Run the value iteration algorithm. if self.verbose: print(' Iteration\t\tV-variation') self.time = _time.time() while True: self.iter += 1 Vprev = self.V.copy() # Bellman Operator: compute policy and value functions self.policy, self.V = self._bellmanOperator() # The values, based on Q. For the function "max()": the option # "axis" means the axis along which to operate. In this case it # finds the maximum of the the rows. (Operates along the columns?) variation = _util.getSpan(self.V - Vprev) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) if variation < self.thresh: if self.verbose: print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY) break elif self.iter == self.max_iter: if self.verbose: print(_util._MSG_STOP_MAX_ITER) break # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist()) self.time = _time.time() - self.time
 # -*- coding: utf-8 -*- """ Created on Fri Jan 9 13:51:08 2015 @author: steve """ import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.value_iteration import MDP, ValueIteration class ValueIterationGS(ValueIteration): """ A discounted MDP solved using the value iteration Gauss-Seidel algorithm. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. See the documentation for the ``MDP`` and ``ValueIteration`` classes for details. Default: computed. initial_value : array, optional The starting value function. Default: a vector of zeros. Data Attribues -------------- policy : tuple epsilon-optimal policy iter : int number of done iterations time : float used CPU time Notes ----- In verbose mode, at each iteration, displays the variation of V and the condition which stopped iterations: epsilon-optimum policy found or maximum number of iterations reached. Examples -------- >>> import mdptoolbox.example, numpy as np >>> P, R = mdptoolbox.example.forest() >>> vigs = mdptoolbox.mdp.ValueIterationGS(P, R, 0.9) >>> vigs.run() >>> expected = (25.5833879767579, 28.830654635546928, 32.83065463554693) >>> all(expected[k] - vigs.V[k] < 1e-12 for k in range(len(expected))) True >>> vigs.policy (0, 0, 0) """ def __init__(self, transitions, reward, discount, epsilon=0.01, max_iter=10, initial_value=0): # Initialise a value iteration Gauss-Seidel MDP. MDP.__init__(self, transitions, reward, discount, epsilon, max_iter) # initialization of optional arguments if initial_value == 0: self.V = _np.zeros(self.S) else: if len(initial_value) != self.S: raise ValueError("The initial value must be a vector of " "length S.") else: try: self.V = initial_value.reshape(self.S) except AttributeError: self.V = _np.array(initial_value) except: raise if self.discount < 1: # compute a bound for the number of iterations and update the # stored value of self.max_iter self._boundIter(epsilon) # computation of threshold of variation for V for an epsilon- # optimal policy self.thresh = epsilon * (1 - self.discount) / self.discount else: # discount == 1 # threshold of variation for V for an epsilon-optimal policy self.thresh = epsilon def run(self): # Run the value iteration Gauss-Seidel algorithm. done = False if self.verbose: print(' Iteration\t\tV-variation') self.time = _time.time() while not done: self.iter += 1 Vprev = self.V.copy() for s in range(self.S): Q = [float(self.R[a][s]+ self.discount * self.P[a][s, :].dot(self.V)) for a in range(self.A)] self.V[s] = max(Q) variation = _util.getSpan(self.V - Vprev) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) if variation < self.thresh: done = True if self.verbose: print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY) elif self.iter == self.max_iter: done = True if self.verbose: print(_util._MSG_STOP_MAX_ITER) self.policy = [] for s in range(self.S): Q = _np.zeros(self.A) for a in range(self.A): Q[a] = self.R[a][s] + self.discount * self.P[a][s, :].dot(self.V) self.V[s] = Q.max() self.policy.append(int(Q.argmax())) self.time = _time.time() - self.time self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy)