This is essentially a concrete realization of dynamic programming in the decision-making setting.
Like dynamic programming, it works backwards from the end: add the transition reward to the optimal value of the following state, then pick the choice that maximizes the total.
What has to be specified by hand are the individual values and the rewards for switching between states (set according to the actual problem).
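In symbols, the backward step described above (implemented in `cal_cumulative_reward` below) is the finite-horizon Bellman recursion. The following is a sketch written with the code's own names (`u` for the value table, `R` for the immediate reward, `P` for the transition probabilities, `g` for the terminal reward) rather than a quotation of the source's numbered equations (12) and (16):

$$
u_T(i) = g(i), \qquad
u_t(i) = \max_{0 \le a \le s-i} \Big[ R(i, a) + \sum_{j=0}^{s} P_{i+a,\,j}\, u_{t+1}(j) \Big], \quad t = T-1, \dots, 0
$$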
```python
import numpy
import sys


def markov_decision_process(o, g, h, s, T, f, p, test=False):
    def cal_transition_function():
        """Return P where P[before_sold][after_sold] is the probability that,
        given before_sold products at the beginning of the month, there are
        after_sold products left at the end of the month. See table 1."""
        if test:
            return numpy.array([[1, 0, 0, 0],
                                [3 / 4, 1 / 4, 0, 0],
                                [1 / 4, 1 / 2, 1 / 4, 0],
                                [0, 1 / 4, 1 / 2, 1 / 4]])
        P = numpy.zeros((s + 1, s + 1))
        # First dimension is the amount of products before selling
        for before_sold in range(s + 1):
            # Second dimension is the amount of products after selling.
            # We loop over the demand for convenience and add each
            # probability to the corresponding element of P.
            for demand in range(s + 1):
                if demand >= before_sold:
                    # Demand meets or exceeds the stock, so everything is sold
                    after_sold = 0
                else:
                    after_sold = before_sold - demand
                P[before_sold][after_sold] += p[demand]
        return P

    P = cal_transition_function()
    print(f'Transition probability P[before_sold][after_sold] =\n{P}\n')

    def cal_reward_matrix():
        """Return R where R[current_stock][action] is the reward obtained if
        the decision maker takes action `action` when the current stock is
        `current_stock`. See table 2."""
        if test:
            return numpy.array([[0, -1, -2, -5],
                                [5, 0, -3, 0],
                                [6, -1, 0, 0],
                                [5, 0, 0, 0]])

        def F(stock):
            """Return the expected sales income F according to f and p.
            See equation (14)."""
            reward = 0
            for demand in range(s + 1):
                if demand <= stock:
                    reward += p[demand] * f(demand)
                else:
                    # All stock is sold out
                    reward += p[demand] * f(stock)
            return reward

        R = numpy.zeros((s + 1, s + 1))
        # First dimension is the amount of products before taking the action
        for before_action in range(s + 1):
            # Second dimension is the action's inbound amount of products
            for inbound in range(s + 1):
                # No need to calculate the ❌ entries in table 2
                if before_action + inbound > s:
                    continue
                R[before_action][inbound] = (F(before_action + inbound)
                                             - o(inbound)
                                             - h(before_action + inbound))
        return R

    R = cal_reward_matrix()
    print(f'Reward matrix R[current_stock][action] =\n{R}\n')

    def cal_cumulative_reward():
        """Return a tuple (u, a), see equation (12), where
        u[time][current_stock] is the cumulative maximum reward when there is
        `current_stock` stock at time `time`, and a[time][current_stock] is
        the best action to take in that situation."""
        u = numpy.zeros((T + 1, s + 1))
        a = numpy.zeros((T + 1, s + 1))
        # First dimension is time, in range [0, 1, ..., T].
        # We initialize u[T] first: the terminal reward of the remaining stock.
        for stock in range(s + 1):
            u[T][stock] = g(stock)
        # Then we work backwards to get u[T-1], u[T-2], ..., u[0]
        for t in range(T - 1, -1, -1):
            print('Calculating time:', t)
            # Second dimension is the current stock
            for current_stock in range(s + 1):
                print('\t if current stock is:', current_stock)
                best_action = 0
                # Start from -inf so that negative rewards are handled correctly
                maximum_reward = -numpy.inf
                # For every feasible action we calculate a cumulative reward,
                # then take the maximum, see equation (16)
                for inbound in range(0, s + 1 - current_stock):
                    # Immediate reward at the current stock
                    reward = R[current_stock][inbound]
                    # Expected reward after the current stock
                    for next_stock in range(s + 1):
                        reward += P[current_stock + inbound][next_stock] * u[t + 1][next_stock]
                    print('\t\t if take action:', inbound, 'the cumulative reward will be:', reward)
                    if reward > maximum_reward:
                        maximum_reward = reward
                        best_action = inbound
                u[t][current_stock] = maximum_reward
                a[t][current_stock] = best_action
                print('\t so we choose action:', best_action, 'and will get max reward:', maximum_reward)
            print('\n')
        return (u, a)

    (u, a) = cal_cumulative_reward()
    print(f'Cumulative maximum reward u[time][current_stock] =\n{u}\n')
    print(f'Best action a[time][current_stock] =\n{a}\n')


def run_example(T=3, test=False):
    # Model parameters, see equation (13)
    def o(u):
        return 2 * u

    def g(u):
        return 0

    def h(u):
        return u

    def f(u):
        return 8 * u

    s = 3
    # p_0, p_1, ..., p_{s-1}, p_s
    # p_0, p_1, p_2, p_3 for this example
    p = [1 / 4, 1 / 2, 1 / 4, 0]
    markov_decision_process(o, g, h, s, T, f, p, test=test)


T = 10
run_example(T=T, test=False)
```
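As a quick sanity check (my own suggestion, not part of the original write-up), the `test=True` path substitutes the hard-coded matrices from tables 1 and 2 for the computed `P` and `R`, so running a short horizon lets you compare the recursion's output against the worked example:

```python
# Hypothetical usage: verify the backward recursion against the
# hard-coded table-1/table-2 matrices over a short horizon.
run_example(T=3, test=True)
```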
