【导读】在这篇博文中,我们将使用PyTorch和PyTorch Geometric(PyG),构建图形神经网络框架。
作者| Steeve Huang
除了惊人的速度之外,PyG还提供了一系列精心实现的GNN模型,并在各种论文中进行了说明。 因此,用PyG重现实验将非常方便。
鉴于其在速度和便利性方面的优势,毫无疑问,PyG是最受欢迎和广泛使用的GNN库之一。 让我们深入探讨这个主题吧!
系统要求
PyTorch - 1.1.0
PyTorch Geometric - 1.2.0
PyTorch Geometric 基础
本节将介绍PyG的基础知识,包括torch_geometric.data和torch_geometric.nn。 您将学习如何将几何数据传递到GNN,以及如何设计自定义MessagePassing层,GNN的核心。
数据
torch_geometric.data模块包含一个Data类,允许您轻松地从数据中创建图形。 您只需指定:
与每个节点关联的属性/功能
每个节点的连接/邻接(边缘索引)
让我们使用下图来演示如何创建Data对象
图中有4个节点,v1 ... v4,每个节点与2维特征向量相关联,标签y表示其类。 这两个可以表示为FloatTensors:
x = torch.tensor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float)y = torch.tensor([0, 1, 0, 1], dtype=torch.float)
图连接(边缘索引)应该用COO格式限制,即第一个列表包含源节点的索引,而目标节点的索引在第二个列表中指定。
edge_index = torch.tensor([[0, 1, 2, 0, 3],
[1, 0, 1, 3, 2]], dtype=torch.long)
请注意,边索引的顺序与创建的Data对象无关,因为此类信息仅用于计算邻接矩阵。 因此,上述edge_index表示与以下相同的信息。
edge_index = torch.tensor([[0, 2, 1, 0, 3],
[3, 1, 0, 1, 2]], dtype=torch.long)
将它们放在一起,我们可以创建一个Data对象,如下所示:
import torch
from torch_geometric.data import Data
x = torch.tensor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float)
y = torch.tensor([0, 1, 0, 1], dtype=torch.float)
edge_index = torch.tensor([[0, 2, 1, 0, 3],
[3, 1, 0, 1, 2]], dtype=torch.long)
data = Data(x=x, y=y, edge_index=edge_index)
>>> Data(edge_index=[2, 5], x=[4, 2], y=[4])
数据集
数据集创建过程并不简单,但是对于那些使用过torchvision的人来说,就很熟悉,因为PyG遵循其惯例。 PyG提供了两种不同类型的数据集类:InMemoryDataset和Dataset。正如字面意思所指出的那样,前者用于适合RAM的数据,而第二个用于更大的数据。由于它们的实现非常相似,我只介绍InMemoryDataset。
要创建InMemoryDataset对象,需要实现4个函数:
raw_file_names()
它返回一个列表,显示原始未处理文件名列表。如果您只有一个文件,则返回的列表应该只包含1个元素。实际上,您只需返回一个空列表,稍后在process()中指定您的文件。
processed_file_names()
与上一个函数类似,它还返回一个包含所有已处理数据的文件名的列表。调用process()后,通常,返回的列表应该只有一个元素,存储唯一处理过的数据文件名。
dowload()
此函数应将您正在处理的数据下载到self.raw_dir中指定的目录。如果您不需要下载数据,只需简单地在函数中写
pass
即可
process()
这是数据集最重要的方法。 您需要将数据收集到Data对象列表中。 然后,调用self.collate()来计算DataLoader对象将使用的切片。 以下显示了PyG官方网站的自定义数据集示例。
import torch
from torch_geometric.data import InMemoryDataset
class MyOwnDataset(InMemoryDataset):
def __init__(self, root, transform=None, pre_transform=None):
super(MyOwnDataset, self).__init__(root, transform, pre_transform)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return ['some_file_1', 'some_file_2', ...]
@property
def processed_file_names(self):
return ['data.pt']
def download(self):
# Download to `self.raw_dir`.
def process(self):
# Read data into huge `Data` list.
data_list = [...]
if self.pre_filter is not None:
data_list [data for data in data_list if self.pre_filter(data)]
if self.pre_transform is not None:
data_list = [self.pre_transform(data) for data in data_list]
data, slices = self.collate(data_list)
torch.save((data, slices), self.processed_paths[0])
DataLoader
DataLoader类允许您毫不费力地按批次将数据提供给模型。 要创建DataLoader对象,只需指定所需的数据集和批量大小即可。
loader = DataLoader(dataset, batch_size=512, shuffle=True)
DataLoader对象的每次迭代都会产生一个Batch对象,它非常类似于Data对象,但具有属性“batch”。 它指示每个节点与哪个图关联。 由于DataLoader将x,y和edge_index从不同的样本/图表聚合到批处理中,因此GNN模型需要此“批处理”信息来了解批处理中哪些节点属于同一图表以执行计算。
for batch in loader:
batch
>>> Batch(x=[1024, 21], edge_index=[2, 1568], y=[512], batch=[1024])
MessagePassing
消息传递是GNN的本质,它描述了如何学习节点嵌入。 我在上一篇文章中已经讨论过,所以我将简单地用符合PyG文档的术语来解决这个问题。
x表示节点嵌入,e表示边缘特征,φ表示消息函数,□表示聚合函数,γ表示更新函数。如果图中的边没有连通性以外的特征,则e基本上是图的边缘索引。上标表示图层的索引。当k = 1时,x表示每个节点的输入特征。下面我将说明每个函数的工作原理:
propagation(edge_index,size = None,** kwargs):
它接收边缘索引和其他可选信息,例如节点特征(嵌入)。因此,调用此函数将调用消息并进行更新。
message(** kwargs):
您可以指定为每个节点对(x_i,x_j)构造“消息”的方式。由于它遵循传播的调用,它可以传递任何参数传播。需要注意的一点是,您可以使用“_i”和“_j”定义从参数到特定节点的映射。因此,在命名此函数的参数时必须非常小心。
update(aggr_out,** kwargs)
它接收聚合消息和传入传播的其他参数,为每个节点分配新的嵌入值。
例子
让我们看看如何从“大图的归纳表示学习”一文中实现SageConv层。 SageConv的消息传递公式定义为:
这里,我们使用max pooling作为聚合方法。 因此,第一行的右侧可以写成:
这说明了如何构建“消息”。 每个相邻节点嵌入乘以权重矩阵,添加偏差并通过激活函数。 这可以通过torch.nn.Linear轻松完成。
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max')
self.lin = torch.nn.Linear(in_channels, out_channels)
self.act = torch.nn.ReLU()
def message(self, x_j):
# x_j has shape [E, in_channels]
x_j = self.lin(x_j)
x_j = self.act(x_j)
return x_j
对于更新部分,聚合聚合消息和当前节点嵌入。 然后,它乘以另一个权重矩阵并应用另一个激活函数。
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max')
self.update_lin = torch.nn.Linear(in_channels + out_channels, in_channels, bias=False)
self.update_act = torch.nn.ReLU()
def update(self, aggr_out, x):
# aggr_out has shape [N, out_channels]
new_embedding = torch.cat([aggr_out, x], dim=1)
new_embedding = self.update_lin(new_embedding)
new_embedding = torch.update_act(new_embedding)
return new_embedding
把它放在一起,我们有以下SageConv层。
import torch
from torch.nn import Sequential as Seq, Linear, ReLU
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import remove_self_loops, add_self_loops
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max') # "Max" aggregation.
self.lin = torch.nn.Linear(in_channels, out_channels)
self.act = torch.nn.ReLU()
self.update_lin = torch.nn.Linear(in_channels + out_channels, in_channels, bias=False)
self.update_act = torch.nn.ReLU()
def forward(self, x, edge_index):
# x has shape [N, in_channels]
# edge_index has shape [2, E]
edge_index, _ = remove_self_loops(edge_index)
edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
return self.propagate(edge_index, size=(x.size(0), x.size(0)), x=x)
def message(self, x_j):
# x_j has shape [E, in_channels]
x_j = self.lin(x_j)
x_j = self.act(x_j)
return x_j
def update(self, aggr_out, x):
# aggr_out has shape [N, out_channels]
new_embedding = torch.cat([aggr_out, x], dim=1)
new_embedding = self.update_lin(new_embedding)
new_embedding = self.update_act(new_embedding)
return new_embedding
真实世界的例子 - 2015年RecSys挑战赛
2015年RecSys挑战赛正在挑战数据科学家构建基于会话的推荐系统。 要求参与此挑战的人员解决两项任务:
预测是否会有买入事件,然后是一系列点击
预测将购买哪个商品
首先,我们从RecSys Challenge 2015的官方网站下载数据并构建数据集。 我们将从第一个任务开始,因为这个任务更容易。
挑战提供了两组主要数据,yoochoose-clicks.dat和yoochoose-buys.dat,分别包含点击事件和购买事件。 让我们快速浏览一下数据:
预处理
下载数据后,我们对其进行预处理,以便将其输入我们的模型。 对item_id进行分类编码,以确保编码的item_ids(稍后将映射到嵌入矩阵)从0开始。
from sklearn.preprocessing import LabelEncoder
df = pd.read_csv('../input/yoochoose-click.dat', header=None)
df.columns=['session_id','timestamp','item_id','category']
buy_df = pd.read_csv('../input/yoochoose-buys.dat', header=None)
buy_df.columns=['session_id','timestamp','item_id','price','quantity']
item_encoder = LabelEncoder()
df['item_id'] = item_encoder.fit_transform(df.item_id)
df.head()
由于数据非常大,我们对其进行二次采样以便于演示。
#randomly sample a couple of them
sampled_session_id = np.random.choice(df.session_id.unique(), 1000000, replace=False)
df = df.loc[df.session_id.isin(sampled_session_id)]
df.nunique()
为了确定基本事实,即对于给定的会话是否有任何买入事件,我们只是检查yoochoose-clicks.dat中的session_id是否也出现在yoochoose-buys.dat中。
df['label'] = df.session_id.isin(buy_df.session_id)df.head()
数据集构建
在预处理步骤之后,数据可以转换为数据集对象。 在这里,我们将会话中的每个项目视为一个节点,因此同一会话中的所有项目都形成一个图形。 为了构建数据集,我们按session_id对预处理数据进行分组,并迭代这些组。 在每次迭代中,每个组中的item_id被再次分类编码,因为对于每个图,节点索引应从0开始计数。因此,我们有以下内容:
import torch
from torch_geometric.data import InMemoryDataset
from tqdm import tqdm
class YooChooseBinaryDataset(InMemoryDataset):
def __init__(self, root, transform=None, pre_transform=None):
super(YooChooseBinaryDataset, self).__init__(root, transform, pre_transform)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return []
@property
def processed_file_names(self):
return ['../input/yoochoose_click_binary_1M_sess.dataset']
def download(self):
pass
def process(self):
data_list = []
# process by session_id
grouped = df.groupby('session_id')
for session_id, group in tqdm(grouped):
sess_item_id = LabelEncoder().fit_transform(group.item_id)
group = group.reset_index(drop=True)
group['sess_item_id'] = sess_item_id
node_features = group.loc[group.session_id==session_id,['sess_item_id','item_id']].sort_values('sess_item_id').item_id.drop_duplicates().values
node_features = torch.LongTensor(node_features).unsqueeze(1)
target_nodes = group.sess_item_id.values[1:]
source_nodes = group.sess_item_id.values[:-1]
edge_index = torch.tensor([source_nodes, target_nodes], dtype=torch.long)
x = node_features
y = torch.FloatTensor([group.label.values[0]])
data = Data(x=x, edge_index=edge_index, y=y)
data_list.append(data)
data, slices = self.collate(data_list)
torch.save((data, slices), self.processed_paths[0])
在构建数据集之后,我们调用shuffle()以确保它已被随机洗牌,然后将其拆分为三组以进行训练,验证和测试。
dataset = dataset.shuffle()
train_dataset = dataset[:800000]
val_dataset = dataset[800000:900000]
test_dataset = dataset[900000:]
len(train_dataset), len(val_dataset), len(test_dataset)
构建图形神经网络
以下自定义GNN参考了PyG官方Github存储库中的一个示例。 我用上面说明的自我实现的SAGEConv层改变了GraphConv层。 此外,还修改了输出层以匹配二进制分类设置。
embed_dim = 128
from torch_geometric.nn import TopKPooling
from torch_geometric.nn import global_mean_pool as gap, global_max_pool as gmp
import torch.nn.functional as F
class Net(torch.nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = SAGEConv(embed_dim, 128)
self.pool1 = TopKPooling(128, ratio=0.8)
self.conv2 = SAGEConv(128, 128)
self.pool2 = TopKPooling(128, ratio=0.8)
self.conv3 = SAGEConv(128, 128)
self.pool3 = TopKPooling(128, ratio=0.8)
self.item_embedding = torch.nn.Embedding(num_embeddings=df.item_id.max() +1, embedding_dim=embed_dim)
self.lin1 = torch.nn.Linear(256, 128)
self.lin2 = torch.nn.Linear(128, 64)
self.lin3 = torch.nn.Linear(64, 1)
self.bn1 = torch.nn.BatchNorm1d(128)
self.bn2 = torch.nn.BatchNorm1d(64)
self.act1 = torch.nn.ReLU()
self.act2 = torch.nn.ReLU()
def forward(self, data):
x, edge_index, batch = data.x, data.edge_index, data.batch
x = self.item_embedding(x)
x = x.squeeze(1)
x = F.relu(self.conv1(x, edge_index))
x, edge_index, _, batch, _ = self.pool1(x, edge_index, None, batch)
x1 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1)
x = F.relu(self.conv2(x, edge_index))
x, edge_index, _, batch, _ = self.pool2(x, edge_index, None, batch)
x2 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1)
x = F.relu(self.conv3(x, edge_index))
x, edge_index, _, batch, _ = self.pool3(x, edge_index, None, batch)
x3 = torch.cat([gmp(x, batch), gap(x, batch)], dim=1)
x = x1 + x2 + x3
x = self.lin1(x)
x = self.act1(x)
x = self.lin2(x)
x = self.act2(x)
x = F.dropout(x, p=0.5, training=self.training)
x = torch.sigmoid(self.lin3(x)).squeeze(1)
return x
训练
训练我们的自定义GNN非常简单,我们只需迭代从训练集构造的DataLoader并反向传播损失函数。 在这里,我们使用Adam作为优化器,学习率设置为0.005,二进制交叉熵作为损失函数。
def train():
model.train()
loss_all = 0
for data in train_loader:
data = data.to(device)
optimizer.zero_grad()
output = model(data)
label = data.y.to(device)
loss = crit(output, label)
loss.backward()
loss_all += data.num_graphs * loss.item()
optimizer.step()
return loss_all / len(train_dataset)
device = torch.device('cuda')
model = Net().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
crit = torch.nn.BCELoss()
train_loader = DataLoader(train_dataset, batch_size=batch_size)
for epoch in range(num_epochs):
train()
验证
由于大多数会议未跟随任何购买活动,因此该标签与绝大多数负面标签高度不平衡。 因此,除了准确性之外,曲线下面积(AUC)对于该任务来说是更好的度量,因为它只关心正例的评分是否高于负例。 我们使用Sklearn的现成AUC计算功能。
def evaluate(loader):
model.eval()
predictions = []
labels = []
with torch.no_grad():
for data in loader:
data = data.to(device)
pred = model(data).detach().cpu().numpy()
label = data.y.detach().cpu().numpy()
predictions.append(pred)
labels.append(label)
结果
我训练了1个epoch的模型,并测量训练,验证和测试AUC分数:
for epoch in range(1):
loss = train()
train_acc = evaluate(train_loader)
val_acc = evaluate(val_loader)
test_acc = evaluate(test_loader)
print('Epoch: {:03d}, Loss: {:.5f}, Train Auc: {:.5f}, Val Auc: {:.5f}, Test Auc: {:.5f}'.
format(epoch, loss, train_acc, val_acc, test_acc))
只有1百万行训练数据(约占所有数据的10%)和1个训练时期,我们可以获得大约0.73的AUC分数,用于验证和测试集。 如果使用更多数据来训练具有更大训练步骤的模型,则得分很可能会提高。
结论
您已经了解了PyTorch Geometric的基本用法,包括数据集构造,自定义图层以及使用真实数据训练GNN。 这篇文章中的所有代码也可以在我的Github仓库中找到。
https://github.com/khuangaf/Pytorch-Geometric-YooChoose
原文链接:
https://towardsdatascience.com/hands-on-graph-neural-networks-with-pytorch-pytorch-geometric-359487e221a8
-END-
专 · 知
专知,专业可信的人工智能知识分发,让认知协作更快更好!欢迎登录www.zhuanzhi.ai,注册登录专知,获取更多AI知识资料!
欢迎微信扫一扫加入专知人工智能知识星球群,获取最新AI专业干货知识教程视频资料和与专家交流咨询!
请加专知小助手微信(扫一扫如下二维码添加),加入专知人工智能主题群,咨询技术商务合作~
专知《深度学习:算法到实战》课程全部完成!550+位同学在学习,现在报名,限时优惠!网易云课堂人工智能畅销榜首位!
点击“阅读原文”,了解报名专知《深度学习:算法到实战》课程