Commit e0d3c9b8 authored by Steven Cordwell

[rewards] Handle sparse rewards better

Sparse rewards were broken. This commit adds several more functions to the
MDP class to make the reward computation code more modular and, hopefully,
more correct. Rewards given as sparse matrices are converted to a dense
vector. Future work will ensure that rewards given in sparse format
remain sparse. Fixes #7.
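
For reference, the conversion can be sketched outside the class. This is a
minimal, illustrative example: S, P_a and R_a are made-up names and values,
and the last line mirrors what the new _computeMatrixReward helper does for
an (S x S) reward matrix.

    import numpy as np
    import scipy.sparse as sp

    S = 3  # illustrative number of states
    # Per-action transition and reward matrices, both (S x S) and sparse.
    P_a = sp.csr_matrix([[0.5, 0.5, 0.0],
                         [0.0, 0.9, 0.1],
                         [0.2, 0.0, 0.8]])
    R_a = sp.csr_matrix([[1.0, 0.0, 0.0],
                         [0.0, 2.0, 0.0],
                         [0.0, 0.0, -1.0]])
    # Expected reward per state: r_a[s] = sum over s' of P_a[s, s'] * R_a[s, s'],
    # collapsed from the sparse matrices into a dense length-S vector.
    r_a = R_a.multiply(P_a).sum(1).A.reshape(S)
    print(r_a)  # [ 0.5  1.8 -0.8]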
parent 66995d31
@@ -73,6 +73,17 @@ _MSG_STOP_EPSILON_OPTIMAL_VALUE = "Iterating stopped, epsilon-optimal value " \
"function found."
_MSG_STOP_UNCHANGING_POLICY = "Iterating stopped, unchanging policy found."
def _computeDimensions(transition):
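# Work out the number of actions A and states S from the transitions, which
# may be a 3-D (A x S x S) array or a sequence of A matrices (possibly sparse).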
A = len(transition)
try:
if transition.ndim == 3:
S = transition.shape[1]
else:
S = transition[0].shape[0]
except AttributeError:
S = transition[0].shape[0]
return S, A
class MDP(object):
"""A Markov Decision Problem.
@@ -166,21 +177,26 @@ class MDP(object):
if self.discount == 1:
print("WARNING: check conditions of convergence. With no "
"discount, convergence can not be assumed.")
# if max_iter is None then the algorithm is assumed not to use it
# in its computations
if max_iter is not None:
self.max_iter = int(max_iter)
assert self.max_iter > 0, "The maximum number of iterations " \
"must be greater than 0."
# check that epsilon is something sane
if epsilon is not None:
self.epsilon = float(epsilon)
assert self.epsilon > 0, "Epsilon must be greater than 0."
# we run a check on P and R to make sure they are describing an MDP. If
# an exception isn't raised then they are assumed to be correct.
_util.check(transitions, reward)
# computePR will assign the variables self.S, self.A, self.P and self.R
self._computePR(transitions, reward)
self.S, self.A = _computeDimensions(transitions)
self.P = self._computeTransition(transitions)
self.R = self._computeReward(reward, transitions)
# the verbosity is by default turned off
self.verbose = False
# Initially the time taken to perform the computations is set to None
@@ -234,60 +250,58 @@ class MDP(object):
# self.V = Q.max(axis=1)
# self.policy = Q.argmax(axis=1)
def _computeP(self, P):
# Set self.P as a tuple of length A, with each element storing an S×S
# matrix.
self.A = len(P)
try:
if P.ndim == 3:
self.S = P.shape[1]
else:
self.S = P[0].shape[0]
except AttributeError:
self.S = P[0].shape[0]
# convert P to a tuple of numpy arrays
self.P = tuple(P[aa] for aa in range(self.A))
def _computeTransition(self, transition):
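# Gather the per-action transition matrices into a tuple; dense and sparse
# matrices are stored as given.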
return tuple(transition[a] for a in range(self.A))
def _computePR(self, P, R):
def _computeReward(self, reward, transition):
# Compute the reward for the system being in one state and choosing an action.
# Arguments
# ---------
# Let S = number of states, A = number of actions
# P(SxSxA) = transition matrix
# P could be an array with 3 dimensions or a cell array (1xA),
# each cell containing a matrix (SxS) possibly sparse
# R(SxSxA) or (SxA) = reward matrix
# R could be an array with 3 dimensions (SxSxA) or a cell array
# (1xA), each cell containing a sparse matrix (SxS) or a 2D
# array(SxA) possibly sparse
# Evaluation
# ----------
# PR(SxA) = reward matrix
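#     i.e. for (SxS) rewards, PR[s, a] = sum over s' of P[a][s, s'] * R[a][s, s'];
#     otherwise R is used directly, one length-S vector per action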
#
# We assume that P and R define an MDP, i.e. that _util.check(P, R)
# has already been run and didn't raise an exception.
#
# First compute and store P, S, and A
self._computeP(P)
# Set self.R as a tuple of length A, with each element storing a 1×S
# vector.
try:
if R.ndim == 1:
r = _np.array(R).reshape(self.S)
self.R = tuple(r for aa in range(self.A))
elif R.ndim == 2:
self.R = tuple(_np.array(R[:, aa]).reshape(self.S)
for aa in range(self.A))
if reward.ndim == 1:
return self._computeVectorReward(reward)
elif reward.ndim == 2:
return self._computeArrayReward(reward)
else:
self.R = tuple(_np.multiply(P[aa], R[aa]).sum(1).reshape(self.S)
for aa in range(self.A))
except AttributeError:
if len(R) == self.A:
self.R = tuple(_np.multiply(P[aa], R[aa]).sum(1).reshape(self.S)
for aa in range(self.A))
r = tuple(map(self._computeMatrixReward, reward, transition))
return r
except (AttributeError, ValueError):
if len(reward) == self.A:
r = tuple(map(self._computeMatrixReward, reward, transition))
return r
else:
return self._computeVectorReward(reward)
def _computeVectorReward(self, reward):
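# A 1-D reward gives one value per state, shared by every action.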
if _sp.issparse(reward):
raise NotImplementedError
else:
r = _np.array(reward).reshape(self.S)
return tuple(r for a in range(self.A))
def _computeArrayReward(self, reward):
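# An (S, A) reward array holds one column of state rewards per action.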
if _sp.issparse(reward):
raise NotImplementedError
else:
func = lambda x: _np.array(x).reshape(self.S)
return tuple(func(reward[:, a]) for a in range(self.A))
def _computeMatrixReward(self, reward, transition):
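# An (S, S) reward matrix is collapsed to a dense length-S vector by
# weighting each entry with its transition probability and summing over
# next states.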
if _sp.issparse(reward):
# An approach like this might be more memory efficient
#reward.data = reward.data * transition[reward.nonzero()]
#return reward.sum(1).A.reshape(self.S)
# but doesn't work as it is.
return reward.multiply(transition).sum(1).A.reshape(self.S)
elif _sp.issparse(transition):
return transition.multiply(reward).sum(1).A.reshape(self.S)
else:
r = _np.array(R).reshape(self.S)
self.R = tuple(r for aa in range(self.A))
return _np.multiply(transition, reward).sum(1).reshape(self.S)
def run(self):
# Raise error because child classes should implement this function.
@@ -968,7 +982,8 @@ class QLearning(MDP):
_util.check(transitions, reward)
# Store P, S, and A
self._computeP(transitions)
self.S, self.A = _computeDimensions(transitions)
self.P = self._computeTransition(transitions)
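# Q-learning samples rewards during simulation, so the reward is stored in
# whatever format it was supplied in rather than being reduced to
# per-action vectors.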
self.R = reward