### [mdp] Break apart mdp module into smaller pieces

Initial attempt at making the mdp module more manageable. It has too
many lines of code, making it hard to work on any one algorithm.
parent 079751c4
 # -*- coding: utf-8 -*- """ Created on Fri Jan 9 13:41:56 2015 @author: steve """ import time as _time import numpy as _np from mdptoolbox.mdp_base import MDP class FiniteHorizon(MDP): """A MDP solved using the finite-horizon backwards induction algorithm. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. N : int Number of periods. Must be greater than 0. h : array, optional Terminal reward. Default: a vector of zeros. Data Attributes --------------- V : array Optimal value function. Shape = (S, N+1). ``V[:, n]`` = optimal value function at stage ``n`` with stage in {0, 1...N-1}. ``V[:, N]`` value function for terminal stage. policy : array Optimal policy. ``policy[:, n]`` = optimal policy at stage ``n`` with stage in {0, 1...N}. ``policy[:, N]`` = policy for stage ``N``. time : float used CPU time Notes ----- In verbose mode, displays the current stage and policy transpose. Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> fh = mdptoolbox.mdp.FiniteHorizon(P, R, 0.9, 3) >>> fh.run() >>> fh.V array([[ 2.6973, 0.81 , 0. , 0. ], [ 5.9373, 3.24 , 1. , 0. ], [ 9.9373, 7.24 , 4. , 0. ]]) >>> fh.policy array([[0, 0, 0], [0, 0, 1], [0, 0, 0]]) """ def __init__(self, transitions, reward, discount, N, h=None): # Initialise a finite horizon MDP. self.N = int(N) assert self.N > 0, "N must be greater than 0." 
# Initialise the base class MDP.__init__(self, transitions, reward, discount, None, None) # remove the iteration counter, it is not meaningful for backwards # induction del self.iter # There are value vectors for each time step up to the horizon self.V = _np.zeros((self.S, N + 1)) # There are policy vectors for each time step before the horizon, when # we reach the horizon we don't need to make decisions anymore. self.policy = _np.empty((self.S, N), dtype=int) # Set the reward for the final transition to h, if specified. if h is not None: self.V[:, N] = h def run(self): # Run the finite horizon algorithm. self.time = _time.time() # loop through each time period for n in range(self.N): W, X = self._bellmanOperator(self.V[:, self.N - n]) stage = self.N - n - 1 self.V[:, stage] = X self.policy[:, stage] = W if self.verbose: print(("stage: %s, policy: %s") % ( stage, self.policy[:, stage].tolist())) # update time spent running self.time = _time.time() - self.time # After this we could create a tuple of tuples for the values and # policies. #self.V = tuple(tuple(self.V[:, n].tolist()) for n in range(self.N)) #self.policy = tuple(tuple(self.policy[:, n].tolist()) # for n in range(self.N))
 # -*- coding: utf-8 -*- """ """ import time as _time import numpy as _np import scipy.sparse as _sp from mdptoolbox.mdp_base import MDP class LP(MDP): """A discounted MDP soloved using linear programming. This class requires the Python ``cvxopt`` module to be installed. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. h : array, optional Terminal reward. Default: a vector of zeros. Data Attributes --------------- V : tuple optimal values policy : tuple optimal policy time : float used CPU time Examples -------- >>> import mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> lp = mdptoolbox.mdp.LP(P, R, 0.9) >>> lp.run() >>> import numpy, mdptoolbox >>> P = numpy.array((((0.5, 0.5), (0.8, 0.2)), ((0, 1), (0.1, 0.9)))) >>> R = numpy.array(((5, 10), (-1, 2))) >>> lp = mdptoolbox.mdp.LP(P, R, 0.9) >>> lp.run() >>> #lp.policy #FIXME: gives (1, 1), should be (1, 0) """ def __init__(self, transitions, reward, discount): # Initialise a linear programming MDP. # import some functions from cvxopt and set them as object methods try: from cvxopt import matrix, solvers self._linprog = solvers.lp self._cvxmat = matrix except ImportError: raise ImportError("The python module cvxopt is required to use " "linear programming functionality.") # initialise the MDP. epsilon and max_iter are not needed MDP.__init__(self, transitions, reward, discount, None, None) # Set the cvxopt solver to be quiet by default, but ... # this doesn't do what I want it to do c.f. issue #3 if not self.verbose: solvers.options['show_progress'] = False def run(self): #Run the linear programming algorithm. 
self.time = _time.time() # The objective is to resolve : min V / V >= PR + discount*P*V # The function linprog of the optimisation Toolbox of Mathworks # resolves : # min f'* x / M * x <= b # So the objective could be expressed as : # min V / (discount*P-I) * V <= - PR # To avoid loop on states, the matrix M is structured following actions # M(A*S,S) f = self._cvxmat(_np.ones((self.S, 1))) h = _np.array(self.R).reshape(self.S * self.A, 1, order="F") h = self._cvxmat(h, tc='d') M = _np.zeros((self.A * self.S, self.S)) for aa in range(self.A): pos = (aa + 1) * self.S M[(pos - self.S):pos, :] = ( self.discount * self.P[aa] - _sp.eye(self.S, self.S)) M = self._cvxmat(M) # Using the glpk option will make this behave more like Octave # (Octave uses glpk) and perhaps Matlab. If solver=None (ie using the # default cvxopt solver) then V agrees with the Octave equivalent # only to 10e-8 places. This assumes glpk is installed of course. self.V = _np.array(self._linprog(f, M, -h)['x']).reshape(self.S) # apply the Bellman operator self.policy, self.V = self._bellmanOperator() # update the time spent solving self.time = _time.time() - self.time # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
 # -*- coding: utf-8 -*- """ Created on Fri Jan 9 13:47:35 2015 @author: steve """ import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.policy_iteration import PolicyIteration class PolicyIterationModified(PolicyIteration): """A discounted MDP solved using a modifified policy iteration algorithm. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. See the documentation for the ``MDP`` class for details. Default is 10. Data Attributes --------------- V : tuple value function policy : tuple optimal policy iter : int number of done iterations time : float used CPU time Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> pim = mdptoolbox.mdp.PolicyIterationModified(P, R, 0.9) >>> pim.run() >>> pim.policy (0, 0, 0) >>> expected = (21.81408652334702, 25.054086523347017, 29.054086523347017) >>> all(expected[k] - pim.V[k] < 1e-12 for k in range(len(expected))) True """ def __init__(self, transitions, reward, discount, epsilon=0.01, max_iter=10): # Initialise a (modified) policy iteration MDP. # Maybe its better not to subclass from PolicyIteration, because the # initialisation of the two are quite different. eg there is policy0 # being calculated here which doesn't need to be. The only thing that # is needed from the PolicyIteration class is the _evalPolicyIterative # function. Perhaps there is a better way to do it? 
PolicyIteration.__init__(self, transitions, reward, discount, None, max_iter, 1) # PolicyIteration doesn't pass epsilon to MDP.__init__() so we will # check it here self.epsilon = float(epsilon) assert epsilon > 0, "'epsilon' must be greater than 0." # computation of threshold of variation for V for an epsilon-optimal # policy if self.discount != 1: self.thresh = self.epsilon * (1 - self.discount) / self.discount else: self.thresh = self.epsilon if self.discount == 1: self.V = _np.zeros(self.S) else: Rmin = min(R.min() for R in self.R) self.V = 1 / (1 - self.discount) * Rmin * _np.ones((self.S,)) def run(self): # Run the modified policy iteration algorithm. if self.verbose: print(' \tIteration\t\tV-variation') self.time = _time.time() done = False while not done: self.iter += 1 self.policy, Vnext = self._bellmanOperator() #[Ppolicy, PRpolicy] = mdp_computePpolicyPRpolicy(P, PR, policy); variation = _util.getSpan(Vnext - self.V) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) self.V = Vnext if variation < self.thresh: done = True else: is_verbose = False if self.verbose: self.setSilent() is_verbose = True self._evalPolicyIterative(self.V, self.epsilon, self.max_iter) if is_verbose: self.setVerbose() self.time = _time.time() - self.time # store value and policy as tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
 # -*- coding: utf-8 -*- """ """ import math as _math import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.mdp_base import MDP class QLearning(MDP): """A discounted MDP solved using the Q learning algorithm. Parameters ---------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. discount : float Discount factor. See the documentation for the ``MDP`` class for details. n_iter : int, optional Number of iterations to execute. This is ignored unless it is an integer greater than the default value. Defaut: 10,000. Data Attributes --------------- Q : array learned Q matrix (SxA) V : tuple learned value function (S). policy : tuple learned optimal policy (S). mean_discrepancy : array Vector of V discrepancy mean over 100 iterations. Then the length of this vector for the default value of N is 100 (N/100). Examples --------- >>> # These examples are reproducible only if random seed is set to 0 in >>> # both the random and numpy.random modules. 
>>> import numpy as np >>> import mdptoolbox, mdptoolbox.example >>> np.random.seed(0) >>> P, R = mdptoolbox.example.forest() >>> ql = mdptoolbox.mdp.QLearning(P, R, 0.96) >>> ql.run() >>> ql.Q array([[ 11.198909 , 10.34652034], [ 10.74229967, 11.74105792], [ 2.86980001, 12.25973286]]) >>> expected = (11.198908998901134, 11.741057920409865, 12.259732864170232) >>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected))) True >>> ql.policy (0, 1, 1) >>> import mdptoolbox >>> import numpy as np >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> np.random.seed(0) >>> ql = mdptoolbox.mdp.QLearning(P, R, 0.9) >>> ql.run() >>> ql.Q array([[ 33.33010866, 40.82109565], [ 34.37431041, 29.67236845]]) >>> expected = (40.82109564847122, 34.37431040682546) >>> all(expected[k] - ql.V[k] < 1e-12 for k in range(len(expected))) True >>> ql.policy (1, 0) """ def __init__(self, transitions, reward, discount, n_iter=10000): # Initialise a Q-learning MDP. # The following check won't be done in MDP()'s initialisation, so let's # do it here self.max_iter = int(n_iter) assert self.max_iter >= 10000, "'n_iter' should be greater than 10000." # We don't want to send this to MDP because _computePR should not be # run on it, so check that it defines an MDP _util.check(transitions, reward) # Store P, S, and A self._computeP(transitions) self.R = reward self.discount = discount # Initialisations self.Q = _np.zeros((self.S, self.A)) self.mean_discrepancy = [] def run(self): # Run the Q-learning algoritm. 
discrepancy = [] self.time = _time.time() # initial state choice s = _np.random.randint(0, self.S) for n in range(1, self.max_iter + 1): # Reinitialisation of trajectories every 100 transitions if (n % 100) == 0: s = _np.random.randint(0, self.S) # Action choice : greedy with increasing probability # probability 1-(1/log(n+2)) can be changed pn = _np.random.random() if pn < (1 - (1 / _math.log(n + 2))): # optimal_action = self.Q[s, :].max() a = self.Q[s, :].argmax() else: a = _np.random.randint(0, self.A) # Simulating next state s_new and reward associated to p_s_new = _np.random.random() p = 0 s_new = -1 while (p < p_s_new) and (s_new < (self.S - 1)): s_new = s_new + 1 p = p + self.P[a][s, s_new] try: r = self.R[a][s, s_new] except IndexError: try: r = self.R[s, a] except IndexError: r = self.R[s] # Updating the value of Q # Decaying update coefficient (1/sqrt(n+2)) can be changed delta = r + self.discount * self.Q[s_new, :].max() - self.Q[s, a] dQ = (1 / _math.sqrt(n + 2)) * delta self.Q[s, a] = self.Q[s, a] + dQ # current state is updated s = s_new # Computing and saving maximal values of the Q variation discrepancy.append(_np.absolute(dQ)) # Computing means all over maximal Q variations values if len(discrepancy) == 100: self.mean_discrepancy.append(_np.mean(discrepancy)) discrepancy = [] # compute the value function and the policy self.V = self.Q.max(axis=1) self.policy = self.Q.argmax(axis=1) self.time = _time.time() - self.time # convert V and policy to tuples self.V = tuple(self.V.tolist()) self.policy = tuple(self.policy.tolist())
 # -*- coding: utf-8 -*- """ """ import time as _time import numpy as _np import mdptoolbox.util as _util from mdptoolbox.mdp_base import MDP class RelativeValueIteration(MDP): """A MDP solved using the relative value iteration algorithm. Arguments --------- transitions : array Transition probability matrices. See the documentation for the ``MDP`` class for details. reward : array Reward matrices or vectors. See the documentation for the ``MDP`` class for details. epsilon : float, optional Stopping criterion. See the documentation for the ``MDP`` class for details. Default: 0.01. max_iter : int, optional Maximum number of iterations. See the documentation for the ``MDP`` class for details. Default: 1000. Data Attributes --------------- policy : tuple epsilon-optimal policy average_reward : tuple average reward of the optimal policy cpu_time : float used CPU time Notes ----- In verbose mode, at each iteration, displays the span of U variation and the condition which stopped iterations : epsilon-optimum policy found or maximum number of iterations reached. Examples -------- >>> import mdptoolbox, mdptoolbox.example >>> P, R = mdptoolbox.example.forest() >>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R) >>> rvi.run() >>> rvi.average_reward 3.2399999999999993 >>> rvi.policy (0, 0, 0) >>> rvi.iter 4 >>> import mdptoolbox >>> import numpy as np >>> P = np.array([[[0.5, 0.5],[0.8, 0.2]],[[0, 1],[0.1, 0.9]]]) >>> R = np.array([[5, 10], [-1, 2]]) >>> rvi = mdptoolbox.mdp.RelativeValueIteration(P, R) >>> rvi.run() >>> expected = (10.0, 3.885235246411831) >>> all(expected[k] - rvi.V[k] < 1e-12 for k in range(len(expected))) True >>> rvi.average_reward 3.8852352464118312 >>> rvi.policy (1, 0) >>> rvi.iter 29 """ def __init__(self, transitions, reward, epsilon=0.01, max_iter=1000): # Initialise a relative value iteration MDP. 
MDP.__init__(self, transitions, reward, None, epsilon, max_iter) self.epsilon = epsilon self.discount = 1 self.V = _np.zeros(self.S) self.gain = 0 # self.U[self.S] self.average_reward = None def run(self): # Run the relative value iteration algorithm. done = False if self.verbose: print(' Iteration\t\tU variation') self.time = _time.time() while not done: self.iter += 1 self.policy, Vnext = self._bellmanOperator() Vnext = Vnext - self.gain variation = _util.getSpan(Vnext - self.V) if self.verbose: print((" %s\t\t %s" % (self.iter, variation))) if variation < self.epsilon: done = True self.average_reward = self.gain + (Vnext - self.V).min() if self.verbose: print(_util._MSG_STOP_EPSILON_OPTIMAL_POLICY) elif self.iter == self.max_iter: done = True self.average_reward = self.gain + (Vnext - self.V).min() if self.verbose: print(_util._MSG_STOP_MAX_ITER) self.V = Vnext