Commit 9f8fe2a3 authored by Steven Cordwell

unittests

parent 7f61f516
@@ -32,31 +32,195 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
from numpy import abs, array, diag, matrix, ndarray, ones, zeros
from numpy.random import rand
from math import ceil, log, sqrt
from random import randint, random
from scipy.sparse import csr_matrix as sparse
from time import time
mdperr = {
"mat_nonneg" :
"PyMDPtoolbox: Probabilities must be non-negative.",
"mat_square" :
"PyMDPtoolbox: The matrix must be square.",
"mat_stoch" :
"PyMDPtoolbox: Rows of the matrix must sum to one (1).",
"mask_numpy" :
"PyMDPtoolbox: mask must be a numpy array or matrix; i.e. type(mask) is "
"ndarray or type(mask) is matrix.",
"mask_SbyS" :
"PyMDPtoolbox: The mask must have shape SxS; i.e. mask.shape = (S, S).",
"obj_shape" :
"PyMDPtoolbox: Object arrays for transition probabilities and rewards "
"must have only 1 dimension: the number of actions A. Each element of "
"the object array contains an SxS ndarray or matrix.",
"obj_square" :
"PyMDPtoolbox: Each element of an object array for transition "
"probabilities and rewards must contain an SxS ndarray or matrix; i.e. "
"P[a].shape = (S, S) or R[a].shape = (S, S).",
"P_ndarray" :
"PyMDPtoolbox: The transition probabilities must be in a numpy array; "
"i.e. type(P) is ndarray.",
"P_shape" :
"PyMDPtoolbox: The transition probability array must have the shape "
"(A, S, S) with S : number of states greater than 0 and A : number of "
"actions greater than 0. i.e. R.shape = (A, S, S)",
"PR_incompat" :
"PyMDPtoolbox: Incompatibility between P and R dimensions.",
"prob_in01" :
"PyMDPtoolbox: Probability p must be in [0; 1].",
"R_ndarray" :
"PyMDPtoolbox: The rewards must be in a numpy array; i.e. type(R) is "
"ndarray.",
"R_shape" :
"PyMDPtoolbox: The reward matrix R must be an array of shape (A, S, S) or "
"(S, A) with S : number of states greater than 0 and A : number of actions "
"greater than 0. i.e. R.shape = (S, A) or (A, S, S).",
"R_gt_0" :
"PyMDPtoolbox: The rewards must be greater than 0.",
"S_gt_1" :
"PyMDPtoolbox: Number of states S must be greater than 1.",
"SA_gt_1" :
"PyMDPtoolbox: The number of states S and the number of actions A must be "
"greater than 1."
}
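# A hedged usage note (not part of the original commit): the mdperr entries
# are intended as messages for the exceptions raised by the checking
# functions below, for example
#     raise ValueError(mdperr["mat_square"])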
def exampleForest(S=3, r1=4, r2=2, p=0.1):
"""
Generates a Markov Decision Process example based on a simple forest
management scenario.
See the related documentation for more detail.
Parameters
----------
S : number of states (> 1), optional (default 3)
r1 : reward when forest is in the oldest state and action Wait is performed,
optional (default 4)
r2 : reward when forest is in the oldest state and action Cut is performed,
optional (default 2)
p : probability of wild fire occurrence, in ]0, 1[, optional (default 0.1)
Returns
-------
P : transition probability matrix (A, S, S)
R : reward matrix (S, A)
"""
if (S <= 1):
raise ValueError(mdperr["S_gt_1"])
if (r1 <= 0) or (r2 <= 0):
raise ValueError(mdperr["R_gt_0"])
if (p < 0 or p > 1):
raise ValueError(mdperr["prob_in01"])
# Definition of transition matrix P[0, :, :] associated with action Wait
# (action 0) and P[1, :, :] associated with action Cut (action 1)
#               | p 1-p 0.......0 |                    | 1 0..........0 |
#               | .  0 1-p 0....0 |                    | . .          . |
# P[0, :, :] =  | .  .  0  .      |  and  P[1, :, :] = | . .          . |
#               | .  .     .      |                    | . .          . |
#               | .  .        1-p |                    | . .          . |
#               | p  0  0....0 1-p|                    | 1 0..........0 |
P = zeros((2, S, S))
P[0, :, :] = (1 - p) * diag(ones(S - 1), 1)
P[0, :, 0] = p
P[0, S - 1, S - 1] = (1 - p)
P[1, :, :] = zeros((S, S))
P[1, :, 0] = 1
# Definition of reward matrix R[:, 0] associated with action Wait and
# R[:, 1] associated with action Cut
#             | 0  |                  | 0  |
#             | .  |                  | 1  |
# R[:, 0] =   | .  |  and  R[:, 1] =  | .  |
#             | .  |                  | .  |
#             | 0  |                  | 1  |
#             | r1 |                  | r2 |
R = zeros((S, 2))
R[S - 1, 0] = r1
R[:, 1] = ones(S)
R[0, 1] = 0
R[S - 1, 1] = r2
return (P, R)
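# A hedged usage sketch (not part of the original commit): with the default
# arguments, exampleForest() returns a 2-action, 3-state MDP:
#     >>> P, R = exampleForest()
#     >>> P.shape, R.shape
#     ((2, 3, 3), (3, 2))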
def exampleRand(S, A, is_sparse=False, mask=None):
"""Generates a random Markov Decision Process.
Parameters
----------
S : number of states (> 0)
A : number of actions (> 0)
is_sparse : false to have matrices in plain format, true to have
    sparse matrices, optional (default false)
mask : matrix with 0 and 1 (0 indicates a place for a zero
    probability), optional (S, S) (default: random)
Returns
-------
P : transition probability matrix (A, S, S)
R : reward matrix (A, S, S)
"""
if (S < 1 or A < 1):
raise ValueError(mdperr["SA_gt_1"])
try:
if (mask is not None) and ((mask.shape[0] != S) or (mask.shape[1] != S)):
raise ValueError(mdperr["mask_SbyS"])
except AttributeError:
raise TypeError(mdperr["mask_numpy"])
if mask is None:
mask = rand(S, S)
r = random()
mask[mask < r] = 0
mask[mask >= r] = 1
for s in range(S):
if mask[s, :].sum() == 0:
mask[s, randint(0, S - 1)] = 1
if is_sparse:
# definition of transition matrix : square stochastic matrix
P = zeros((A,), dtype=object)
# definition of reward matrix (values between -1 and +1)
R = zeros((A,), dtype=object)
for a in range(A):
PP = mask * rand(S, S)
for s in range(S):
PP[s, :] = PP[s, :] / PP[s, :].sum()
P[a] = sparse(PP)
R[a] = sparse(mask * (2 * rand(S, S) - ones((S, S))))
else:
# definition of transition matrix : square stochastic matrix
P = zeros((A, S, S))
# definition of reward matrix (values between -1 and +1)
R = zeros((A, S, S))
for a in range(A):
P[a, :, :] = mask * rand(S, S)
for s in range(S):
P[a, s, :] = P[a, s, :] / P[a, s, :].sum()
R[a, :, :] = mask * (2 * rand(S, S) - ones((S, S)))
return (P, R)
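# A hedged usage sketch (not part of the original commit): a dense random
# MDP with 5 states and 3 actions, where every P[a] is row-stochastic:
#     >>> P, R = exampleRand(5, 3)
#     >>> P.shape, R.shape
#     ((3, 5, 5), (3, 5, 5))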
class MDP():
"""The Markov Decision Problem Toolbox."""
def __init__(self):
""""""
self.verbose = False
def bellmanOperator(self):
"""Apply the Bellman operator on the value function.
"""
Applies the Bellman operator on the value function.
Returns a new value function and a Vprev-improving policy
Parameters
--------------------------------------------------------------
Let S = number of states, A = number of actions
P(SxSxA) = transition matrix
P could be an array with 3 dimensions ora cell array (1xA),
each cell containing a matrix (SxS) possibly sparse
PR(SxA) = reward matrix
PR could be an array with 2 dimensions or a sparse matrix
discount = discount rate, in ]0, 1]
Vprev(S) = value function
Returns
-------------------------------------------------------------
V(S) = new value function
policy(S) = Vprev-improving policy
Updates the value function and the Vprev-improving policy.
Updates
-------
self.value : new value function
self.policy : Vprev-improving policy
"""
Q = matrix(zeros((self.S, self.A)))
for aa in range(self.A):
@@ -67,29 +231,24 @@ class MDP():
self.policy = Q.argmax(axis=1)
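# For reference (a hedged summary, not the code collapsed by the diff view
# above): the operation implemented here is the Bellman backup
#     Q(s, a) = R(s, a) + discount * sum_s' P(a, s, s') * Vprev(s')
# followed by V(s) = max_a Q(s, a) and policy(s) = argmax_a Q(s, a).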
def check(self, P, R):
"""Check if the matrices P and R define a Markov Decision Process
"""Checks if the matrices P and R define a Markov Decision Process.
Let S = number of states, A = number of actions
The transition matrix P must be on the form P(AxSxS) and P[a,:,:]
must be stochastic
The reward matrix R must be on the form (SxSxA) or (SxA)
Arguments
--------------------------------------------------------------
P(SxSxA) = transition matrix
P could be an array with 3 dimensions or a cell array (1xA),
each cell containing a matrix (SxS) possibly sparse
R(SxSxA) or (SxA) = reward matrix
R could be an array with 3 dimensions (SxSxA) or a cell array
(1xA), each cell containing a sparse matrix (SxS) or a 2D
array(SxA) possibly sparse
Evaluation
-------------------------------------------------------------
is_mdp = True if P and R define a Markov Decision Process, False
otherwise
err_msg = error message or None if correct
The transition matrix P must be on the shape (A, S, S) and P[a,:,:]
must be stochastic.
The reward matrix R must be on the shape (A, S, S) or (S, A).
Raises an error if P and R do not define a MDP.
Parameters
---------
P : transition matrix (A, S, S)
P could be an array with 3 dimensions or a object array (A, ),
each cell containing a matrix (S, S) possibly sparse
R : reward matrix (A, S, S) or (S, A)
R could be an array with 3 dimensions (SxSxA) or a object array
(A, ), each cell containing a sparse matrix (S, S) or a 2D
array(S, A) possibly sparse
"""
# Check of P
# transitions must be a numpy array, either an AxSxS ndarray (with any
@@ -98,105 +257,100 @@ class MDP():
# be converted to an object array. A numpy object array is similar to a
# MATLAB cell array.
if (not type(P) is ndarray):
    raise TypeError(mdperr["P_ndarray"])

if (not type(R) is ndarray):
    raise TypeError(mdperr["R_ndarray"])
if (P.dtype == object):
    if (P.ndim > 1):
        raise ValueError(mdperr["obj_shape"])
    else:
        P_is_object = True
else:
    if (P.ndim != 3):
        raise ValueError(mdperr["P_shape"])
    else:
        P_is_object = False

if (R.dtype == object):
    if (R.ndim > 1):
        raise ValueError(mdperr["obj_shape"])
    else:
        R_is_object = True
else:
    if (not R.ndim in (2, 3)):
        raise ValueError(mdperr["R_shape"])
    else:
        R_is_object = False
if P_is_object:
aP = P.shape[0]
sP0 = P[0].shape[0]
sP1 = P[0].shape[1]
# check that the other object array elements have the same shape
for aa in range(1, aP):
    sP0aa = P[aa].shape[0]
    sP1aa = P[aa].shape[1]
    if ((sP0aa != sP0) or (sP1aa != sP1)):
        raise ValueError(mdperr["obj_square"])
else:
aP, sP0, sP1 = P.shape
if ((sP0 < 1) or (aP < 1) or (sP0 != sP1)):
is_error_detected = True
err_msg = "The transition probability array must have the shape (A, S, S) with S : number of states greater than 0 and A : number of actions greater than 0."
if (not is_error_detected):
aa = 1
while aa <= aP:
if P_is_object:
err_msg = self.checkSquareStochastic(P[aa])
else:
err_msg = self.checkSquareStochastic(P[aa, :, :])
if (err_msg == None):
aa = aa + 1
else:
is_error_detected = True
aa = aP + 1
if (not is_error_detected):
if R_is_object:
sR0 = R[0].shape[0]
sR1 = R[0].shape[1]
aR = R.shape[0]
elif R.ndim == 3:
aR, sR0, sR1 = R.shape
raise ValueError(mdperr["P_shape"])
for aa in range(aP):
if P_is_object:
self.checkSquareStochastic(P[aa])
else:
sR0, aR = R.shape
sR1 = sR0
if ((sR0 < 1) or (aR < 1) or (sR0 != sR1)):
is_error_detected = True
err_msg = "MDP Toolbox ERROR: The reward matrix R must be an array (S,S,A) or (SxA) with S : number of states greater than 0 and A : number of actions greater than 0"
is_error_detected = True
if (not is_error_detected):
if (sP0 != sR0) or (aP != aR):
err_msg = "MDP Toolbox ERROR: Incompatibility between P and R dimensions"
is_error_detected = True
self.checkSquareStochastic(P[aa, :, :])
aa = aa + 1
if R_is_object:
aR = R.shape[0]
sR0 = R[0].shape[0]
sR1 = R[0].shape[1]
# check to see that the other object array elements are the same shape
for aa in range(1, aR):
sR0aa = R[aa].shape[0]
sR1aa = R[aa].shape[1]
if ((sR0aa != sR0) or (sR1aa != sR1)):
raise ValueError(mdperr["obj_square"])
elif (R.ndim == 3):
aR, sR0, sR1 = R.shape
else:
sR0, aR = R.shape
sR1 = sR0
if ((sR0 < 1) or (aR < 1) or (sR0 != sR1)):
raise ValueError(mdperr["R_shape"])
if (sP0 != sR0) or (aP != aR):
raise ValueError(mdperr["PR_incompat"])
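# A hedged usage sketch (not part of the original commit): check() returns
# nothing on success and raises on malformed input:
#     >>> P, R = exampleForest()
#     >>> inst = MDP()
#     >>> inst.check(P, R)       # passes silently
#     >>> inst.check(P[0], R)    # raises ValueError, P is not (A, S, S)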
def checkSquareStochastic(self, Z):
"""Check if Z is a square stochastic matrix
Arguments
--------------------------------------------------------------
Z = a numpy ndarray SxS
Z = a numpy ndarray SxS, possibly sparse (csr_matrix)
Evaluation
-------------------------------------------------------------
error_msg = error message or None if correct
"""
s1, s2 = Z.shape
if (s1 != s2):
    raise ValueError(mdperr["mat_square"])
elif (abs(Z.sum(axis=1) - ones(s2))).max() > 10**(-12):
    raise ValueError(mdperr["mat_stoch"])
elif ((type(Z) is ndarray) or (type(Z) is matrix)) and (Z < 0).any():
    raise ValueError(mdperr["mat_nonneg"])
elif (type(Z) is sparse) and (Z.data < 0).any():
    raise ValueError(mdperr["mat_nonneg"])
else:
    return(None)
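# A hedged usage sketch (not part of the original commit):
#     >>> MDP().checkSquareStochastic(eye(3))      # stochastic: returns None
#     >>> MDP().checkSquareStochastic(eye(3, 4))   # raises ValueError, not square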
def computePpolicyPRpolicy(self):
"""Computes the transition matrix and the reward matrix for a policy.
"""
@@ -253,37 +407,24 @@ class MDP():
if (type(self.R) is ndarray):
self.R = matrix(self.R)
def getSpan(self, W):
"""Returns the span of W
sp(W) = max W(s) - min W(s)
"""
return (W.max() - W.min())
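# A hedged usage note (not part of the original commit): the span is the
# usual stopping quantity for value iteration, which can iterate until
#     self.getSpan(Vnext - V) < epsilon * (1 - discount) / discount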
def setSilent(self):
    """Sets the MDP resolution functions to run in silent mode."""
    self.verbose = False

def setVerbose(self):
    """Sets the MDP resolution functions to run in verbose mode."""
    self.verbose = True
class FiniteHorizon(MDP):
"""Resolution of finite-horizon MDP with backwards induction.
@@ -338,15 +479,15 @@ class QLearning(MDP):
Then the length of this vector for the default value of N is 100.
"""
MDP.__init__(self)
# Check of arguments
if (discount <= 0) or (discount >= 1):
raise ValueError("MDP Toolbox Error: Discount rate must be in ]0,1[")
elif (n_iter < 10000):
raise ValueError("MDP Toolbox Error: n_iter must be greater than 10000")
self.check(transitions, reward)
self.computePR(transitions, reward)
@@ -436,7 +577,7 @@ class ValueIteration(MDP):
Resolve a discounted Markov Decision Problem with value iteration.
"""
def __init__(self, transitions, reward, discount, epsilon=0.01, max_iter=1000, initial_value=0):
"""Resolution of discounted MDP with value iteration algorithm.
Arguments
@@ -483,11 +624,9 @@ class ValueIteration(MDP):
or maximum number of iterations reached.
"""
MDP.__init__(self)
self.check(transitions, reward)
self.computePR(transitions, reward)
@@ -5,71 +5,221 @@ Created on Sun May 27 23:16:57 2012
@author: -
"""
from mdp import MDP
from numpy import array, eye, matrix, zeros
from numpy.random import rand
from scipy.sparse import eye as speye
from scipy.sparse import csr_matrix as sparse
inst = MDP()
STATES = 10
ACTIONS = 3
# check: square, stochastic and non-negative
def test_check_square_stochastic_nonnegative_array():
P = zeros((ACTIONS, STATES, STATES))
R = zeros((STATES, ACTIONS))
for a in range(ACTIONS):
P[a, :, :] = eye(STATES)
R[:, a] = rand(STATES)
inst.check(P, R)
# check: square, stochastic and non-negative object arrays
def test_check_square_stochastic_nonnegative_object_array():
P = zeros((ACTIONS, ), dtype=object)
R = zeros((STATES, ACTIONS))
for a in range(ACTIONS):
P[a] = eye(STATES)
R[:, a] = rand(STATES)
inst.check(P, R)
def test_check_square_stochastic_nonnegative_object_matrix():
P = zeros((ACTIONS, ), dtype=object)
R = zeros((STATES, ACTIONS))
for a in range(ACTIONS):
P[a] = matrix(eye(STATES))
R[:, a] = rand(STATES)
inst.check(P, R)
def test_check_square_stochastic_nonnegative_object_sparse():
P = zeros((ACTIONS, ), dtype=object)
R = zeros((STATES, ACTIONS))
for a in range(ACTIONS):
P[a] = speye(STATES, STATES).tocsr()
R[:, a] = rand(STATES)
inst.check(P, R)
# checkSquareStochastic: square, stochastic and non-negative
def test_checkSquareStochastic_square_stochastic_nonnegative_array():
P = rand(STATES, STATES)
for s in range(STATES):
P[s, :] = P[s, :] / P[s, :].sum()
assert inst.checkSquareStochastic(P) == None
def test_checkSquareStochastic_square_stochastic_nonnegative_matrix():
P = rand(STATES, STATES)
for s in range(STATES):
P[s, :] = P[s, :] / P[s, :].sum()
P = matrix(P)
assert inst.checkSquareStochastic(P) == None
def test_checkSquareStochastic_square_stochastic_nonnegative_sparse():
P = rand(STATES, STATES)
for s in range(STATES):
P[s, :] = P[s, :] / P[s, :].sum()
P = sparse(P)
assert inst.checkSquareStochastic(P) == None
# checkSquareStochastic: eye
def test_checkSquareStochastic_eye_array():
P = eye(STATES)
assert inst.checkSquareStochastic(P) == None
def test_checkSquareStochastic_eye_matrix():
P = matrix(eye(STATES))
assert inst.checkSquareStochastic(P) == None
def test_checkSquareStochastic_eye_sparse():
P = speye(STATES, STATES).tocsr()
assert inst.checkSquareStochastic(P) == None
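# A hedged note (not part of the original commit): these test functions
# follow the nose naming convention, so assuming nose is installed they can
# be discovered and run with the nosetests command.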
from mdp import exampleRand
def test_exampleRand_dense_shape():