https://mydoc.myencyclopedia.top/pub-html/03_gcn.html
https://www.bilibili.com/video/BV1f3411i7MQ?p=3
https://blog.csdn.net/StarfishCu/article/details/109132437 GCN学习:用PyG实现自定义layers的GCN网络及训练(五)
使用
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T
# Load the Planetoid dataset selected by ``name_data``; ``root='data'`` is
# the directory handed to the dataset for its files.
name_data = 'Cora'
dataset = Planetoid(root='data', name=name_data)
# Optional feature-normalisation transform, left disabled here:
# dataset.transform = T.NormalizeFeatures()
print(f"Number of Classes in {name_data}:", dataset.num_classes)
print(f"Number of Node Features in {name_data}:", dataset.num_node_features)

# Prefer the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Take the first graph of the dataset and move it to the chosen device.
data = dataset[0].to(device)
>>>
Number of Classes in Cora: 7
Number of Node Features in Cora: 1433
dataset.data
#Data(x=[2708, 1433], edge_index=[2, 10556], y=[2708], train_mask=[2708], val_mask=[2708], test_mask=[2708])
因为 len(dataset) = 1,也就是说数据集里只有一张图,所以只需 data = dataset[0].to(device)
此时数据已经转移到 device 上了(有 GPU 时即 cuda,否则仍在 cpu)
封装函数
train
def train(model, data, optimizer):
    """Run one full-batch optimisation step and return the training loss.

    Args:
        model: Module called as ``model(data)``. Its output is passed to
            ``F.nll_loss``, so it should be per-node log-probabilities.
        data: Graph object exposing ``y`` (labels) and ``train_mask``.
        optimizer: Optimizer that updates ``model``'s parameters.

    Returns:
        The loss over the training nodes as a Python float.
    """
    model.train()
    optimizer.zero_grad()
    out = model(data)
    # Only the nodes selected by the training mask contribute to the loss.
    mask = data.train_mask
    loss = F.nll_loss(out[mask], data.y[mask])
    loss.backward()
    optimizer.step()
    return loss.item()
Test
@torch.no_grad()
def test(model, data):
    """Evaluate ``model`` and return accuracy on the three node splits.

    Args:
        model: Module called as ``model(data)``; returns per-node class
            scores with the classes along dimension 1.
        data: Graph object exposing ``y`` plus ``train_mask``, ``val_mask``
            and ``test_mask``.

    Returns:
        Tuple ``(acc_train, acc_val, acc_test)`` of floats.

    Raises:
        ZeroDivisionError: If one of the masks selects no nodes.
    """
    model.eval()
    # Predicted class = index of the highest score for each node.
    pred = model(data).argmax(dim=1)

    def _accuracy(mask):
        # Fraction of the nodes selected by ``mask`` predicted correctly.
        correct = pred[mask].eq(data.y[mask]).sum().item()
        return correct / mask.sum().item()

    return (
        _accuracy(data.train_mask),
        _accuracy(data.val_mask),
        _accuracy(data.test_mask),
    )
这里的 @torch.no_grad() 表示被装饰的函数内部不需要计算梯度,也不会进行反向传播,适用于测试/推理阶段
# Illustration only: two equivalent ways of disabling gradient tracking.
model.eval()  # evaluation mode
with torch.no_grad():
    pass


# ...is equivalent to decorating the whole function.
# NOTE: this is a stub for illustration; if executed after the real ``test``
# it would rebind that name to this argument-less placeholder.
@torch.no_grad()
def test():
    ...
绘图工具
def live_plot(model, epoch_num=100):
    """Train ``model`` on the dataset's graph, live-plotting loss and accuracy.

    Args:
        model: Module accepted by ``train`` / ``test``; it is moved to the
            module-level ``device``.
        epoch_num: Number of training epochs to run.
    """
    from livelossplot import PlotLosses

    liveloss = PlotLosses()
    model = model.to(device)
    # Move the graph to the same device as the model once, up front. Passing
    # the raw ``dataset[0]`` leaves it on the CPU and fails with a device
    # mismatch whenever ``device`` is CUDA.
    graph = dataset[0].to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
    for _ in range(epoch_num):
        loss = train(model, graph, optimizer)
        # Training accuracy is computed by ``test`` but not plotted.
        _, acc_val, acc_test = test(model, graph)
        liveloss.update({
            'loss': loss,
            'acc_val': acc_val,
            'acc_test': acc_test,
        })
        liveloss.send()
Kipf GCN
https://github.com/tkipf/pygcn
最原始的GCN。借用PyG实现Kipf GCN,由于Kipf GCN使用的是稠密矩阵,而PyG里使用的是稀疏矩阵,所以需要做转化。
#稀疏矩阵→稠密矩阵
def convert_data_to_adj(data):
    """Return ``(features, dense_adjacency)`` for a single PyG graph.

    Args:
        data: Graph object exposing ``x`` (node features) and
            ``edge_index`` (edge list of shape ``[2, E]``).

    Returns:
        Tuple ``(feature, adj)`` where ``feature`` is ``data.x`` unchanged
        and ``adj`` is a dense ``[N, N]`` adjacency matrix with
        ``N = feature.size(0)``.
    """
    from torch_geometric.utils import to_dense_adj

    feature, edge_index = data.x, data.edge_index
    # Pin the matrix size to the number of feature rows: without it the size
    # is inferred from the largest node index in ``edge_index``, so trailing
    # isolated nodes would be dropped and ``adj`` would not line up with
    # ``feature``.
    dense = to_dense_adj(edge_index, max_num_nodes=feature.size(0))
    # The result carries a leading batch axis; index it explicitly rather
    # than squeeze(), which would also collapse a 1-node graph.
    adj = dense[0]
    return feature, adj
函数测试
使用PyG的例子
import torch
from torch_geometric.data import Data
# Toy path graph 0 - 1 - 2 with every edge listed in both directions.
# NOTE: this rebinds the module-level ``data`` to the toy graph.
edge_index = torch.tensor(
    [
        [0, 1, 1, 2],
        [1, 0, 2, 1],
    ],
    dtype=torch.long,
)
x = torch.tensor([[-1], [0], [1]], dtype=torch.float)
data = Data(x=x, edge_index=edge_index)

# Convert to the dense form and print the adjacency matrix for inspection.
f, adj = convert_data_to_adj(data)
print(adj)
>>>
tensor([[0., 1., 0.],
[1., 0., 1.],
[0., 1., 0.]])
测试成功,开始编写Kipf GCN
# https://github.com/tkipf/pygcn/blob/master/pygcn/layers.py
import torch
from torch.nn.parameter import Parameter
from torch.nn.modules.module import Module
class GraphConvolution(Module):
    """Simple GCN layer, similar to https://arxiv.org/abs/1609.02907.

    Computes ``adj @ (x @ weight) + bias``, where ``x`` and the dense
    adjacency ``adj`` are extracted from the graph object passed to
    ``forward``.

    NOTE(review): the adjacency is used exactly as returned by
    ``convert_data_to_adj`` -- no self-loops or degree normalisation are
    applied in this layer; confirm that is intended.
    """

    def __init__(self, in_features, out_features, bias=True):
        """Create the layer.

        Args:
            in_features: Size of each input node feature vector.
            out_features: Size of each output node feature vector.
            bias: When true, learn an additive bias of size ``out_features``.
        """
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # Allocated uninitialised; reset_parameters() fills the values.
        self.weight = Parameter(torch.empty(in_features, out_features))
        if bias:
            self.bias = Parameter(torch.empty(out_features))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self):
        """Draw weight and bias uniformly from ``[-s, s]``, ``s = 1/sqrt(out_features)``."""
        stdv = 1.0 / (self.weight.size(1) ** 0.5)
        with torch.no_grad():
            self.weight.uniform_(-stdv, stdv)
            if self.bias is not None:
                self.bias.uniform_(-stdv, stdv)

    def forward(self, data):
        """Apply the layer to a graph.

        The original implementation
        (https://github.com/tkipf/pygcn/blob/master/pygcn/layers.py) has the
        signature ``forward(self, input, adj)`` with a sparse ``adj``; here
        both are derived from ``data`` instead.

        Args:
            data: Graph object exposing ``x`` and ``edge_index``.

        Returns:
            Tensor with one row of ``out_features`` values per node.
        """
        features, adj = convert_data_to_adj(data)
        support = torch.mm(features, self.weight)
        # ``adj`` is dense here, so a plain dense matmul is used.
        output = torch.mm(adj, support)
        if self.bias is not None:
            return output + self.bias
        return output

    def __repr__(self):
        return f"{self.__class__.__name__} ({self.in_features} -> {self.out_features})"
# ``train`` feeds the model output to F.nll_loss, which expects
# log-probabilities, so the raw layer output is followed by a LogSoftmax.
model_gcn_kipf = nn.Sequential(
    GraphConvolution(dataset.num_features, dataset.num_classes),
    nn.LogSoftmax(dim=1),
)
live_plot(model_gcn_kipf, epoch_num=120)
torch_geometric.nn.GCNConv
定义了一个包含2层GCN的网络
from torch_geometric.nn import GCNConv
class GCNConv_Demo(nn.Module):
    """Two-layer GCN built from ``torch_geometric.nn.GCNConv``.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Output size of the first convolution.
        out_channels: Output size of the second convolution.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)

    def forward(self, data):
        """Return the log-softmax (over dimension 1) of the second layer's output.

        Args:
            data: Graph object exposing ``x`` and ``edge_index``.
        """
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index).relu()
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)
# Two-layer GCN with a 16-unit hidden layer, trained for 50 epochs.
net = GCNConv_Demo(
    in_channels=dataset.num_features,
    hidden_channels=16,
    out_channels=dataset.num_classes,
)
live_plot(net, epoch_num=50)
Impl GCNConv based on MessagePassing
https://pytorch-geometric.readthedocs.io/en/latest/notes/create_gnn.html
PyG的消息传递机制
from torch_geometric.utils import add_self_loops, degree
import torch_geometric
class GCNConvImpl(torch_geometric.nn.MessagePassing):
    """GCN convolution written on top of PyG's ``MessagePassing`` base class.

    Args:
        in_channels: Size of each input node feature vector.
        out_channels: Size of each output node feature vector.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='add')  # "Add" aggregation (Step 5).
        self.lin = torch.nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        """Transform, normalise and propagate node features.

        Args:
            x: Node features of shape ``[N, in_channels]``.
            edge_index: Edge list of shape ``[2, E]``.

        Returns:
            The result of ``self.propagate`` over the self-loop-augmented
            edges.
        """
        # Step 1: Add self-loops to the adjacency matrix.
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))

        # Step 2: Linearly transform node feature matrix.
        x = self.lin(x)

        # Step 3: Compute normalization.
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        # A zero degree yields inf after pow(-0.5); zero those entries out.
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        # Step 4-5: Start propagating messages.
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        """Scale each edge's message by its normalisation coefficient.

        Args:
            x_j: Per-edge features of shape ``[E, out_channels]``.
            norm: One coefficient per edge.
        """
        # Step 4: Normalize node features.
        return norm.view(-1, 1) * x_j
class GCNConvMessagePassingDemo(torch.nn.Module):
    """Two-layer GCN built from the hand-written ``GCNConvImpl`` layer.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Output size of the first convolution.
        out_channels: Output size of the second convolution.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        self.conv1 = GCNConvImpl(in_channels, hidden_channels)
        self.conv2 = GCNConvImpl(hidden_channels, out_channels)

    def forward(self, data):
        """Return the log-softmax (over dimension 1) of the second layer's output.

        Args:
            data: Graph object exposing ``x`` and ``edge_index``.
        """
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index).relu()
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)
# Same two-layer layout as above, using the MessagePassing-based layer.
# No epoch count is passed, so live_plot's default applies.
model_gcn_impl_demo = GCNConvMessagePassingDemo(
    in_channels=dataset.num_features,
    hidden_channels=16,
    out_channels=dataset.num_classes,
)
live_plot(model_gcn_impl_demo)