"""
Dynamic Program class - implements only backward dynamic programming.
Ideal usage is to have an objective function defined by the user, however they'd like.
The user defines as many StateVariable instances as they have state variables in their DP and they define
a DecisionVariable. The objective function should be prepared to take arguments with keyword values for each of
these, where the keyword is determined by the name attribute on each instance. It then returns a value.
For situations with multiple state variables, we have reducers, which, prior to minimization or maximization
will reduce the number of state variables to one so that we can get a single F* for each input scenario.
This is a class Reducer with a defined interface, and can then be extended. For our levee problem, we have a
ProbabilisticReducer which keeps track of the probability of each set of state variables and can collapse by
a master variable. Probabilities must be provided by the user.
# TODO: Make sure to check how we can make the decision variable properly interact with the state variable - thinking
# of making sure that the decision variable adds to the correct state variable items
# TODO: Missing plan for how we assign probabilities here - needs to be incorporated somewhere
# TODO: Missing choice constraints (eg, minimum possible selection at stage ___ is ___)
At that point, usage for the levee problem should be something like:
.. code-block:: python
import support # user's extra code to support the objective function
import dypy as dp
objective_function = support.objective_function
height_var = dp.StateVariable("height")
flow_var = dp.StateVariable("peak_flow")
variance_var = dp.StateVariable("variance")
decision_var = dp.DecisionVariable()
decision_var.related_state = height_var # tell the decision variable which state it impacts
dynamic_program = dp.DynamicProgram(objective_function=objective_function,
state_variables=(height_var, flow_var, variance_var),
                                    decision_variable=decision_var)
dynamic_program.optimize() # runs the backward recursion
dynamic_program.get_optimal_values() # runs the forward method to obtain choices
"""
import logging
import itertools
import six
import numpy
import pandas
from .reducers import Reducer, ProbabilisticReducer, VariableReducer
from .variables import StateVariable, DecisionVariable
from . import support
log = logging.getLogger("dypy")
__author__ = "nickrsan"
__version__ = "0.0.3b"
MAXIMIZE = numpy.max
MINIMIZE = numpy.min
def default_objective(*args, **kwargs):
    """
    Placeholder objective function that values every cell at 0.

    Supplied as the default so stages can be constructed before a real objective
    is attached (e.g. when a user plans to override stage values manually). The
    DP is expected to verify, before solving, that it is not still running with
    this all-zero default.

    :param args: ignored
    :param kwargs: ignored
    :return: 0 for every input
    """
    return 0
class DynamicProgram(object):
    """
    This object actually runs the DP - doesn't currently support multiple decision variables or probabilities.
    Currently designed to only handle backward DPs.

    Attributes:
        DynamicProgram.stages: list of Stage objects

    :param objective_function: What function provides values to populate each cell in each stage? Not required if
        you build your own stages with their own values
    :param selection_constraints: Is there a minimum value that must be achieved in the selection?
        If so, this should be a list with the required quantity at each time step
    :param decision_variable: a DecisionVariable instance object
    :param discount_rate: give the discount rate in timestep_size units. Though timesteps don't need to be in years, think
        of the discount rate as applying per smallest possible timestep size, so if your timestep_size is 40, then
        your discount rate will be for one timestep in 40 and will compound across the 40 timesteps in the single stage.
        Defaults to 0, which results in no discounting, and discounting is only enabled automatically if you use the
        DiscountedSimplePrior, or another Prior object that handles discounting.
    :param calculation_function: What function are we using to evaluate? Basically, is this a maximization (benefit)
        or minimization (costs) setup. Provide the function object for max or min. Provide the actual `min` or `max`
        functions (don't run it, just the name) or if convenient, use the shortcuts dp.MINIMIZE or dp.MAXIMIZE
    :param max_selections: Up to what value of the state variable can we use to make our decision? If not provided,
        all values of the state variable will be allowed
    :param prior: Which class should be used to handle priors (best values from future stages) - SimplePrior is best in many
        cases, but may not apply everywhere and will be slow for large tables. Just provide the class object, not an instance.
    """

    def __init__(self, objective_function=default_objective, timestep_size=None, time_horizon=None,
                 discount_rate=0, state_variables=None, max_selections=None, selection_constraints=None,
                 decision_variable=None, calculation_function=None, prior=None):
        self.stages = []  # ordered list of Stage objects, populated by add_stage/build_stages
        self.timestep_size = timestep_size
        self.time_horizon = time_horizon
        self.discount_rate = discount_rate
        self.max_selections = max_selections
        # NOTE(review): selection_constraints is accepted but never stored on the instance - the code
        # that would consume it (in Stage.optimize) is currently commented out. Confirm intent.
        self._all_states = None  # list of tuples: cartesian product of every state variable's values
        self._all_states_df = None  # same data as a pandas DataFrame, columns keyed by variable_id
        self._initial_states = None  # reduced state set used for the first stage (see _setup_initial_state)
        self._state_keys = None  # variable_id for each state variable, same order as _all_states columns

        self.state_variables = list()
        if state_variables:
            for variable in state_variables:
                # defer per-variable setup, then run it once after all are added - faster for many variables
                self.add_state_variable(variable, setup_state=False)
            self._setup_state_variables()  # set up the state variables for the DP once all are added

        # set up decision variables passed in
        if not decision_variable:
            self.decision_variable = None
        else:
            self.decision_variable = decision_variable

        # Calculation Function
        self.objective_function = objective_function
        self.calculation_function = calculation_function

        # Default Prior Handler
        self.prior_handler_class = prior

        if self.calculation_function not in (numpy.max, numpy.min, MAXIMIZE, MINIMIZE):
            raise ValueError("Calculation function must be either 'numpy.max', 'numpy.min' or one of the aliases in this package of dp.MAXIMIZE or dp.MINIMIZE")

        # pair the value-selecting function with its index-selecting counterpart
        if self.calculation_function is numpy.min:
            self.index_calculation_function = numpy.argmin
        else:  # we can do else because only options are numpy.min and numpy.max, with ValueError raised above if it's not one of those
            self.index_calculation_function = numpy.argmax

        # make values that we use as bounds in our calculations - when maximizing, use a large negative
        # number, and when minimizing, a value close to infinity. We use this for any routes through
        # the DP that get excluded. (The constants are +/- 2**63; the positive one is one past the
        # signed 64-bit max, which is fine here since these live in Python ints / float matrices.)
        if self.calculation_function is numpy.max:
            self.exclusion_value = -9223372036854775808
        else:
            self.exclusion_value = 9223372036854775808  # this should force it to not be selected in minimization

    def add_state_variable(self, variable, setup_state=True):
        """
        Adds a state variable object to this dynamic program - automatically sets up the dynamic program
        to run with the new state variable when setup_state is True. Otherwise waits for you to call it manually.
        If you provide your state variables at DynamicProgram creation, then this method does not need to be called.

        :param variable: A StateVariable object - afterward, will be available in .state_variables
        :param setup_state: If True, runs _setup_state_variables after finishing. Default is True,
            but when adding a bunch of state variables, it's faster just to manually
            run it once at the end. It runs it by default so that for simple DPs, you
            don't need to think about it, but advanced, larger DPs, may want to set it
            to False and run _setup_state_variables once all are added
        :return: None
        :raises ValueError: if variable is not a StateVariable
        """
        if not isinstance(variable, StateVariable):
            raise ValueError("Provided variable must be a StateVariable object. Can't add variable of type {} to DP".format(type(variable)))
        self.state_variables.append(variable)
        if setup_state:
            self._setup_state_variables()

    def _setup_state_variables(self):
        """
        After adding state variables, performs setup operations for the DP: reindexes the variables,
        enumerates every combination of state values (as tuples and as a DataFrame), and rebuilds
        the initial-state subset.
        :return: None
        """
        self._index_state_variables()  # make sure to reindex the variables when we add one
        # one tuple per DP state: the cartesian product of all variables' value ranges
        self._all_states = list(itertools.product(*[var.values for var in self.state_variables]))
        self._all_states_df = pandas.DataFrame(self._all_states, columns=[var.variable_id for var in self.state_variables])
        self._setup_initial_state()
        self._state_keys = [var.variable_id for var in self.state_variables]  # will be in same order as provided to all_states

    def _setup_initial_state(self):
        """
        Builds the state set used for the first stage: a variable that declares an
        initial_state contributes only that value; all others contribute their full range.
        """
        var_values = []
        for var in self.state_variables:
            if var.initial_state is not None:  # if it specifies an initial value for this state, use it and only it
                var_values.append([var.initial_state])  # make it a list before appending - we'll use that in a moment
            else:
                var_values.append(var.values)
        self._initial_states = list(itertools.product(*var_values))

    def _index_state_variables(self):
        """Tells each state variable which column it occupies in the state tuples/matrix."""
        for index, variable in enumerate(self.state_variables):
            if not isinstance(variable, StateVariable):  # this is a bit silly to have this check twice, but this method checks it even if the user passes a list of StateVariables
                raise ValueError("Provided variable must be a StateVariable object. Can't add variable of type {} to DP".format(type(variable)))
            variable.column_index = index  # tell the variable what column it is

    def add_stage(self, name):
        """
        Adds an empty stage to this DynamicProgram, tied to the currently set DecisionVariable and Prior classes
        associated with this DynamicProgram. Gives the stage the name provided as a parameter, usually generated
        automatically by the DP.

        This method is called automatically by default, but is exposed as a public method primarily so you can
        control its usage if you desire, or so you can override its behavior in a subclass (such as if you wish
        to manually handle Stage creation in order to control the process for a specific dynamic programming problem).

        :param name: The name associated with the stage. Generated by the name prefix and the stage ID when called via build_stages
        :return: None
        """
        stage = Stage(name=name, decision_variable=self.decision_variable, parent_dp=self, prior_handler=self.prior_handler_class)
        self.stages.append(stage)
        self._index_stages()

    def _index_stages(self):
        """
        Rebuilds the next/previous links and the .number attribute on every stage after
        the stage list changes.
        """
        # assigning .number allows us to have constraints on the number of items selected at each stage
        # NOTE(review): with exactly one stage, neither branch below assigns .number (it stays None) -
        # confirm whether single-stage DPs are expected to work.
        if len(self.stages) > 1:  # if there are at least two, then set the next and previous objects on the first and last
            self.stages[0].next_stage = self.stages[1]
            self.stages[0].number = 0
            self.stages[-1].previous_stage = self.stages[-2]
            self.stages[-1].number = len(self.stages) - 1
        for i, stage in enumerate(self.stages[1:-1]):  # for all stages except the first and last, then we set both next and previous
            i += 1  # need to add 1 or it won't work because we're slicing off the first item
            self.stages[i].next_stage = self.stages[i+1]
            self.stages[i].previous_stage = self.stages[i-1]
            self.stages[i].number = i

    def build_stages(self, name_prefix="Step"):
        """
        Make a stage for every timestep.

        :param name_prefix: The string that will go before the stage number when printing information
        :return: None
        """
        # NOTE(review): range(0, time_horizon + 1, timestep_size) includes both endpoints, so a horizon
        # that is an exact multiple of timestep_size yields one more stage than time_horizon/timestep_size.
        # Confirm that is intended.
        for stage_id in range(0, self.time_horizon+1, self.timestep_size):
            self.add_stage(name="{} {}".format(name_prefix, stage_id))

    def _is_default(self):
        """
        Checks if we are using the default objective function *and* haven't modified any stage values. We don't want
        to solve the DP in this case, but instead raise an error to tell the user they're not using the solver
        appropriately.

        NOTE(review): this only inspects stage matrices (the objective function itself isn't checked),
        and stage.matrix is None until Stage.build runs - numpy.sum(None) on an unbuilt stage may raise.
        Confirm against intended call order.

        :return: True when every stage matrix sums to 0, False otherwise
        """
        log.debug("checking that stages are not in default state")
        for stage in self.stages:
            if numpy.sum(stage.matrix) != 0:
                return False  # not in default state - have other values
        return True  # if we make it out here, we're in the default state

    def run(self):
        """
        Validates setup, builds each stage's value matrix, then starts the backward
        recursion from the last stage and the forward retrieval from the first.

        :return: None
        :raises ValueError: when stages are still in the all-zero default state, or when
            the decision variable / state variables have not been attached
        """
        if self._is_default():
            raise ValueError("Objective function must be provided before running DP. Either provide it as an argument "
                             "at creation, or set the .objective_function parameter to a function object (*not* a function result)")

        if not self.decision_variable or len(self.state_variables) == 0:
            raise ValueError("Decision Variable and State Variables must be attached to DynamicProgram before running. Use .add_state_variable to attach additional state variables, or set .decision_variable to a DecisionVariable object first")

        # build a matrix where everything is 0 - need to figure out what the size of the x axis is
        # this matrix should just have a column for each timestep (we'll pull these out later), which will then be used by
        # each stage to actually build its own matrix
        # NOTE(review): rows/matrix below are computed but never used - leftover from an older
        # implementation, acknowledged by the TODO further down.
        rows = int(self.time_horizon/self.timestep_size)  # this is the wrong way to do this - the matrix should
        matrix = numpy.zeros((rows, ))

        for stage in self.stages:
            stage.build()

        # TODO: This needs to be updated for the more modern form - this probably is controlled by the stage instead
        #for stage in range(rows):
        #	for index, row in enumerate(matrix):
        #		matrix[index][stage] = support.present_value(self.objective_function(), index, year=stage*self.timestep_size, discount_rate=self.discount_rate )

        # This next section is old code from a prior simple DP - it will be removed, but was how the set of stages was
        # built previously so I can see what the usage was like while building this for multiple objectives

        # initiate the optimization and retrieval of the best values
        self.stages[-1].optimize()  # backward pass: each stage chains to its previous_stage
        self.stages[0].get_optimal_values()  # forward pass: each stage chains to its next_stage
class Stage(object):
    """
    A single stage (timestep) of the dynamic program. Holds the value matrix for every
    (state row, decision column) combination, links to neighboring stages, and implements
    both the backward optimization step and the forward retrieval of optimal decisions.
    """
    def __init__(self, name, decision_variable, parent_dp, previous=None, next_stage=None, prior_handler=None):
        """
        :param name: display name for this stage, used in logging
        :param decision_variable: the DecisionVariable whose .values form this stage's matrix columns
        :param parent_dp: the DynamicProgram this stage belongs to
        :param previous: The previous stage, if one exists
        :param next_stage: The next stage, if one exists
        :param prior_handler: a Prior subclass (or already-built Prior instance) used to fold
            future-stage values into this stage - see create_prior_handler
        """
        self.name = name
        self.parent_dp = parent_dp
        self.decision_variable = decision_variable
        self.prior_handler_class = prior_handler
        self.prior_handler = None
        self.create_prior_handler()
        self.decision_amount = None  # set by get_optimal_values: the chosen decision value at this stage
        self.future_value_of_decision = None  # set by get_optimal_values: best total cost/benefit from here forward
        self.next_stage = next_stage
        self.previous_stage = previous
        self.matrix = None  # this will be created from the parameters when .optimize is run
        self.number = None  # stage index within the DP; assigned by DynamicProgram._index_stages
        self.pass_data = []  # per-row best values handed backward to the previous stage during optimize
        self.choices_index = []  # per-row column index of the best decision found during optimize

    def create_prior_handler(self):
        """
        Instantiates this stage's prior handler. Accepts either a Prior subclass (instantiated
        here, bound to this stage) or an already-created Prior instance (adopted as-is).
        """
        if self.prior_handler_class is not None:
            if isinstance(self.prior_handler_class, Prior):  # if it's already an instantiated object
                self.prior_handler = self.prior_handler_class  # set the item
                # NOTE(review): Prior.__init__ stores its stage as .parent_stage - assigning .stage
                # here may leave .parent_stage pointing at the old stage. Confirm intent.
                self.prior_handler.stage = self  # tell it that it applies to this item
                self.prior_handler_class = self.prior_handler.__class__  # and set the class
            else:
                self.prior_handler = self.prior_handler_class(stage=self)  # otherwise, create it

    def build(self):
        """
        Builds the stage table - evaluates the objective function for each location, passing the various
        values to it so it can provide a result. Does *not* handle prior data though - that is handled when
        we actually run the DP. We separate this out so that people who want to intervene between generating
        the stage table and handling the prior can do so.
        :return: None - sets self.matrix to a (num_states, num_decisions) array of objective values
        """
        log.info("Building stage {}".format(self.number))

        if self.number == 0:
            states = self.parent_dp._initial_states  # use only the initial states if provided for the first stage
        else:
            states = self.parent_dp._all_states  # otherwise use all states for the future
        state_kwargs = self.parent_dp._state_keys
        decisions = self.decision_variable.values
        self.matrix = numpy.zeros((len(states), len(decisions)))

        # evaluate the user's objective for every (state, decision) pair
        for row_id, state_values in enumerate(states):
            for column_id, decision_value in enumerate(decisions):
                state_data = dict(zip(state_kwargs, state_values))  # build the kwarg pairs into a dictionary we can pass into the objective function
                decision_data = {self.decision_variable.variable_id: decision_value}
                objective_value = self.parent_dp.objective_function(stage=self, **support.merge_dicts(state_data,
                                                                                                     decision_data))
                self.matrix[row_id][column_id] = objective_value  # have this on a separate line for debugging

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

    def optimize(self, prior=None):
        """
        This handles the actual backward DP calculation - assumes that the stage has a matrix that is built
        already with appropriate values. Calls the Prior object to apply future values back to the current stage,
        when necessary, then recurses into the previous stage with this stage's best row values.

        * Need to think about where reducers fit in
        * Need to figure out how we want to handle both selection constraints, but also decision/state interactions
          Right now, we can say which state a decision feeds back on - but what's going through my head about that
          is:

          1. Do we need that if we expect an objective function? I think we do because that determines
             how we take values from the future and propagate them backward. This is where maybe we
             might not actually be able to use numpy broadcasting? Or maybe we have to shift the array
             we're broadcasting to align based on the best decision - not sure - need to think of how
             we're going to handle that a bit more
          2. What about where there's some limit imposed by decision var / state var interaction. For
             example, with the course studying test problem - in the last stage, the state variable
             can't be less than the decision variable. Maybe that's always the case though, and is
             only a last stage problem?

        :param prior: the per-row best values from the *next* (future) stage, or None for the last stage
        :return: None - sets self.pass_data and self.choices_index, then chains to previous_stage
        :raises ValueError: when no prior handler was configured for this stage
        """
        #if self.parent_dp.selection_constraints and self.number is None:
        #	raise ValueError("Stage number(.number) must be identified in sequence in order to use selection constraints")

        #	# set up maximum selection constraints based on what will have been required previously
        #	if self.parent_dp.selection_constraints:  # then we have selections constraints
        #		for row_index, row_value in enumerate(self.matrix):
        #			if row_index >= len(self.matrix) - self.parent_dp.selection_constraints[self.number]:  # if the row is less than the constrained amount for this stage
        #				for column_index, column_value in enumerate(self.matrix[row_index]):
        #					self.matrix[row_index][column_index] = self.parent_dp.exclusion_value
        #
        #	# now calculate the remaining values

        if self.prior_handler is None:
            raise ValueError("Prior handler not initialized - make sure you specify which class handles priors either"
                             "at DynamicProgram initialization, or when building your stages. Set .prior_handler_class on either"
                             "one to the class object that will handle priors.")

        self.prior_handler.data = prior
        self.prior_handler.matrix = self.matrix

        # Add in previous stage
        if self.prior_handler.exists():  # if we have prior data
            self.matrix = self.prior_handler.apply()  # then apply it to the current matrix

        self.pass_data = self.parent_dp.calculation_function(self.matrix, axis=1)  # find the best value in each row
        self.choices_index = self.parent_dp.index_calculation_function(self.matrix, axis=1)  # get the column of the min/max value

        if self.previous_stage:
            self.previous_stage.optimize(self.pass_data)  # now run the prior stage

    def _filter_available_states(self):
        """
        An important function that creates the matrix of available choices when
        we do the forward calculation - by default, goes through each state variable
        and subsets the matrix of values to the ones that are feasible based
        on the state variable's current state. If the variable has no current state,
        all options are evaluated. If it has a current state, then it is subset
        based on the variable's availability function, which defines the relationship
        of the current state to available choices. See documentation about variables
        for more.

        :return: None - sets self.search_matrix to available values and self.search_states to the corresponding state values
        """
        if self.number == 0:  # in the first state, we've already filtered it
            self.search_matrix = self.matrix
            # TODO: Doesn't set the search states
            return

        self.search_matrix = self.matrix  # start with the full matrix
        self.search_states = self.parent_dp._all_states_df
        for variable in self.parent_dp.state_variables:
            if variable.current_state is None:  # skip any variable without a current state - we'll use all options then
                continue

            # this next line is a doozy - it gets the row ids where the *current* state variable's
            # current value has the relationship defined in the variable's availability function
            # - for example, if the current state of the current var is 2 and the availability
            # function is numpy.equal, then it gets the row indices where this state var equals 2
            index = self.search_states.index[variable.availability_function(self.search_states[variable.variable_id], variable.current_state)]
            self.search_states = self.search_states.loc[index]  # subset the search states - this will then be used for the next state var
            self.search_matrix = self.search_matrix[index]  # subset the search matrix to match - this will be used by the later function to decide what's best

    def get_optimal_values(self, prior=0):
        """
        After running the backward DP, moves forward and finds the best choices at each stage.

        :param prior: The cumulative value of the choices made in earlier stages
        :return: None - sets self.decision_amount and self.future_value_of_decision, then chains to next_stage
        """
        # TODO: we'll want to implement the max_selections again using the available state filtering - might just be numpy.less
        #if self.parent_dp.max_selections:  # this format probably only works for 1 state variable and will need to be re-engineered
        #	max_selections = self.parent_dp.max_selections
        #else:
        #	max_selections = len(self.parent_dp._all_states)
        #amount_remaining = max_selections - prior

        self._filter_available_states()  # gives us self.search_matrix with available options

        if self.search_matrix.any():
            # start with all options - find the best value, then its first (row, column) location
            self.best_option = self.parent_dp.calculation_function(self.search_matrix)
            coord_sets = numpy.argwhere(self.search_matrix == self.best_option)
            self.row_of_best = coord_sets[0][0]
            self.column_of_best = coord_sets[0][1]
        else:
            # nothing feasible (or an all-zero matrix) - fall back to the zeroth decision
            self.best_option = 0
            self.row_of_best = 0
            self.column_of_best = 0

        #if self.selection_constraints:
        #	number_of_items = max([0, column_of_best - self.selection_constraints[self.number]])  # take the max with 0 - if it's negative, it should be 0
        #else:

        self.decision_amount = self.parent_dp.decision_variable.values[self.column_of_best]
        self.future_value_of_decision = self.best_option

        log.info("{} - Decision Amount at Stage: {}, Total Cost/Benefit: {}".format(self.name, self.decision_amount, self.best_option))

        # TODO: Should make the value of each state at the time of decision in each stage accessible on stage objects

        if self.decision_variable.related_state is not None:  # update the state of the state variable to match the new decision
            if self.decision_variable.related_state.current_state is None:
                self.decision_variable.related_state.current_state = self.decision_amount
            else:
                self.decision_variable.related_state.current_state += self.decision_amount

        if self.next_stage:
            self.next_stage.get_optimal_values(prior=self.decision_amount + prior)
class Prior(object):
    """
    The Prior classes provide different ways of applying future stage values to earlier stages. SimplePrior
    includes an implementation that will work for most single variable problems, but which may not work for
    multi-variable problems. This class can be subclassed and have the apply method overridden to provide
    a different implementation. The apply method should return the new matrix.

    :param stage: The Stage object to attach this prior to - allows for different stages to have different prior
        application methods, if needed
    :param data: The values from the future stage to apply back to the previous stage
    :param matrix: The matrix (2D numpy array) for consideration in the current stage

    Attributes:
        _transformation: a function used by subclasses to transform values prior to applying them;
            defaults to the identity
        _transformation_kwargs: extra keyword arguments passed to _transformation
    """

    def __init__(self, stage, data=None, matrix=None):
        self.data = data
        self.matrix = matrix
        self.parent_stage = stage
        self._transformation = lambda x: x  # default transformation just returns the value unchanged
        self._transformation_kwargs = dict()
        self.setUp()  # subclass hook - runs after base attributes are initialized

    def setUp(self):
        """
        Hook for subclass-specific initialization, called at the end of __init__.

        Bug fix: __init__ has always called self.setUp(), but the base class never defined
        it, so any subclass that didn't provide its own (e.g. SimplePrior) raised
        AttributeError on instantiation. The base implementation is a deliberate no-op.
        """
        pass

    def exists(self):
        """
        :return: True when prior data from a future stage has been attached, False otherwise
        """
        return self.data is not None

    def apply(self):
        """
        Applies the future-stage values in self.data to self.matrix and returns the new matrix.
        Must be implemented by subclasses.

        :raises NotImplementedError: always, on the base class
        """
        raise NotImplementedError("Must use a subclass of Prior, not Prior class itself")
class SimplePrior(Prior):
    """
    A simple implementation of the prior class - given a single set of optimal values from the future stage (as a 1D
    numpy array), it applies these values to the previous stage by flipping the axis of the decision
    and shifting the values down by 1 as we move across the decisions.

    This prior application method *won't* work for multiple state variables or for state variables with different
    discretization than decision variables. For more advanced situations with multiple states, we'll need to use
    the linked_state attribute on the decision variables to understand how to apply priors. This functionality
    may need to be re-engineered or expanded.
    """

    def apply(self):
        """
        Folds self.data (the next stage's per-row best values) into self.matrix, offsetting
        the prior lookup by the decision column, and returns the updated matrix.

        :return: the modified 2D matrix
        """
        # go from bottom to top because we modify the items a row below as we go, but need their
        # values to be intact, so starting at the bottom allows us to use clean values for calculations
        for row_index, row_value in reversed(list(enumerate(self.matrix))):
            for column_index, column_value in reversed(list(enumerate(row_value))):
                if row_index - column_index <= 0:  # in these cases, we can't actually pull the prior value
                    continue
                #if column_value == 0 and row_index - column_index >= 0:  # we only need to calculate fields that meet this condition - the way it's calculated, others will cause an IndexError anyway
                stage_value = self.matrix[row_index][column_index]  # the value for this stage is on 1:1 line, so, it is where the indices are equal
                if stage_value == self.parent_stage.parent_dp.exclusion_value:  # skip anything excluded elsewhere
                    continue
                try:
                    prior_value = self.data[row_index-column_index]
                    # subclasses may discount or otherwise transform the future value here
                    prior_value = self._transformation(prior_value, **self._transformation_kwargs)
                    # NOTE(review): this writes to row_index + 1 while reading stage_value from
                    # row_index - confirm the one-row offset is intentional (the IndexError guard
                    # below silently skips it on the last row).
                    self.matrix[row_index+1][column_index] = stage_value + prior_value
                except IndexError:
                    continue
        return self.matrix
class DiscountedSimplePrior(SimplePrior):
    """
    A SimplePrior variant that discounts future-stage values before folding them into the
    current stage. Discounting compounds across stages: stage n+2's values are discounted
    into stage n+1, and that combined total is discounted again into stage n.

    The discount rate comes from the owning DynamicProgram and is applied (simply) over one
    timestep_size worth of time - see the DynamicProgram parameter documentation for details.
    """

    def setUp(self):
        """
        Configures the inherited transformation hook so that SimplePrior.apply discounts
        each prior value via support.present_value before adding it in.
        """
        dp = self.parent_stage.parent_dp  # owning DynamicProgram holds the rate settings
        self._transformation = support.present_value
        self._transformation_kwargs = {
            'year': dp.timestep_size,
            'discount_rate': dp.discount_rate,
        }