### qlearning

parent 9f8fe2a3
```diff
@@ -32,7 +32,7 @@
 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
-from numpy import abs, array, diag, matrix, ndarray, ones, zeros
+from numpy import abs, array, diag, matrix, mean, ndarray, ones, zeros
 from numpy.random import rand
 from math import ceil, log, sqrt
 from random import randint, random
@@ -182,9 +182,9 @@ def exampleRand(S, A, is_sparse=False, mask=None):
     if is_sparse:
         # definition of transition matrix : square stochastic matrix
-        P = zeros((A,), dtype=object)
+        P = zeros((A, ), dtype=object)
         # definition of reward matrix (values between -1 and +1)
-        R = zeros((A,), dtype=object)
+        R = zeros((A, ), dtype=object)
         for a in range(A):
             PP = mask * rand(S, S)
             for s in range(S):
@@ -233,11 +233,11 @@ class MDP():
     def check(self, P, R):
         """Checks if the matrices P and R define a Markov Decision Process.
-        Let S = number of states, A = number of actions
-        The transition matrix P must be on the shape (A, S, S) and P[a,:,:]
-        must be stochastic.
-        The reward matrix R must be on the shape (A, S, S) or (S, A).
-        Raises an error if P and R do not define a MDP.
+        Let S = number of states, A = number of actions.
+        The transition matrix P must be on the shape (A, S, S) and P[a,:,:]
+        must be stochastic.
+        The reward matrix R must be on the shape (A, S, S) or (S, A).
+        Raises an error if P and R do not define a MDP.

         Parameters
         ---------
```
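The `check` hunk only tightens docstring punctuation, but the contract it states, P of shape (A, S, S) with each P[a, :, :] row-stochastic and R of shape (A, S, S) or (S, A), is easy to verify directly. A minimal standalone sketch in plain numpy; the name `check_mdp` is mine, and the toolbox's own `check` additionally handles object arrays of sparse matrices:

```python
import numpy as np

def check_mdp(P, R):
    """Hypothetical validator: P must be (A, S, S) with each P[a]
    row-stochastic; R must be (A, S, S) or (S, A)."""
    A, S, S2 = P.shape
    if S != S2:
        raise ValueError("each P[a] must be a square (S, S) matrix")
    # every row of every P[a] must sum to 1, up to floating-point error
    if not np.allclose(P.sum(axis=2), 1.0):
        raise ValueError("each P[a] must be row-stochastic")
    if R.shape not in ((A, S, S), (S, A)):
        raise ValueError("R must have shape (A, S, S) or (S, A)")

P = np.array([[[0.5, 0.5], [0.8, 0.2]],
              [[0.0, 1.0], [0.1, 0.9]]])
R = np.array([[5, 10], [-1, 2]])
check_mdp(P, R)  # passes silently: P and R define a valid MDP
```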
```diff
@@ -447,39 +447,72 @@ class PolicyIterationModified(MDP):
     pass

 class QLearning(MDP):
-    """Evaluation of the matrix Q, using the Q learning algorithm.
+    """Evaluates the matrix Q, using the Q learning algorithm.
+
+    Let S = number of states, A = number of actions
+
+    Parameters
+    ----------
+    P : transition matrix (SxSxA)
+        P could be an array with 3 dimensions or a cell array (1xA), each
+        cell containing a sparse matrix (SxS)
+    R : reward matrix (SxSxA) or (SxA)
+        R could be an array with 3 dimensions (SxSxA) or a cell array (1xA),
+        each cell containing a sparse matrix (SxS) or a 2D array (SxA),
+        possibly sparse
+    discount : discount rate in ]0; 1[
+    n_iter : number of iterations to execute (optional).
+        Default value = 10000; it is an integer greater than the default
+        value.
+
+    Results
+    -------
+    Q : learned Q matrix (SxA)
+    value : learned value function (S).
+    policy : learned optimal policy (S).
+    mean_discrepancy : vector of V discrepancy mean over 100 iterations.
+        The length of this vector for the default value of N is N/100 = 100.
+
+    Examples
+    --------
+    >>> import mdp
+    >>> P, R = mdp.exampleForest()
+    >>> ql = mdp.QLearning(P, R, 0.96)
+    >>> ql.iterate()
+    >>> ql.Q
+    array([[  0.        ,   0.        ],
+           [ 19.33424718,  22.59057966],
+           [ 93.26171093,  81.5913097 ]])
+    >>> ql.value
+    array([  0.        ,  22.59057966,  93.26171093])
+    >>> ql.policy
+    array([0, 1, 0])
+
+    >>> import mdp
+    >>> import numpy as np
+    >>> P = np.array([[[0.5, 0.5], [0.8, 0.2]], [[0, 1], [0.1, 0.9]]])
+    >>> R = np.array([[5, 10], [-1, 2]])
+    >>> ql = mdp.QLearning(P, R, 0.9)
+    >>> ql.iterate()
+    >>> ql.Q
+    array([[ 94.58710689,  99.99027483],
+           [ 16.9150663 ,  19.99761987]])
+    >>> ql.value
+    array([ 99.99027483,  19.99761987])
+    >>> ql.policy
+    array([1, 1])
+    """

     def __init__(self, transitions, reward, discount, n_iter=10000):
         """Evaluation of the matrix Q, using the Q learning algorithm

         Arguments
         ---------------------------------------------------------------
         Let S = number of states, A = number of actions
         transitions(SxSxA) = transition matrix
             P could be an array with 3 dimensions or a cell array (1xA),
             each cell containing a sparse matrix (SxS)
         reward(SxSxA) or (SxA) = reward matrix
             R could be an array with 3 dimensions (SxSxA) or a cell array
             (1xA), each cell containing a sparse matrix (SxS) or a 2D
             array (SxA), possibly sparse
         discount = discount rate in ]0; 1[
         n_iter(optional) = number of iterations to execute.
             Default value = 10000; it is an integer greater than the
             default value.

         Evaluation
         ---------------------------------------------------------------
         Q(SxA) = learned Q matrix
         value(S) = learned value function.
         policy(S) = learned optimal policy.
         mean_discrepancy(N/100) = vector of V discrepancy mean over 100
             iterations. Then the length of this vector for the default
             value of N is 100.
         """
-        MDP.__init__()
+        MDP.__init__(self)

         # Check of arguments
         if (discount <= 0) or (discount >= 1):
@@ -499,7 +532,7 @@ class QLearning(MDP):
         self.Q = zeros((self.S, self.A))
         #self.dQ = zeros(self.S, self.A)
         self.mean_discrepancy = []
-        self.discrepancy = zeros((self.S, 100))
+        self.discrepancy = []

         self.time = None
@@ -515,34 +548,32 @@ class QLearning(MDP):
             # Reinitialisation of trajectories every 100 transitions
             if ((n % 100) == 0):
-                s = randint(1, self.S)
+                s = randint(0, self.S - 1)

             # Action choice : greedy with increasing probability
             # probability 1-(1/log(n+2)) can be changed
             pn = random()
             if (pn < (1 - (1 / log(n + 2)))):
-                optimal_action, a = self.Q[s, ].max()
+                # optimal_action = self.Q[s, :].max()
+                a = self.Q[s, :].argmax()
             else:
                 a = randint(0, self.A - 1)

             # Simulating next state s_new and reward associated to
             p_s_new = random()
             p = 0
             s_new = 0
             while ((p < p_s_new) and (s_new < s)):
                 s_new = s_new + 1
                 if (type(self.P) is object):
                     p = p + self.P[a][s, s_new]
                 else:
                     p = p + self.P[a, s, s_new]
-            if (type(self.R) is object):
-                p = p + self.P[a][s, s_new]
+            if (self.R.dtype == object):
+                r = self.R[a][s, s_new]
             elif (self.R.ndim == 3):
-                r = self.R(a, s, s_new)
+                r = self.R[a, s, s_new]
             else:
-                r = self.R(s, a)
+                r = self.R[s, a]

             # Updating the value of Q
             # Decaying update coefficient (1/sqrt(n+2)) can be changed
             delta = r + self.discount * self.Q[s_new, ].max() - self.Q[s, a]
@@ -553,12 +584,12 @@ class QLearning(MDP):
             s = s_new

             # Computing and saving maximal values of the Q variation
-            self.discrepancy[(n % 100) + 1, ] = abs(dQ)
+            self.discrepancy.append(abs(dQ))

             # Computing means all over maximal Q variations values
             if ((n % 100) == 99):
-                self.mean_discrepancy.append(self.discrepancy.mean(1))
-                self.discrepancy = zeros((self.S, 100))
+                self.mean_discrepancy.append(mean(self.discrepancy))
+                self.discrepancy = []

         # compute the value function and the policy
         self.value = self.Q.max(axis=1)
```
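Stripped of diff markers, the loop that `QLearning.iterate()` runs after this commit is a standard tabular Q-learning sweep: an action choice that is greedy with probability 1 - 1/log(n + 2), a sampled transition, and a temporal-difference update with decaying step size 1/sqrt(n + 2). Here is a self-contained sketch for the dense case only; the function name and the use of `numpy.random.default_rng` are mine, not the toolbox's:

```python
import numpy as np
from math import log, sqrt

def q_learning(P, R, discount, n_iter=10000, seed=0):
    """Sketch of the QLearning.iterate() loop, dense inputs only.
    P has shape (A, S, S); R has shape (S, A)."""
    rng = np.random.default_rng(seed)
    A, S, _ = P.shape
    Q = np.zeros((S, A))
    s = rng.integers(S)
    for n in range(n_iter):
        if n % 100 == 0:               # reinitialise trajectory every 100 steps
            s = rng.integers(S)
        # greedy with increasing probability 1 - 1/log(n + 2)
        if rng.random() < 1 - 1 / log(n + 2):
            a = Q[s].argmax()
        else:
            a = rng.integers(A)
        # sample the next state from row s of P[a], read the reward
        s_new = rng.choice(S, p=P[a, s])
        r = R[s, a]
        # temporal-difference update with decaying coefficient 1/sqrt(n + 2)
        delta = r + discount * Q[s_new].max() - Q[s, a]
        Q[s, a] += (1 / sqrt(n + 2)) * delta
        s = s_new
    return Q, Q.max(axis=1), Q.argmax(axis=1)
```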
```diff
@@ -574,57 +605,124 @@ class RelativeValueIteration(MDP):
 class ValueIteration(MDP):
-    """ Resolve a discounted Markov Decision Problem with value iteration.
+    """Solves discounted MDP with the value iteration algorithm.
+
+    Description
+    -----------
+    mdp_value_iteration applies the value iteration algorithm to solve
+    discounted MDP. The algorithm consists in solving Bellman's equation
+    iteratively. Iterating is stopped when an epsilon-optimal policy is
+    found or after a specified number (max_iter) of iterations.
+    This function uses verbose and silent modes. In verbose mode, the
+    function displays the variation of V (value function) for each
+    iteration and the condition which stopped iterations: epsilon-policy
+    found or maximum number of iterations reached.
+
+    Let S = number of states, A = number of actions.
+
+    Parameters
+    ----------
+    P : transition matrix
+        P could be a numpy ndarray with 3 dimensions (AxSxS) or a numpy
+        ndarray of dtype=object with 1 dimension (1xA), each element
+        containing a numpy ndarray (SxS) or scipy sparse matrix.
+    R : reward matrix
+        R could be a numpy ndarray with 3 dimensions (AxSxS) or numpy
+        ndarray of dtype=object with 1 dimension (1xA), each element
+        containing a sparse matrix (SxS). R also could be a numpy ndarray
+        with 2 dimensions (SxA), possibly sparse.
+    discount : discount rate
+        Greater than 0, less than or equal to 1. Beware to check
+        conditions of convergence for discount = 1.
+    epsilon : epsilon-optimal policy search
+        Greater than 0, optional (default: 0.01).
+    max_iter : maximum number of iterations to be done
+        Greater than 0, optional (default: computed)
+    initial_value : starting value function
+        optional (default: zeros(S,1)).
+
+    Data Attributes
+    ---------------
+    value : value function
+        A vector which stores the optimal value function. It exists only
+        after the iterate() method has been called. Shape is (S, ).
+    policy : epsilon-optimal policy
+        A vector which stores the optimal policy. It exists only after
+        the iterate() method has been called. Shape is (S, ).
+    iter : number of done iterations
+        An integer
+    time : used CPU time
+        A float
+
+    Methods
+    -------
+    iterate()
+        Starts the loop for the algorithm to be completed.
+    setSilent()
+        Sets the instance to silent mode.
+    setVerbose()
+        Sets the instance to verbose mode.
+
+    Notes
+    -----
+    In verbose mode, at each iteration, displays the variation of V and
+    the condition which stopped iterations: epsilon-optimum policy found
+    or maximum number of iterations reached.
+
+    Examples
+    --------
+    >>> import mdp
+    >>> P, R = mdp.exampleForest()
+    >>> vi = mdp.ValueIteration(P, R, 0.96)
+    >>> vi.verbose
+    False
+    >>> vi.iterate()
+    >>> vi.value
+    array([  5.93215488,   9.38815488,  13.38815488])
+    >>> vi.policy
+    array([0, 0, 0])
+    >>> vi.iter
+    4
+    >>> vi.time
+    0.002871990203857422
+
+    >>> import mdp
+    >>> import numpy as np
+    >>> P = np.array([[[0.5, 0.5], [0.8, 0.2]], [[0, 1], [0.1, 0.9]]])
+    >>> R = np.array([[5, 10], [-1, 2]])
+    >>> vi = mdp.ValueIteration(P, R, 0.9)
+    >>> vi.iterate()
+    >>> vi.value
+    array([ 40.04862539,  33.65371176])
+    >>> vi.policy
+    array([1, 0])
+    >>> vi.iter
+    26
+    >>> vi.time
+    0.010202884674072266
+
+    >>> import mdp
+    >>> import numpy as np
+    >>> from scipy.sparse import csr_matrix as sparse
+    >>> P = np.zeros((2, ), dtype=object)
+    >>> P[0] = sparse([[0.5, 0.5], [0.8, 0.2]])
+    >>> P[1] = sparse([[0, 1], [0.1, 0.9]])
+    >>> R = np.array([[5, 10], [-1, 2]])
+    >>> vi = mdp.ValueIteration(P, R, 0.9)
+    >>> vi.iterate()
+    >>> vi.value
+    array([ 40.04862539,  33.65371176])
+    >>> vi.policy
+    array([1, 0])
+    """

     def __init__(self, transitions, reward, discount, epsilon=0.01,
                  max_iter=1000, initial_value=0):
-        """Resolution of discounted MDP with value iteration algorithm.
-
-        Arguments
-        ---------
-        Let S = number of states, A = number of actions.
-        transitions = transition matrix
-            P could be a numpy ndarray with 3 dimensions (AxSxS) or a
-            numpy ndarray of dytpe=object with 1 dimenion (1xA), each
-            element containing a numpy ndarray (SxS) or scipy sparse
-            matrix.
-        reward = reward matrix
-            R could be a numpy ndarray with 3 dimensions (AxSxS) or numpy
-            ndarray of dtype=object with 1 dimension (1xA), each element
-            containing a sparse matrix (SxS). R also could be a numpy
-            ndarray with 2 dimensions (SxA), possibly sparse.
-        discount = discount rate in ]0; 1]
-            Beware to check conditions of convergence for discount = 1.
-        epsilon = epsilon-optimal policy search
-            Greater than 0, optional (default: 0.01).
-        max_iter = maximum number of iterations to be done.
-            greater than 0, optional (default: computed)
-        initial_value = starting value function.
-            optional (default: zeros(S,1)).
-
-        Evaluation
-        ----------
-        value(S) = value function.
-        policy(S) = epsilon-optimal policy.
-        iter = number of done iterations.
-        time = used CPU time.
-
-        Notes
-        -----
-        In verbose mode, at each iteration, displays the variation of V
-        and the condition which stopped iterations: epsilon-optimum
-        policy found or maximum number of iterations reached.
-        """
-        MDP.__init__()
+        """Resolution of discounted MDP with value iteration algorithm."""
+        MDP.__init__(self)

         self.check(transitions, reward)
```
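The new class docstring is the user-facing contract; the algorithm itself is a short loop over the Bellman operator. Below is a dense-only sketch under the same (A, S, S) and (S, A) conventions, with the stopping test on the span of the V-variation and, as in this commit, the raw `epsilon` used as the threshold; the function name and layout are mine:

```python
import numpy as np

def value_iteration(P, R, discount, epsilon=0.01, max_iter=1000):
    """Sketch of ValueIteration.iterate(): apply the Bellman operator
    until the span of the V-variation drops below the threshold.
    P has shape (A, S, S); R has shape (S, A)."""
    A, S, _ = P.shape
    V = np.zeros(S)
    for it in range(1, max_iter + 1):
        # Bellman operator: Q[a, s] = R[s, a] + discount * P[a, s, :] . V
        Q = np.array([R[:, a] + discount * P[a].dot(V) for a in range(A)])
        V_new = Q.max(axis=0)
        # getSpan: max(W) - min(W) of the variation W = V_new - V
        variation = (V_new - V).max() - (V_new - V).min()
        V = V_new
        if variation < epsilon:        # threshold as set in this commit
            break
    return V, Q.argmax(axis=0), it
```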
```diff
@@ -652,7 +750,7 @@ class ValueIteration(MDP):
         # threshold of variation for V for an epsilon-optimal policy
         self.thresh = epsilon

-        self.itr = 0
+        self.iter = 0

         self.time = None
@@ -690,7 +788,7 @@ class ValueIteration(MDP):
             k = 1 - h.sum()
             V1 = self.bellmanOperator(self.value)
             # p 201, Proposition 6.6.5
-            max_iter = log( (epsilon * (1 - self.discount) / self.discount) / self.span(V1 - self.value) ) / log(self.discount * k)
+            max_iter = log( (epsilon * (1 - self.discount) / self.discount) / self.getSpan(V1 - self.value) ) / log(self.discount * k)

         return ceil(max_iter)

     def iterate(self):
@@ -699,7 +797,7 @@ class ValueIteration(MDP):
         self.time = time()

         done = False
         while not done:
-            self.itr = self.itr + 1
+            self.iter = self.iter + 1

             Vprev = self.value
@@ -709,16 +807,16 @@ class ValueIteration(MDP):
             # The values, based on Q. For the function "max()": the option
             # "axis" means the axis along which to operate. In this case it
             # finds the maximum of the rows. (Operates along the columns?)
-            variation = self.span(self.value - Vprev)
+            variation = self.getSpan(self.value - Vprev)

             if self.verbose:
-                print(" %s %s" % (self.itr, variation))
+                print(" %s %s" % (self.iter, variation))

             if variation < self.thresh:
                 done = True
                 if self.verbose:
                     print("...iterations stopped, epsilon-optimal policy found")
-            elif (self.itr == self.max_iter):
+            elif (self.iter == self.max_iter):
                 done = True
                 if self.verbose:
                     print("...iterations stopped by maximum number of iteration condition")
```
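Two small helpers clarify the renamings in these last hunks. `getSpan` (renamed from `span` by this commit) is the span seminorm, and the `max_iter` bound is exactly the displayed Proposition 6.6.5 formula (Puterman 1994, p. 201); `k` is taken as an input here because the construction of `h` falls outside the displayed hunk:

```python
from math import ceil, log

def get_span(W):
    """Span seminorm used as the stopping test: max(W) - min(W)."""
    return W.max() - W.min()

def bound_iter(V0, V1, discount, epsilon, k):
    """Iteration bound from the hunk above; k = 1 - h.sum() is assumed
    precomputed, since h is not shown in the diff."""
    num = epsilon * (1 - discount) / discount
    return ceil(log(num / get_span(V1 - V0)) / log(discount * k))
```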