节点包含多种类型(int,float array,text)的原始特征,需要映射、聚合成固定的长度,才能接入后续模型。详细代码发生在layers.LinearProjector
class LinearProjector(nn.Module):
"""
Projects each input feature of the graph linearly and sums them up
"""
def __init__(self, full_graph, ntype, textset, hidden_dims):
super().__init__()
self.ntype = ntype
# 遍历graph中node的每种特征,根据特征类型,定义相应的特征转化器
# 比如,如果特征类型是float矩阵,就定义一个nn.Linear线性变化为指定维度
# 比如,如果特征类型是int,就定义Embedding矩阵,将id型特征转化为向量
self.inputs = _init_input_modules(full_graph, ntype, textset, hidden_dims)
def forward(self, ndata):
projections = []
for feature, data in ndata.items():
# NID是计算子图中节点、边在原图中的编号,没必要用做特征
if feature == dgl.NID:
continue
module = self.inputs[feature]# 根据特征名取出相应的特征转化器
result = module(data) # 将原始特征值转化为hidden_dims长的向量
projections.append(result)
# 将每个特征都映射后的hidden_dims长的向量,element-wise相加
# 返回一个[#nodes, hidden_dims]的Tensor
return torch.stack(projections, 1).sum(1)
详细代码见layers.WeightedSAGEConv。与DGL提供的普通SAGEConv相比,在聚合时考虑了边上的权重。
class WeightedSAGEConv(nn.Module):
def __init__(self, input_dims, hidden_dims, output_dims, act=F.relu):
super().__init__()
self.act = act
self.Q = nn.Linear(input_dims, hidden_dims)
self.W = nn.Linear(input_dims + hidden_dims, output_dims)
self.reset_parameters()
self.dropout = nn.Dropout(0.5)
def reset_parameters(self):
......
def forward(self, g, h, weights):
"""
g : graph
h : node features
weights : scalar edge weights
"""
h_src, h_dst = h
with g.local_scope():
# 将src节点上的原始特征映射成hidden_dims长,存储于'n'字段
g.srcdata['n'] = self.act(self.Q(self.dropout(h_src)))
g.edata['w'] = weights.float()
# src节点上的特征'n'乘以边上的权重,构成消息'm'
# dst节点将所有接收到的消息'm',相加起来,存入dst节点的'n'字段
g.update_all(fn.u_mul_e('n', 'w', 'm'), fn.sum('m', 'n'))
# 将边上的权重w拷贝成消息'm'
# dst节点将所有接收到的消息'm',相加起来,存入dst节点的'ws'字段
g.update_all(fn.copy_e('w', 'm'), fn.sum('m', 'ws'))
n = g.dstdata['n']# 邻居节点的embedding的加权和
ws = g.dstdata['ws'].unsqueeze(1).clamp(min=1)# 边上权重之和
# 先将邻居节点的embedding,做加权平均
# 再拼接上一轮卷积后,dst节点自身的embedding
# 再经过线性变化与非线性激活,得到这一轮卷积后各dst节点的embedding
z = self.act(self.W(self.dropout(torch.cat([n / ws, h_dst], 1))))
# 本轮卷积后,各dst节点的embedding除以模长,进行归一化
z_norm = z.norm(2, 1, keepdim=True)
z_norm = torch.where(z_norm == 0, torch.tensor(1.).to(z_norm), z_norm)
z = z / z_norm
return z
逐层卷积,得到各节点最终的embedding。详细代码见SAGENet。
class SAGENet(nn.Module):
def __init__(self, hidden_dims, n_layers):
super().__init__()
self.convs = nn.ModuleList()
for _ in range(n_layers):
self.convs.append(WeightedSAGEConv(hidden_dims, hidden_dims, hidden_dims))
def forward(self, blocks, h):
for layer, block in zip(self.convs, blocks):
# 从h中分离出h_dst,应该是一种比较老的写法
# 直接写成h_dst = h[:block.number_of_dst_nodes()]即可
# 看了一下源码,这里用名称前缀来判断是否是dst的写法,应该就是block.number_of_dst_nodes()的内部实现
h_dst = h[:block.number_of_nodes('DST/' + block.ntypes[0])]
h = layer(block, (h, h_dst), block.edata['weights'])
return h
SAGENet已经得到了由batch所构建的图上所有节点(heads + tails + neg_tails)的embedding,这个模块给pos_graph和neg_graph中的每条边打分。打分逻辑就是,某边两端节点的点积,再加上两端节点的bias。
class ItemToItemScorer(nn.Module):
def __init__(self, full_graph, ntype):
super().__init__()
n_nodes = full_graph.number_of_nodes(ntype)
self.bias = nn.Parameter(torch.zeros(n_nodes))# 原图中所有item都定义了bias
def _add_bias(self, edges):
bias_src = self.bias[edges.src[dgl.NID]]
bias_dst = self.bias[edges.dst[dgl.NID]]
# 边上两顶点的embedding的点积,再加上两端节点的bias
return {'s': edges.data['s'] + bias_src + bias_dst}
def forward(self, item_item_graph, h):
"""
输入节点组成的图(item_item_graph)和节点上的最终embedding(h),计算item_item_graph中每条边上的得分
调用该函数时,item_item_graph会被传入pos_graph,或neg_graph,
这两幅图,都是由batch中的heads + tails + neg_tails组成的,只不过中间连接的边不同
无论传入的是pos_graph还是neg_graph,h都是相同的,
都是batch中heads + tails + neg_tails这些节点上的最终embedding
"""
with item_item_graph.local_scope():
item_item_graph.ndata['h'] = h
# 边两端节点的embedding做点积
item_item_graph.apply_edges(fn.u_dot_v('h', 'h', 's'))
# 再加上首尾两节点的bias
item_item_graph.apply_edges(self._add_bias)
pair_score = item_item_graph.edata['s']
return pair_score
详细代码见model.PinSAGEModel
class PinSAGEModel(nn.Module):
def __init__(self, full_graph, ntype, textsets, hidden_dims, n_layers):
super().__init__()
# 负责将节点上的各种特征都映射成向量,并聚合在一起,形成这个节点的原始特征向量
self.proj = layers.LinearProjector(full_graph, ntype, textsets, hidden_dims)
# 负责多层图卷积,得到各节点最终的embedding
self.sage = layers.SAGENet(hidden_dims, n_layers)
# 负责根据首尾两端的节点的embedding,计算边上的得分
self.scorer = layers.ItemToItemScorer(full_graph, ntype)
def forward(self, pos_graph, neg_graph, blocks):
""" pos_graph, neg_graph, blocks的最后一层都对应batch中heads+tails+neg_tails这些节点
"""
# 得到batch中heads+tails+neg_tails这些节点的最终embedding
h_item = self.get_repr(blocks)
# 得到heads->tails这些边上的得分
pos_score = self.scorer(pos_graph, h_item)
# 得到heads->neg_tails这些边上的得分
neg_score = self.scorer(neg_graph, h_item)
# pos_graph与neg_graph边数相等,因此neg_score与pos_score相减
# 返回margin hinge loss,这里的margin是1
return (neg_score - pos_score + 1).clamp(min=0)
def get_repr(self, blocks):
h_item = self.proj(blocks[0].srcdata)# 将输入节点上的原始特征映射成hidden_dims长的向量
h_item_dst = self.proj(blocks[-1].dstdata)# 将输出节点上的原始特征映射成hidden_dims长的向量
# 通过self.sage,经过多层卷积,得到输出节点上的卷积结果
# 再加上这些输出节点上原始特征的映射结果
# 得到输出节点上最终的向量表示
return h_item_dst + self.sage(blocks, h_item)
理解了“训练数据供应”和“模块各模块”,训练过程只不过是以上模块的调用,非常清晰。详细代码见model.train,简化版代码如下。
def train(dataset, args):
g = dataset['train-graph']
......
# Assign user and movie IDs and use them as features (to learn an individual trainable
# embedding for each entity)
g.nodes[user_ntype].data['id'] = torch.arange(g.number_of_nodes(user_ntype))
g.nodes[item_ntype].data['id'] = torch.arange(g.number_of_nodes(item_ntype))
# *************** 准备数据流
# 负责抽取一个batch,一个batch包含了heads,tails,neg_tails
batch_sampler = sampler_module.ItemToItemBatchSampler(g, user_ntype, item_ntype, args.batch_size)
# 由一个batch中的heads,tails,neg_tails构建训练这个batch所需要的
# pos_graph,neg_graph和blocks
neighbor_sampler = sampler_module.NeighborSampler(
g, user_ntype, item_ntype, args.random_walk_length,
args.random_walk_restart_prob, args.num_random_walks, args.num_neighbors,
args.num_layers)
# 只是neighbor_sampler的一层封装,基本上还是根据batch构建pos_graph,neg_graph和blocks的功能
# 只不是在抽取出来的blocks上,再把原图上的节点特征拷贝进来
collator = sampler_module.PinSAGECollator(neighbor_sampler, g, item_ntype, textset)
# 每次next()返回:pos_graph,neg_graph和blocks,做训练之用
dataloader = DataLoader(
batch_sampler,
collate_fn=collator.collate_train,
num_workers=args.num_workers)
# 每次next()返回blocks,做训练中测试之用(不能用于serving,因为低效,也因为获取block的过程中也有随机的成分)
dataloader_test = DataLoader(
torch.arange(g.number_of_nodes(item_ntype)),
batch_size=args.batch_size,
collate_fn=collator.collate_test,
num_workers=args.num_workers)
dataloader_it = iter(dataloader)
# *************** 准备模型
model = PinSAGEModel(g, item_ntype, textset, args.hidden_dims, args.num_layers).to(device)
opt = torch.optim.Adam(model.parameters(), lr=args.lr)
# *************** 训练
for epoch_id in range(args.num_epochs):
model.train()
for batch_id in tqdm.trange(args.batches_per_epoch):
pos_graph, neg_graph, blocks = next(dataloader_it)
loss = model(pos_graph, neg_graph, blocks).mean()
opt.zero_grad()
loss.backward()
opt.step()
至此,DGL PinSAGE example的主要实现代码注释、解析完毕,请感兴趣的同学对照源代码学习。Hopefully, it can help 😃
由于微信平台算法改版,公号内容将不再以时间排序展示,如果大家想第一时间看到我们的推送,强烈建议星标我们和给我们多点点【在看】。星标具体步骤为:
(1)点击页面最上方"AINLP",进入公众号主页。
(2)点击右上角的小点点,在弹出页面点击“设为星标”,就可以啦。
感谢支持,比心。
推荐阅读
征稿启示| 200元稿费+5000DBC(价值20个小时GPU算力)
完结撒花!李宏毅老师深度学习与人类语言处理课程视频及课件(附下载)
模型压缩实践系列之——bert-of-theseus,一个非常亲民的bert压缩方法
文本自动摘要任务的“不完全”心得总结番外篇——submodular函数优化
斯坦福大学NLP组Python深度学习自然语言处理工具Stanza试用
关于AINLP
AINLP 是一个有趣有AI的自然语言处理社区,专注于 AI、NLP、机器学习、深度学习、推荐算法等相关技术的分享,主题包括文本摘要、智能问答、聊天机器人、机器翻译、自动生成、知识图谱、预训练模型、推荐系统、计算广告、招聘信息、求职经验分享等,欢迎关注!加技术交流群请添加AINLPer(id:ainlper),备注工作/研究方向+加群目的。
阅读至此了,分享、点赞、在看三选一吧🙏