前段时间给出了 Q-Learning 在排班调度中的应用,现在给出 DQN 的实现。
(搬运下背景)
假设有一个客服排班的任务,我们需要为 100 个人安排一个星期的排班问题,并且有以下约束条件:
一天被划分为 24 个时间段,即每个时间段为 1 个小时;
每个客服一个星期需要上七天班,每次上班八小时;
每个客服两次上班时间需要间隔 12 小时;
客服值班时,一个星期最早是 0,最晚 24*7 - 1。
评判标准:
最优化问题可以使用启发式算法来做,上次用强化学习,这次用深度强化学习。
对 DQN 不太了解的可以去看先前的文章(我们用的是 2013 版的 DQN,没有双网络)。
相对 Q-Learning 来说,不仅改变了 Agent,还在 Env 方面做了一些改进,具体的改变可以看下代码。
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from copy import deepcopy
from collections import defaultdict, deque
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
# GPU 随使用量增长
tf.config.experimental.set_memory_growth(gpus[0], True)
# 设定最大显存
tf.config.experimental.set_virtual_device_configuration(
gpus[0],
[tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024*12)]
)
环境这里主要是改变了 list_2_str。原本的把所有的 list 拼接成 string 作为 state,现在需要把二维 list 展开成一维 list 作为神经网络的输入。
person_n = 10 # 给 10 个人排班
class Env():
def __init__(self):
# 10 个人, 7 天,每个 bar 都可以向左向右移动,也可以不移动 '-1'
self.actions_space = ['{}{}L'.format(i, j) for i in range(person_n) for j in range(7)] + \
['{}{}R'.format(i, j) for i in range(person_n) for j in range(7)] + ['-1']
self.n_actions = len(self.actions_space)
self.act_list = [random.randint(int(person_n)/2, int(person_n)) for i in range(24*7)]
self.w_list = [i / sum(self.act_list) for i in self.act_list]
self.state = [[i*24 for i in range(7)] for i in range(person_n)]
self.n_state = person_n * 7 * 24
self.punish = -1000
print(self.act_list)
def list_2_str(self, l):
# 拼接完整的 list
state_list = [[0] * 24 * 7] * person_n
for person in range(person_n):
for i in l[person]:
for j in range(8):
state_list[person][i+j] = 1
return [i for state in state_list for i in state]
def reset(self):
self.state = [[i*24 for i in range(7)] for i in range(person_n)]
return self.list_2_str(self.state)
# 给当前排班打分,考虑权重
def reward(self, tmp_state):
# 判断每个人的排班要间隔 8+12 小时,否则 socre = -1000
for i in range(person_n):
# 星期天和星期一的排班间隔 8+12 小时
if (tmp_state[i][0] + (24*7-1) - tmp_state[i][6]) < 20:
return self.punish
for j in range(6):
if (tmp_state[i][j+1] - tmp_state[i][j]) < 20:
return self.punish
# 拼接完整的 list
state_list = [[0] * 24 * 7] * person_n
for person in range(person_n):
for i in tmp_state[person]:
for j in range(8):
state_list[person][i+j] = 1
plan_list = np.sum(state_list, axis=0).tolist()
s_list = [abs(plan_list[i] - self.act_list[i])/self.act_list[i] for i in range(len(plan_list))]
# 奖励越大越好,所以加个负号
score = -np.sum([s_list[i]*self.w_list[i] for i in range(len(s_list))])
return score
def step(self, action):
actions_str = self.actions_space[action]
if actions_str == '-1':
return self.list_2_str(self.state), self.reward(self.state)
else:
num = int(actions_str[0])
day = int(actions_str[1])
move = actions_str[2]
# 这里做了点改进,不好的状态就不作为当前状态了,所以创建了一个临时状态来判断是否是好状态。
tmp_state = deepcopy(self.state)
if move == 'R':
if tmp_state[num][day] == (24*7-8-1):
tmp_state[num][day] = tmp_state[num][day] + 1
return self.list_2_str(tmp_state), self.punish
tmp_state[num][day] = tmp_state[num][day] + 1
if move == 'L':
if tmp_state[num][day] == 0:
tmp_state[num][day] = tmp_state[num][day] - 1
return self.list_2_str(tmp_state), self.punish
tmp_state[num][day] = tmp_state[num][day] - 1
reward = self.reward(tmp_state)
if reward == self.punish:
return self.list_2_str(tmp_state), self.punish
self.state = tmp_state
return self.list_2_str(self.state), self.reward(self.state)
创建 DQNAgent
class DQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=2000)
self.discount_factor = 0.9
self.epsilon = 1.0 # exploration rate
self.epsilon_min = 0.01
self.epsilon_decay = 0.999
self.learning_rate = 0.01
self.model = self._build_model()
def _build_model(self):
model = tf.keras.Sequential()
model.add(layers.Dense(512, input_dim=self.state_size, activation='relu'))
model.add(layers.BatchNormalization())
model.add(layers.Dropout(0.5))
model.add(layers.Dense(512, activation='relu'))
model.add(layers.BatchNormalization())
model.add(layers.Dropout(0.5))
model.add(layers.Dense(256, activation='relu'))
model.add(layers.BatchNormalization())
model.add(layers.Dropout(0.5))
model.add(layers.Dense(self.action_size, activation='softmax'))
model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(lr=self.learning_rate))
return model
def memorize(self, state, action, reward, next_state):
# 收集样本
self.memory.append((state, action, reward, next_state))
def get_action(self, state):
# 得到行为
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
act_values = self.model.predict(state)
act_values = act_values[0]
max_action = np.random.choice(np.where(act_values == np.max(act_values))[0])
return max_action # returns action
def replay(self, batch_size):
# 训练
minibatch = random.sample(self.memory, batch_size)
for state, action, reward, next_state in minibatch:
target = reward + self.discount_factor * np.amax(self.model.predict(next_state)[0])
target_f = self.model.predict(state)
target_f[0][action] = target
self.model.fit(state, target_f, epochs=1, verbose=0)
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def load(self, name):
self.model.load_weights(name)
def save(self, name):
self.model.save_weights(name)
训练
env = Env()
agent = DQNAgent(env.n_state, env.n_actions)
episodes = 1
batch_size = 32
bst_reward = -500
bst_state = env.state
for e in range(episodes):
state = env.reset()
print('---------- ', e, ' ------------')
# 训练 1w 次就行了,比较慢
for i in range(10000):
state = np.reshape(state, [1, env.n_state])
action = agent.get_action(state)
next_state, reward = env.step(action)
next_state = np.reshape(next_state, [1, env.n_state])
# 不好的状态就不记录了
if reward != -1000:
state = deepcopy(next_state)
agent.memorize(state, action, reward, next_state)
if len(agent.memory) > batch_size:
agent.replay(batch_size)
if bst_reward < reward:
bst_reward = reward
bst_state = deepcopy(env.state)
print('episode: {}/{}, i:{}, reward: {}, e: {:.2}'.format(e, episodes, i, bst_reward, agent.epsilon))
episode: 0/1, i:0, reward: -0.7850318471337578, e: 1.0
episode: 0/1, i:1, reward: -0.7770700636942675, e: 1.0
episode: 0/1, i:2, reward: -0.7722929936305731, e: 1.0
episode: 0/1, i:3, reward: -0.7675159235668789, e: 1.0
episode: 0/1, i:4, reward: -0.7627388535031847, e: 1.0
... ...
episode: 0/1, i:2333, reward: -0.36305732484076436, e: 0.25
episode: 0/1, i:2424, reward: -0.3598726114649682, e: 0.24
episode: 0/1, i:2796, reward: -0.3535031847133758, e: 0.21
episode: 0/1, i:3179, reward: -0.3487261146496815, e: 0.19
episode: 0/1, i:7086, reward: -0.34076433121019106, e: 0.086
最终的拟合度为 0.34。
对 Q-Learning 进行了类似的修改,保证其除了 Agent 外其他策略都一致(除了迭代次数,DQN 为 1 w,Q-Learning 为 10 w),得到的结果为(比之前的 Q-Learning 效果要好很多):
episode: 0/1, i:0, reward: -0.7874015748031497, e: 0.1
episode: 0/1, i:1, reward: -0.7826771653543307, e: 0.1
episode: 0/1, i:3, reward: -0.7795275590551182, e: 0.1
episode: 0/1, i:4, reward: -0.7748031496062993, e: 0.1
episode: 0/1, i:6, reward: -0.7716535433070866, e: 0.1
episode: 0/1, i:7, reward: -0.7685039370078741, e: 0.1
... ...
episode: 0/1, i:7898, reward: -0.352755905511811, e: 0.1
episode: 0/1, i:7906, reward: -0.35118110236220473, e: 0.1
episode: 0/1, i:7916, reward: -0.34488188976377954, e: 0.1
episode: 0/1, i:8077, reward: -0.3401574803149606, e: 0.1
episode: 0/1, i:8130, reward: -0.33700787401574805, e: 0.1
episode: 0/1, i:16242, reward: -0.3291338582677166, e: 0.1
可以看到,Q-Learning 相比于 DQN 来说速度更快(1 分钟迭代 10 W 次),效果更好一点。
当然,这也只是在当前场景下,使用了简单的模型,大家可以进行更多的尝试。
此外,我还实验了用 CNN 来代替 NN,但效果不是太好(-0.44)。
后面可能会去试下 DQN 的诸多改进版。
由于微信平台算法改版,公号内容将不再以时间排序展示,如果大家想第一时间看到我们的推送,强烈建议星标我们和给我们多点点【在看】。星标具体步骤为:
(1)点击页面最上方"AINLP",进入公众号主页。
(2)点击右上角的小点点,在弹出页面点击“设为星标”,就可以啦。
感谢支持,比心。
推荐阅读
征稿启示| 200元稿费+5000DBC(价值20个小时GPU算力)
完结撒花!李宏毅老师深度学习与人类语言处理课程视频及课件(附下载)
模型压缩实践系列之——bert-of-theseus,一个非常亲民的bert压缩方法
文本自动摘要任务的“不完全”心得总结番外篇——submodular函数优化
斯坦福大学NLP组Python深度学习自然语言处理工具Stanza试用
关于AINLP
AINLP 是一个有趣有AI的自然语言处理社区,专注于 AI、NLP、机器学习、深度学习、推荐算法等相关技术的分享,主题包括文本摘要、智能问答、聊天机器人、机器翻译、自动生成、知识图谱、预训练模型、推荐系统、计算广告、招聘信息、求职经验分享等,欢迎关注!加技术交流群请添加AINLPer(id:ainlper),备注工作/研究方向+加群目的。
阅读至此了,分享、点赞、在看三选一吧🙏