# -*- coding: utf-8 -*-

"""Value iteration solver for discounted Markov decision processes."""

import math as _math
import time as _time

import numpy as _np

import mdptoolbox.util as _util
from mdptoolbox.mdp_base import MDP


class ValueIteration(MDP):

    """A discounted MDP solved using the value iteration algorithm.

    Description
    -----------
    ValueIteration applies the value iteration algorithm to solve a
    discounted MDP. The algorithm consists of solving Bellman's equation
    iteratively.
    Iteration is stopped when an epsilon-optimal policy is found or after a
    specified number (``max_iter``) of iterations.
    This function uses verbose and silent modes. In verbose mode, the function
    displays the variation of ``V`` (the value function) for each iteration
    and the condition which stopped the iteration: epsilon-policy found or
    maximum number of iterations reached.

    Parameters
    ----------
    transitions : array
        Transition probability matrices. See the documentation for the ``MDP``
        class for details.
    reward : array
        Reward matrices or vectors. See the documentation for the ``MDP``
        class for details.
    discount : float
        Discount factor. See the documentation for the ``MDP`` class for
        details.
    epsilon : float, optional
        Stopping criterion. See the documentation for the ``MDP`` class for
        details. Default: 0.01.
    max_iter : int, optional
        Maximum number of iterations. If the value given is greater than a
        computed bound, a warning informs that the computed bound will be used
        instead. By default, if ``discount`` is not equal to 1, a bound for
        ``max_iter`` is computed, otherwise ``max_iter`` = 1000. See the
        documentation for the ``MDP`` class for further details.
    initial_value : array, optional
        The starting value function. Default: a vector of zeros.

    Data Attributes
    ---------------
    V : tuple
        The optimal value function.
    policy : tuple
        The optimal policy function. Each element is an integer corresponding
        to an action which maximises the value function in that state.
    iter : int
        The number of iterations taken to complete the computation.
    time : float
        The amount of CPU time used to run the algorithm.

    Methods
    -------
    run()
        Do the algorithm iteration.
    setSilent()
        Sets the instance to silent mode.
    setVerbose()
        Sets the instance to verbose mode.

    Notes
    -----
    In verbose mode, at each iteration, displays the variation of V
    and the condition which stopped iterations: epsilon-optimum policy found
    or maximum number of iterations reached.

    Examples
    --------
    >>> import mdptoolbox, mdptoolbox.example
    >>> P, R = mdptoolbox.example.forest()
    >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.96)
    >>> vi.verbose
    False
    >>> vi.run()
    >>> expected = (5.93215488, 9.38815488, 13.38815488)
    >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
    True
    >>> vi.policy
    (0, 0, 0)
    >>> vi.iter
    4

    >>> import mdptoolbox
    >>> import numpy as np
    >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]])
    >>> R = np.array([[5, 10], [-1, 2]])
    >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9)
    >>> vi.run()
    >>> expected = (40.048625392716815, 33.65371175967546)
    >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
    True
    >>> vi.policy
    (1, 0)
    >>> vi.iter
    26

    >>> import mdptoolbox
    >>> import numpy as np
    >>> from scipy.sparse import csr_matrix as sparse
    >>> P = [None] * 2
    >>> P[0] = sparse([[0.5, 0.5],[0.8, 0.2]])
    >>> P[1] = sparse([[0, 1],[0.1, 0.9]])
    >>> R = np.array([[5, 10], [-1, 2]])
    >>> vi = mdptoolbox.mdp.ValueIteration(P, R, 0.9)
    >>> vi.run()
    >>> expected = (40.048625392716815, 33.65371175967546)
    >>> all(expected[k] - vi.V[k] < 1e-12 for k in range(len(expected)))
    True
    >>> vi.policy
    (1, 0)

    """

    def __init__(self, transitions, reward, discount, epsilon=0.01,
                 max_iter=1000, initial_value=0):
        # Initialise a value iteration MDP.
        MDP.__init__(self, transitions, reward, discount, epsilon, max_iter)

        # Initialisation of the optional starting value function. The
        # scalar check is required: if a NumPy array is passed as
        # ``initial_value``, ``initial_value == 0`` is an element-wise
        # array and ``if`` on it would raise "truth value is ambiguous".
        if _np.isscalar(initial_value) and initial_value == 0:
            self.V = _np.zeros(self.S)
        else:
            # Explicit raise instead of ``assert``: assertions are
            # stripped under ``python -O`` and must not guard user input.
            if len(initial_value) != self.S:
                raise ValueError("The initial value must be a vector of "
                                 "length S.")
            self.V = _np.array(initial_value).reshape(self.S)
        if self.discount < 1:
            # compute a bound for the number of iterations and update the
            # stored value of self.max_iter
            self._boundIter(epsilon)
            # computation of threshold of variation for V for an epsilon-
            # optimal policy
            self.thresh = epsilon * (1 - self.discount) / self.discount
        else:  # discount == 1
            # threshold of variation for V for an epsilon-optimal policy
            self.thresh = epsilon

    def _boundIter(self, epsilon):
        # Compute a bound for the number of iterations.
        #
        # for the value iteration
        # algorithm to find an epsilon-optimal policy with use of span for the
        # stopping criterion
        #
        # Arguments ---------------------------------------------------------
        # Let S = number of states, A = number of actions
        #    epsilon   = |V - V*| < epsilon,  upper than 0,
        #        optional (default : 0.01)
        # Evaluation --------------------------------------------------------
        #    max_iter  = bound of the number of iterations for the value
        #    iteration algorithm to find an epsilon-optimal policy with use of
        #    span for the stopping criterion
        #    cpu_time  = used CPU time
        #
        # See Markov Decision Processes, M. L. Puterman,
        # Wiley-Interscience Publication, 1994
        # p 202, Theorem 6.6.6
        # k =    max     [1 - S min[ P(j|s,a), p(j|s',a')] ]
        #     s,a,s',a'       j
        k = 0
        h = _np.zeros(self.S)

        for ss in range(self.S):
            PP = _np.zeros((self.A, self.S))
            for aa in range(self.A):
                try:
                    PP[aa] = self.P[aa][:, ss]
                except ValueError:
                    # sparse transition matrix: densify the column slice
                    PP[aa] = self.P[aa][:, ss].todense().A1
            # minimum of the entire array.
            h[ss] = PP.min()

        k = 1 - h.sum()
        Vprev = self.V
        # Only the value part of the Bellman operator is needed here; the
        # policy it returns is discarded.
        _, value = self._bellmanOperator()
        # p 201, Proposition 6.6.5
        span = _util.getSpan(value - Vprev)
        max_iter = (_math.log((epsilon * (1 - self.discount) / self.discount) /
                    span) / _math.log(self.discount * k))

        self.max_iter = int(_math.ceil(max_iter))

    def run(self):
        # Run the value iteration algorithm.

        if self.verbose:
            print('  Iteration\t\tV-variation')

        self.time = _time.time()
        while True:
            self.iter += 1

            Vprev = self.V.copy()

            # Bellman Operator: compute policy and value functions
            self.policy, self.V = self._bellmanOperator()

            # The values, based on Q. For the function "max()": the option
            # "axis" means the axis along which to operate. In this case it
            # finds the maximum of the the rows. (Operates along the columns?)
            variation = _util.getSpan(self.V - Vprev)

            if self.verbose:
                print(("    %s\t\t  %s" % (self.iter, variation)))

            if variation < self.thresh:
                if self.verbose:
                    print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY)
                break
            elif self.iter == self.max_iter:
                if self.verbose:
                    print(_util._MSG_STOP_MAX_ITER)
                break

        # store value and policy as tuples
        self.V = tuple(self.V.tolist())
        self.policy = tuple(self.policy.tolist())

        self.time = _time.time() - self.time