my_fedchain

FedChain

GitHub repo: git@github.com:PromiseChan/my_fed_chain.git

Progress:

FedChain vs. FedAvg

Dataset: CIFAR-10

Distribution   Algorithm                      Accuracy
iid            fedchain                       68.52%
iid            fedavg                         67.55%
iid            fedrecon                       54.20%
iid            fedchain+fedrecon              47.10%
iid            fedchain+fedrecon (modified)   47.50%
niid           fedchain                       44.12%
niid           fedavg                         57.75%
niid           fedrecon                       50.35%
niid           fedchain+fedrecon              38.14%
niid           fedchain+fedrecon (modified)   42.58%

Dataset: EMNIST

Distribution   Algorithm                      Accuracy
iid            fedchain                       96.89%
iid            fedavg                         97.43%
iid            fedrecon                       98.00%
iid            fedchain+fedrecon              97.30%
iid            fedchain+fedrecon (modified)   98.30%
niid           fedchain                       97.88%
niid           fedavg                         97.00%
niid           fedrecon                       97.10%
niid           fedchain+fedrecon              93.85%
niid           fedchain+fedrecon (modified)   94.98%

  • fedchain + fedrecon + meta-learning improvement (niid)

  • fedchain + fedrecon + meta-learning (niid)

  • fedchain + fedrecon (niid)

  • fedchain + fedrecon + meta-learning improvement (iid)

  • fedchain + fedrecon + meta-learning (iid)

  • fedchain + fedrecon (iid)

  • fedchain (experiment results)

  • fedchain (iid + niid) code

  • fedsgd (experiment results saved; local_epoch = 1, batch_size = ∞, n_clients = 100)

  • fedsgd (iid + niid) code

  • fedavg (experiment results saved; local_epoch = 3, batch_size = 64, n_clients = 10)

  • fedavg (iid + niid) code

  • cifar10 model brought up and verified

  • myArgs util extracted

  • iid + niid partitioning util

fedSGD_iid

from copy import deepcopy

import torch
import dataset_utils
import torchvision
import torchvision.transforms as transforms
import numpy as np
import cifar10Model
from torch.utils.tensorboard import SummaryWriter
from myArgs import myArgs
import sys


def init_args():
    args = myArgs()
    args.n_clients = 100
    args.BATCH_SIZE = sys.maxsize  # one full-dataset batch per client
    args.local_epochs = 1
    return args


def get_dataset(args):
    # Resize to 24x24 and convert to tensors in [0, 1].
    transform = transforms.Compose([
        transforms.Resize((24, 24)),
        transforms.ToTensor(),
    ])

    trainset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=True,
        download=True,
        transform=transform
    )

    # Test set
    testset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=False,
        download=True,
        transform=transform,
    )
    return trainset, testset


def init_model(args, trainset):
    model = cifar10Model.cnn_cifar10(len(trainset.classes), trainset.data[0].shape[2], args)
    return model


if __name__ == "__main__":
    # Create the summary writer with an explicit log directory
    writer = SummaryWriter(log_dir="fedsgd_iid")
    # Initialize arguments
    args = init_args()
    # Load the dataset
    trainset, testset = get_dataset(args)
    # Build each client's iid dataloader
    all_client_trainloaders = dataset_utils.get_all_clients_iid_dataloader_list(trainset, args)
    # Initialize the global model
    global_model = init_model(args, trainset)
    global_model = global_model.to(device="cuda:0")
    global_criterion = torch.nn.CrossEntropyLoss()
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)

    # Initialize each client's model
    client_models = [deepcopy(global_model).to(device="cuda:0") for _ in range(args.n_clients)]
    # Initialize each client's optimizer
    client_optimizers = [torch.optim.SGD(client_models[i].parameters(), lr=0.01) for i in range(args.n_clients)]
    # Initialize each client's loss function
    client_criterions = [torch.nn.CrossEntropyLoss() for _ in range(args.n_clients)]
    # Start training
    for epoch in range(args.global_epochs):
        # Local training on every client
        for client_i in range(args.n_clients):
            loss_list = []
            accuracy_list = []
            client_i_model = client_models[client_i]
            client_i_optimizer = client_optimizers[client_i]
            client_i_criterion = client_criterions[client_i]
            for local_epoch in range(args.local_epochs):
                for batch_idx, (inputs, targets) in enumerate(all_client_trainloaders[client_i]):
                    inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                    client_i_optimizer.zero_grad()
                    outputs = client_i_model(inputs)
                    loss = client_i_criterion(outputs, targets)

                    loss_list.append(loss.item())
                    accuracy_list.append(outputs.argmax(dim=1).eq(targets).sum().item() / len(targets))

                    loss.backward()
                    client_i_optimizer.step()
            client_models[client_i] = client_i_model
            print('Client {} | Global_Epoch {} | Loss {:.4f} | Accuracy {:.4f}'.format(client_i, epoch, np.mean(loss_list), np.mean(accuracy_list)))

        # Average the client parameters into the global model.
        # (Writing through global_model.state_dict()[name].data does not reliably
        # update the model, so the sum/average is applied directly to the parameters.)
        with torch.no_grad():
            for client_i in range(args.n_clients):
                client_state = client_models[client_i].state_dict()
                for name, param in global_model.named_parameters():
                    if client_i == 0:
                        param.data.copy_(client_state[name])
                    else:
                        param.data.add_(client_state[name])
            for name, param in global_model.named_parameters():
                param.data /= args.n_clients

        # Evaluate the global model
        test_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(test_loader):
                inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                outputs = global_model(inputs)
                loss = global_criterion(outputs, targets)
                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        writer.add_scalar('loss', test_loss / (batch_idx + 1), epoch)
        writer.add_scalar('accuracy', 100. * correct / total, epoch)
        print('Epoch: %d | Loss: %.3f | Acc: %.3f%% (%d/%d)'
              % (epoch, test_loss / (batch_idx + 1), 100. * correct / total, correct, total))

        # Broadcast the global parameters back to every client
        for client_i in range(args.n_clients):
            client_models[client_i].load_state_dict(global_model.state_dict())

        # Save the model periodically
        if epoch % 20 == 0:
            torch.save(global_model.state_dict(), 'model_save/fedsgd_iid_{}.pth'.format(epoch))
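Because local_epochs = 1 and the batch covers a client's whole shard, each client takes exactly one full-batch gradient step per round from the broadcast global weights, so averaging the updated weights equals one SGD step on the average of the client gradients — which is FedSGD. A minimal sketch of that equivalent gradient-averaging form (the helper name fedsgd_round is ours, not from the repo):

    import torch

    def fedsgd_round(global_model, client_loaders, criterion, lr=0.01, device="cuda:0"):
        # One FedSGD round: average the clients' full-batch gradients, take one step.
        summed = None
        for loader in client_loaders:
            inputs, targets = next(iter(loader))  # single full-dataset batch (BATCH_SIZE = sys.maxsize)
            inputs, targets = inputs.to(device), targets.to(device)
            global_model.zero_grad()
            loss = criterion(global_model(inputs), targets)
            loss.backward()
            grads = [p.grad.detach().clone() for p in global_model.parameters()]
            summed = grads if summed is None else [s + g for s, g in zip(summed, grads)]
        with torch.no_grad():
            for p, g in zip(global_model.parameters(), summed):
                p -= lr * g / len(client_loaders)

The equivalence holds because avg_i(w - lr * g_i) = w - lr * avg_i(g_i) when every client starts the round from the same w, which the broadcast step above guarantees.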


fedSGD_niid

import sys
from copy import deepcopy

import torch
import dataset_utils
import torchvision
import torchvision.transforms as transforms
import numpy as np
import cifar10Model
from torch.utils.tensorboard import SummaryWriter
from myArgs import myArgs


def init_args():
    args = myArgs()
    args.n_clients = 100
    args.BATCH_SIZE = sys.maxsize  # one full-dataset batch per client
    args.local_epochs = 1
    return args


def get_dataset(args):
    # Resize to 24x24 and convert to tensors in [0, 1].
    transform = transforms.Compose([
        transforms.Resize((24, 24)),
        transforms.ToTensor(),
    ])

    trainset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=True,
        download=True,
        transform=transform
    )

    # Test set
    testset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=False,
        download=True,
        transform=transform,
    )
    return trainset, testset


def init_model(args, trainset):
    model = cifar10Model.cnn_cifar10(len(trainset.classes), trainset.data[0].shape[2], args)
    return model


# Evaluate a model's loss and accuracy
def test(epoch, test_loader, model, model_criterion):
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(test_loader):
            inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
            outputs = model(inputs)
            loss = model_criterion(outputs, targets)
            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    return test_loss / (batch_idx + 1), 100. * correct / total


if __name__ == "__main__":
    # Create the summary writer with an explicit log directory
    writer = SummaryWriter(log_dir="fedsgd_non_iid")
    # Initialize arguments
    args = init_args()
    # Load the dataset
    trainset, testset = get_dataset(args)
    # Build each client's non-iid dataloader
    all_client_trainloaders = dataset_utils.get_all_clients_niid_dataloader_list(trainset, args)
    # Initialize the global model
    global_model = init_model(args, trainset)
    global_model = global_model.to(device="cuda:0")
    global_criterion = torch.nn.CrossEntropyLoss()
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)

    # Initialize each client's model
    client_models = [deepcopy(global_model).to(device="cuda:0") for _ in range(args.n_clients)]
    # Initialize each client's optimizer
    client_optimizers = [torch.optim.SGD(client_models[i].parameters(), lr=0.01) for i in range(args.n_clients)]
    # Initialize each client's loss function
    client_criterions = [torch.nn.CrossEntropyLoss() for _ in range(args.n_clients)]
    # Start training
    for epoch in range(args.global_epochs):
        # Local training on every client
        for client_i in range(args.n_clients):
            loss_list = []
            accuracy_list = []
            client_i_model = client_models[client_i]
            client_i_optimizer = client_optimizers[client_i]
            client_i_criterion = client_criterions[client_i]
            for local_epoch in range(args.local_epochs):
                for batch_idx, (inputs, targets) in enumerate(all_client_trainloaders[client_i]):
                    inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                    client_i_optimizer.zero_grad()
                    outputs = client_i_model(inputs)
                    loss = client_i_criterion(outputs, targets)

                    loss_list.append(loss.item())
                    accuracy_list.append(outputs.argmax(dim=1).eq(targets).sum().item() / len(targets))

                    loss.backward()
                    client_i_optimizer.step()
            client_models[client_i] = client_i_model
            print('Client {} | Global_Epoch {} | Loss {:.4f} | Accuracy {:.4f}'.format(client_i, epoch, np.mean(loss_list), np.mean(accuracy_list)))

        # Average the client parameters into the global model.
        # (Writing through global_model.state_dict()[name].data does not reliably
        # update the model, so the sum/average is applied directly to the parameters.)
        with torch.no_grad():
            for client_i in range(args.n_clients):
                client_state = client_models[client_i].state_dict()
                for name, param in global_model.named_parameters():
                    if client_i == 0:
                        param.data.copy_(client_state[name])
                    else:
                        param.data.add_(client_state[name])
            for name, param in global_model.named_parameters():
                param.data /= args.n_clients

        # Broadcast the global parameters back to every client
        for client_i in range(args.n_clients):
            client_models[client_i].load_state_dict(global_model.state_dict())

        # Evaluate the global model on the test set
        loss_i, accuracy_i = test(epoch, test_loader, global_model, global_criterion)
        writer.add_scalar('loss', loss_i, epoch)
        writer.add_scalar('accuracy', accuracy_i, epoch)
        print('Global_Epoch: %d | Loss: %.3f | Acc: %.3f%% '
              % (epoch, loss_i, accuracy_i))

        if epoch % 20 == 0:
            torch.save(global_model.state_dict(), "model_save/fedsgd_niid_{}.pth".format(epoch))

    writer.close()


fedavg_iid

from copy import deepcopy

import torch
import dataset_utils
import torchvision
import torchvision.transforms as transforms
import numpy as np
import cifar10Model
from myArgs import myArgs


def init_args():
    args = myArgs()
    return args


def get_dataset(args):
    # Resize to 24x24 and convert to tensors in [0, 1].
    transform = transforms.Compose([
        transforms.Resize((24, 24)),
        transforms.ToTensor(),
    ])

    trainset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=True,
        download=True,
        transform=transform
    )

    # Test set
    testset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=False,
        download=True,
        transform=transform,
    )
    return trainset, testset


def init_model(args, trainset):
    model = cifar10Model.cnn_cifar10(len(trainset.classes), trainset.data[0].shape[2], args)
    return model


if __name__ == "__main__":
    # Initialize arguments
    args = init_args()
    # Load the dataset
    trainset, testset = get_dataset(args)
    # Build each client's iid dataloader
    all_client_trainloaders = dataset_utils.get_all_clients_iid_dataloader_list(trainset, args)
    # Initialize the global model
    global_model = init_model(args, trainset)
    global_model = global_model.to(device="cuda:0")
    global_criterion = torch.nn.CrossEntropyLoss()
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)

    # Initialize each client's model
    client_models = [deepcopy(global_model).to(device="cuda:0") for _ in range(args.n_clients)]
    # Initialize each client's optimizer
    client_optimizers = [torch.optim.SGD(client_models[i].parameters(), lr=0.01) for i in range(args.n_clients)]
    # Initialize each client's loss function
    client_criterions = [torch.nn.CrossEntropyLoss() for _ in range(args.n_clients)]
    # Start training
    for epoch in range(args.global_epochs):
        # Local training on every client
        for client_i in range(args.n_clients):
            loss_list = []
            accuracy_list = []
            client_i_model = client_models[client_i]
            client_i_optimizer = client_optimizers[client_i]
            client_i_criterion = client_criterions[client_i]
            for local_epoch in range(args.local_epochs):
                for batch_idx, (inputs, targets) in enumerate(all_client_trainloaders[client_i]):
                    inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                    client_i_optimizer.zero_grad()
                    outputs = client_i_model(inputs)
                    loss = client_i_criterion(outputs, targets)

                    loss_list.append(loss.item())
                    accuracy_list.append(outputs.argmax(dim=1).eq(targets).sum().item() / len(targets))

                    loss.backward()
                    client_i_optimizer.step()
            client_models[client_i] = client_i_model
            print('Client {} | Global_Epoch {} | Loss {:.4f} | Accuracy {:.4f}'.format(client_i, epoch, np.mean(loss_list), np.mean(accuracy_list)))

        # Average the client parameters into the global model.
        # (Writing through global_model.state_dict()[name].data does not reliably
        # update the model, so the sum/average is applied directly to the parameters.)
        with torch.no_grad():
            for client_i in range(args.n_clients):
                client_state = client_models[client_i].state_dict()
                for name, param in global_model.named_parameters():
                    if client_i == 0:
                        param.data.copy_(client_state[name])
                    else:
                        param.data.add_(client_state[name])
            for name, param in global_model.named_parameters():
                param.data /= args.n_clients

        # Evaluate the global model
        test_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(test_loader):
                inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                outputs = global_model(inputs)
                loss = global_criterion(outputs, targets)
                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()
        print('Epoch: %d | Loss: %.3f | Acc: %.3f%% (%d/%d)'
              % (epoch, test_loss / (batch_idx + 1), 100. * correct / total, correct, total))

        # Broadcast the global parameters back to every client
        for client_i in range(args.n_clients):
            client_models[client_i].load_state_dict(global_model.state_dict())


fedavg_niid

from copy import deepcopy

import torch
import dataset_utils
import torchvision
import torchvision.transforms as transforms
import numpy as np
import cifar10Model
from myArgs import myArgs


def init_args():
    args = myArgs()
    return args


def get_dataset(args):
    # Resize to 24x24 and convert to tensors in [0, 1].
    transform = transforms.Compose([
        transforms.Resize((24, 24)),
        transforms.ToTensor(),
    ])

    trainset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=True,
        download=True,
        transform=transform
    )

    # Test set
    testset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=False,
        download=True,
        transform=transform,
    )
    return trainset, testset


def init_model(args, trainset):
    model = cifar10Model.cnn_cifar10(len(trainset.classes), trainset.data[0].shape[2], args)
    return model


# Evaluate a model's loss and accuracy
def test(epoch, test_loader, model, model_criterion):
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(test_loader):
            inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
            outputs = model(inputs)
            loss = model_criterion(outputs, targets)
            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    return test_loss / (batch_idx + 1), 100. * correct / total


if __name__ == "__main__":
    # Initialize arguments
    args = init_args()
    # Load the dataset
    trainset, testset = get_dataset(args)
    # Build each client's non-iid dataloader
    all_client_trainloaders = dataset_utils.get_all_clients_niid_dataloader_list(trainset, args)
    # Initialize the global model
    global_model = init_model(args, trainset)
    global_model = global_model.to(device="cuda:0")
    global_criterion = torch.nn.CrossEntropyLoss()
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)

    # Initialize each client's model
    client_models = [deepcopy(global_model).to(device="cuda:0") for _ in range(args.n_clients)]
    # Initialize each client's optimizer
    client_optimizers = [torch.optim.SGD(client_models[i].parameters(), lr=0.01) for i in range(args.n_clients)]
    # Initialize each client's loss function
    client_criterions = [torch.nn.CrossEntropyLoss() for _ in range(args.n_clients)]
    # Start training
    for epoch in range(args.global_epochs):
        # Local training on every client
        loss_all_list = []
        accuracy_all_list = []
        for client_i in range(args.n_clients):
            loss_list = []
            accuracy_list = []
            client_i_model = client_models[client_i]
            client_i_optimizer = client_optimizers[client_i]
            client_i_criterion = client_criterions[client_i]
            for local_epoch in range(args.local_epochs):
                for batch_idx, (inputs, targets) in enumerate(all_client_trainloaders[client_i]):
                    inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                    client_i_optimizer.zero_grad()
                    outputs = client_i_model(inputs)
                    loss = client_i_criterion(outputs, targets)

                    loss_list.append(loss.item())
                    accuracy_list.append(outputs.argmax(dim=1).eq(targets).sum().item() / len(targets))

                    loss.backward()
                    client_i_optimizer.step()
            client_models[client_i] = client_i_model
            print('Client {} | Global_Epoch {} | Loss {:.4f} | Accuracy {:.4f}'.format(client_i, epoch, np.mean(loss_list), np.mean(accuracy_list)))

            # Evaluate this client's model on the test set
            loss_i, accuracy_i = test(epoch, test_loader, client_i_model, client_i_criterion)
            loss_all_list.append(loss_i)
            accuracy_all_list.append(accuracy_i)

        # Average the client parameters into the global model.
        # (Writing through global_model.state_dict()[name].data does not reliably
        # update the model, so the sum/average is applied directly to the parameters.)
        with torch.no_grad():
            for client_i in range(args.n_clients):
                client_state = client_models[client_i].state_dict()
                for name, param in global_model.named_parameters():
                    if client_i == 0:
                        param.data.copy_(client_state[name])
                    else:
                        param.data.add_(client_state[name])
            for name, param in global_model.named_parameters():
                param.data /= args.n_clients

        # Broadcast the global parameters back to every client
        for client_i in range(args.n_clients):
            client_models[client_i].load_state_dict(global_model.state_dict())

        print('Global_Epoch: %d | Loss: %.3f | Acc: %.3f%% '
              % (epoch, np.mean(loss_all_list), np.mean(accuracy_all_list)))


MyArgs parameters

class myArgs:
    def __init__(self):
        self.n_clients = 10
        self.n_class = 10
        self.BATCH_SIZE = 64
        self.dataset = 'cifar10'
        self.alpha = 1.0
        self.global_epochs = 50
        self.local_epochs = 3
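
The scripts override these defaults per algorithm; for example, the FedSGD runs above (in init_args) switch to 100 clients, one local epoch, and a single full-dataset batch:

    import sys
    from myArgs import myArgs

    args = myArgs()                 # FedAvg defaults: 10 clients, batch 64, 3 local epochs
    args.n_clients = 100            # FedSGD setup used above
    args.local_epochs = 1
    args.BATCH_SIZE = sys.maxsize   # the DataLoader then yields one batch per client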

cifar CNN Model

import torch
import torch.nn as nn
from torchvision.transforms import Resize


class cnn_cifar10(nn.Module):
    def __init__(self, num_classes, num_channels, args):
        super(cnn_cifar10, self).__init__()

        # self.resize = Resize((24, 24))

        # 'same' padding for the convolutions
        self.KERNEL_SIZE = 5
        self.PADDING = (self.KERNEL_SIZE - 1) // 2

        self.feature_extractor = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size=self.KERNEL_SIZE, stride=1, padding=self.PADDING),
            nn.ReLU(),
            nn.ZeroPad2d((0, 1, 0, 1)),  # equivalent of TensorFlow padding 'SAME' for MaxPool2d
            nn.MaxPool2d(3, stride=2, padding=0),
            nn.LocalResponseNorm(4, alpha=0.001 / 9),
            nn.Conv2d(64, 64, kernel_size=self.KERNEL_SIZE, stride=1, padding=self.PADDING),
            nn.ReLU(),
            nn.LocalResponseNorm(4, alpha=0.001 / 9),
            nn.ZeroPad2d((0, 1, 0, 1)),  # equivalent of TensorFlow padding 'SAME' for MaxPool2d
            nn.MaxPool2d(3, stride=2, padding=0),
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 6 * 6, 384),
            nn.ReLU(),
            nn.Linear(384, 192),
            nn.ReLU(),
            nn.Linear(192, num_classes),
        )

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.classifier(x)
        return x
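
A quick shape check confirms the hard-coded 64 * 6 * 6 flatten size (a 24x24 input halves twice through the two pad+pool stages: 24 -> 12 -> 6). A small sketch, assuming this module is on the path and relying on the fact that args is unused inside the class:

    import torch

    model = cnn_cifar10(num_classes=10, num_channels=3, args=None)
    x = torch.randn(2, 3, 24, 24)              # matches the Resize((24, 24)) transform
    print(model.feature_extractor(x).shape)    # torch.Size([2, 64, 6, 6])
    print(model(x).shape)                      # torch.Size([2, 10])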

dataUtils

import torch
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler


# Build a map from class label to the list of sample indices of that class:
# { target1 -> [index1, index2, ...], target2 -> [index1, index2, ...] }
def get_class_index_list_map(trainset):
    class_map = {}
    # Walk over every label in the dataset and record its index under its class
    for index_of_target in range(len(trainset.targets)):
        class_target = trainset.targets[index_of_target]
        if class_target not in class_map:
            class_map[class_target] = []
        class_map[class_target].append(index_of_target)
    return class_map


# Build each client's non-iid dataloader from class_map and data_percents
# (each class's share of the client's private dataset)
def get_all_clients_niid_dataloader_list(trainset, args):
    all_client_trainloaders = []
    class_map = get_class_index_list_map(trainset)

    # Proportion of each class in the full dataset
    p_class_tmp = []
    for cls in range(len(trainset.classes)):
        p_class_tmp.append(trainset.targets.count(cls))
    p_class = torch.tensor(p_class_tmp, dtype=torch.float32)
    p_class = p_class / len(trainset)

    # From the global class proportions (i.e., the per-class concentrations) and
    # alpha, sample each client's per-class share from a Dirichlet distribution
    data_percents = torch.distributions.dirichlet.Dirichlet(p_class * args.alpha).sample((args.n_clients,))
    # Walk the Dirichlet matrix and build each client's dataloader
    for i in range(args.n_clients):
        one_client_sample_data_percent_list = data_percents[i]
        one_client_indices = []
        for j in range(args.n_class):
            one_class_percent = one_client_sample_data_percent_list[j]
            select_count = len(class_map[j]) * one_class_percent
            sub_index = np.random.choice(class_map[j], int(select_count), replace=False)
            one_client_indices.extend(sub_index)

        # Build this client's dataloader
        train_sampler = SubsetRandomSampler(one_client_indices)
        train_loader = torch.utils.data.DataLoader(trainset, batch_size=args.BATCH_SIZE, sampler=train_sampler)
        # Collect it into all_client_trainloaders
        all_client_trainloaders.append(train_loader)
    return all_client_trainloaders


# Build each client's iid dataloader from class_map and data_percents
def get_all_clients_iid_dataloader_list(trainset, args):
    all_client_trainloaders = []
    class_map = get_class_index_list_map(trainset)

    # Proportion of each class in the full dataset
    p_class = []
    for cls in range(len(trainset.classes)):
        p_class.append(trainset.targets.count(cls) / len(trainset.targets))

    # iid: every client gets the same class proportions
    data_percents = [p_class for i in range(args.n_clients)]
    data_percents = torch.tensor(data_percents)

    # Build each client's dataloader from the shared proportions
    for i in range(args.n_clients):
        one_client_sample_data_percent_list = data_percents[i]
        one_client_indices = []
        for j in range(args.n_class):
            one_class_percent = one_client_sample_data_percent_list[j]
            select_count = len(class_map[j]) * one_class_percent
            sub_index = np.random.choice(class_map[j], int(select_count), replace=False)
            one_client_indices.extend(sub_index)

        # Build this client's dataloader
        train_sampler = SubsetRandomSampler(one_client_indices)
        train_loader = torch.utils.data.DataLoader(trainset, batch_size=args.BATCH_SIZE, sampler=train_sampler)
        all_client_trainloaders.append(train_loader)
    return all_client_trainloaders
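
To see how alpha shapes the non-iid split, one can sample the same Dirichlet matrix the util draws and print each client's per-class share. A small sketch, assuming ten balanced classes as in CIFAR-10:

    import torch

    n_clients, n_class, alpha = 5, 10, 1.0
    p_class = torch.full((n_class,), 1.0 / n_class)   # balanced classes, as in CIFAR-10

    # Same draw as get_all_clients_niid_dataloader_list:
    # row i is client i's share of each class
    data_percents = torch.distributions.dirichlet.Dirichlet(p_class * alpha).sample((n_clients,))
    for i, row in enumerate(data_percents):
        print("client", i, " ".join("{:.2f}".format(p.item()) for p in row))

With alpha = 1.0 each class's concentration is only 0.1, so the rows come out highly skewed (most clients dominated by a few classes); raising alpha pushes every client back toward the global class proportions.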

fedChain

import sys
from copy import deepcopy

import torch
import dataset_utils
import torchvision
import torchvision.transforms as transforms
import numpy as np
import cifar10Model
from torch.utils.tensorboard import SummaryWriter
from myArgs import myArgs


def init_args():
    # Note: fedchain also reads args.fed_avg_epochs (the round at which training
    # switches from FedAvg to FedSGD), which must be set on myArgs.
    args = myArgs()
    return args


def get_dataset(args):
    # Resize to 24x24 and convert to tensors in [0, 1].
    transform = transforms.Compose([
        transforms.Resize((24, 24)),
        transforms.ToTensor(),
    ])

    trainset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=True,
        download=True,
        transform=transform
    )

    # Test set
    testset = torchvision.datasets.CIFAR10(
        root='../src/data/CIFAR10/',
        train=False,
        download=True,
        transform=transform,
    )
    return trainset, testset


def init_model(args, trainset):
    model = cifar10Model.cnn_cifar10(len(trainset.classes), trainset.data[0].shape[2], args)
    return model


def one_span_train_and_save(start_epoch, end_epoch,
                            client_models, client_optimizers, client_criterions, all_client_trainloaders,
                            global_model, global_criterion, test_loader,
                            args, writer):
    for epoch in range(start_epoch, end_epoch):
        # Local training on every client
        for client_i in range(args.n_clients):
            loss_list = []
            accuracy_list = []
            client_i_model = client_models[client_i]
            client_i_optimizer = client_optimizers[client_i]
            client_i_criterion = client_criterions[client_i]
            for local_epoch in range(args.local_epochs):
                for batch_idx, (inputs, targets) in enumerate(all_client_trainloaders[client_i]):
                    inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                    client_i_optimizer.zero_grad()
                    outputs = client_i_model(inputs)
                    loss = client_i_criterion(outputs, targets)

                    loss_list.append(loss.item())
                    accuracy_list.append(outputs.argmax(dim=1).eq(targets).sum().item() / len(targets))

                    loss.backward()
                    client_i_optimizer.step()
            client_models[client_i] = client_i_model
            print('Client {} | Global_Epoch {} | Loss {:.4f} | Accuracy {:.4f}'.format(client_i, epoch,
                                                                                       np.mean(loss_list),
                                                                                       np.mean(accuracy_list)))

        # Average the client parameters into the global model.
        # (Writing through global_model.state_dict()[name].data does not reliably
        # update the model, so the sum/average is applied directly to the parameters.)
        with torch.no_grad():
            for client_i in range(args.n_clients):
                client_state = client_models[client_i].state_dict()
                for name, param in global_model.named_parameters():
                    if client_i == 0:
                        param.data.copy_(client_state[name])
                    else:
                        param.data.add_(client_state[name])
            for name, param in global_model.named_parameters():
                param.data /= args.n_clients

        # Evaluate the global model
        test_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(test_loader):
                inputs, targets = inputs.to(device="cuda:0"), targets.to(device="cuda:0")
                outputs = global_model(inputs)
                loss = global_criterion(outputs, targets)
                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        writer.add_scalar('loss', test_loss / (batch_idx + 1), epoch)
        writer.add_scalar('accuracy', 100. * correct / total, epoch)
        print('Global_Epoch: %d | Loss: %.3f | Acc: %.3f%% (%d/%d)'
              % (epoch, test_loss / (batch_idx + 1), 100. * correct / total, correct, total))

        # Broadcast the global parameters back to every client
        for client_i in range(args.n_clients):
            client_models[client_i].load_state_dict(global_model.state_dict())

        # Save the model periodically
        if epoch % 20 == 0:
            torch.save(global_model.state_dict(), 'model_save/fedchain_iid_{}.pth'.format(epoch))


if __name__ == "__main__":
    # Create the summary writer with an explicit log directory
    writer = SummaryWriter(log_dir="fedchain_iid")
    # Initialize arguments
    args = init_args()
    # Load the dataset
    trainset, testset = get_dataset(args)
    # Build each client's iid dataloader
    all_client_trainloaders = dataset_utils.get_all_clients_iid_dataloader_list(trainset, args)
    # Initialize the global model
    global_model = init_model(args, trainset)
    global_model = global_model.to(device="cuda:0")
    global_criterion = torch.nn.CrossEntropyLoss()
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)

    # Initialize each client's model
    client_models = [deepcopy(global_model).to(device="cuda:0") for _ in range(args.n_clients)]
    # Initialize each client's optimizer
    client_optimizers = [torch.optim.SGD(client_models[i].parameters(), lr=0.01) for i in range(args.n_clients)]
    # Initialize each client's loss function
    client_criterions = [torch.nn.CrossEntropyLoss() for _ in range(args.n_clients)]

    # ##### step 1: train with FedAvg #####
    print("##### step 1: FedAvg training started ##### ")
    one_span_train_and_save(0, args.fed_avg_epochs,
                            client_models, client_optimizers, client_criterions, all_client_trainloaders,
                            global_model, global_criterion, test_loader,
                            args, writer
                            )
    print("##### step 1: FedAvg training finished ##### ")

    # ##### step 2: switch the hyperparameters to FedSGD #####
    print("##### step 2: switching hyperparameters to FedSGD ##### ")
    args.local_epochs = 1
    args.BATCH_SIZE = sys.maxsize
    all_client_trainloaders = dataset_utils.get_all_clients_iid_dataloader_list(trainset, args)
    test_loader = torch.utils.data.DataLoader(testset, batch_size=args.BATCH_SIZE)
    print(args)

    # ##### step 3: train with FedSGD #####
    print("##### step 3: FedSGD training started ##### ")
    one_span_train_and_save(args.fed_avg_epochs, args.global_epochs,
                            client_models, client_optimizers, client_criterions, all_client_trainloaders,
                            global_model, global_criterion, test_loader,
                            args, writer
                            )
    print("##### step 3: FedSGD training finished ##### ")

FedRecon

import sys

import numpy as np

sys.path.append("/")
sys.path.append("data")

import torch
import pickle
from data.utils import get_dataset
from copy import deepcopy
from utils import train_with_logging
from torch.utils.data import random_split, DataLoader
from path import Path

PROJECT_DIR = Path(__file__).parent.parent.abspath()
CLIENTS_DIR = PROJECT_DIR / "clients"


class FedReconTrainer:
    def __init__(self, args, model, logger):
        """
        Initializes the model parameters as well as the dataloaders for the
        support set, query set, and validation set.

        Args:
            args: the arguments passed to the main script
            model: the model to be trained
            logger: a logger object to log the training process
        """
        if args.gpu and torch.cuda.is_available():
            self.device = torch.device("cuda")
        else:
            self.device = torch.device("cpu")

        self.recon_epochs = args.recon_epochs
        self.pers_epochs = args.pers_epochs
        self.logger = logger
        self.model = deepcopy(model)
        self.backup_local_params = self.model.local_params(
            requires_name=True, data_only=True
        )
        self.batch_size = args.batch_size
        self.valset_ratio = args.valset_ratio
        self.dataset = args.dataset
        self.criterion = torch.nn.CrossEntropyLoss()
        self.recon_lr = args.recon_lr
        self.pers_lr = args.pers_lr
        self.no_split = args.no_split

        self.id = None
        self.support_set_dataloader = None
        self.query_set_dataloader = None
        self.val_set_dataloader = None
        self.weight = 0
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=0.01, momentum=0.9)

    def train(self, client_id, model_params, have_seen=False, validation=False, global_epoch=50):
        """
        Takes the client id, the global model parameters, and a flag indicating
        whether the client has been trained before. If it has, its saved data is
        loaded; otherwise the dataset is split and the backup local parameters are
        loaded. The model is then trained with logging, the pseudo gradients are
        computed, and the client data is saved.

        Args:
            client_id: the id of the client
            model_params: the global model parameters
            have_seen: whether the client has been trained before. Defaults to False
            validation: whether to use the validation set or not. Defaults to False

        Returns:
            model_diff (the difference between the global model and the local
            model), the weight, and the post-training loss and accuracy.
        """
        self.id = client_id
        self.model.load_state_dict(model_params, strict=False)

        if have_seen:
            self.load_client_data(client_id)
        else:
            self.split_dataset()
            self.model.load_state_dict(self.backup_local_params, strict=False)

        model_diff = self.model.global_params(requires_name=True, data_only=True)

        res_map = train_with_logging(self, validation)()
        loss_after = res_map["loss_after"]
        acc_after = res_map["acc_after"]

        # calculate the pseudo gradients
        with torch.no_grad():
            for frz_p, updated_p in zip(
                model_diff.values(), self.model.global_params()
            ):
                frz_p.sub_(updated_p)

        self.save_client_data()

        return model_diff, self.weight, loss_after, acc_after

    def _train(self):
        """
        For each epoch, first train the local (reconstructed) parameters on the
        support set, then train the global parameters on the query set.
        """
        all_loss = []
        all_acc = []
        # reconstruction phase
        for _ in range(self.recon_epochs):
            for x, y in self.support_set_dataloader:
                x, y = x.to(self.device), y.to(self.device)
                logit = self.model(x)
                loss = self.criterion(logit, y)
                gradients = torch.autograd.grad(loss, self.model.local_params())
                for param, grad in zip(self.model.local_params(), gradients):
                    param.data -= self.recon_lr * grad

        # personalization phase
        for _ in range(self.pers_epochs):
            for x, y in self.query_set_dataloader:
                x, y = x.to(self.device), y.to(self.device)
                logit = self.model(x)
                loss = self.criterion(logit, y)

                all_loss.append(loss.item())
                all_acc.append((logit.argmax(dim=1) == y).float().mean().item())

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

        return np.mean(all_loss), np.mean(all_acc)

    def eval(self, model_params, client_id):
        """
        Loads the given model parameters into the model, restores the backup
        local parameters, splits the dataset, and returns the result of the
        train_with_logging call.

        Args:
            model_params: the model parameters to be evaluated
            client_id: the id of the client

        Returns:
            The validation loss.
        """
        self.id = client_id

        self.model.load_state_dict(model_params, strict=False)
        self.model.load_state_dict(self.backup_local_params, strict=False)

        self.split_dataset()

        return train_with_logging(self, validation=True)()

    def split_dataset(self):
        """
        Splits the dataset into training, validation, and test sets.
        """
        dataset = get_dataset(self.dataset, self.id)

        num_val_samples = int(self.valset_ratio * len(dataset))
        num_train_samples = len(dataset) - num_val_samples

        training_set, val_set = random_split(
            dataset, [num_train_samples, num_val_samples]
        )
        if self.no_split:
            num_support_samples = num_query_samples = num_train_samples
            support_set = query_set = training_set
        else:
            # The query set's size matches the support set's by default.
            num_support_samples = int(num_train_samples / 2)
            num_query_samples = num_train_samples - num_support_samples
            support_set, query_set = random_split(
                training_set, [num_support_samples, num_query_samples]
            )

        self.support_set_dataloader = DataLoader(support_set, self.batch_size)
        self.query_set_dataloader = DataLoader(query_set, self.batch_size)
        self.val_set_dataloader = DataLoader(val_set, self.batch_size)
        self.weight = num_query_samples

    def save_client_data(self):
        """
        Saves the client's dataloaders, weight, and local parameters to a pickle file.
        """
        local_params = self.model.local_params(requires_name=True, data_only=True)
        pkl_path = CLIENTS_DIR / f"{self.id}.pkl"
        with open(pkl_path, "wb") as f:
            pickle.dump(
                {
                    "support": self.support_set_dataloader,
                    "query": self.query_set_dataloader,
                    "val": self.val_set_dataloader,
                    "weight": self.weight,
                    "local_params": local_params,
                },
                f,
            )

    def load_client_data(self, client_id):
        """
        Loads the client data from its pickle file and restores the support,
        query, and validation dataloaders, as well as the weight and local
        parameters of the model.

        Args:
            client_id: the id of the client we want to load data for
        """
        pkl_path = CLIENTS_DIR / f"{client_id}.pkl"
        with open(pkl_path, "rb") as f:
            client_data = pickle.load(f)
        self.support_set_dataloader = client_data["support"]
        self.query_set_dataloader = client_data["query"]
        self.val_set_dataloader = client_data["val"]
        self.weight = client_data["weight"]
        local_params = client_data["local_params"]
        self.model.load_state_dict(local_params, strict=False)
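
The trainer's train() hands back a pseudo-gradient (model_diff = global params before minus after) together with a weight (the query-set size), but the server loop is not part of this listing. A hedged sketch of how those returns would typically be folded into the global parameters (the aggregate helper and server_lr are our names/assumptions, not from the repo):

    import torch

    def aggregate(global_params, client_returns, server_lr=1.0):
        # global_params: dict name -> tensor; client_returns: list of
        # (model_diff, weight, loss_after, acc_after) tuples from FedReconTrainer.train()
        total_weight = sum(weight for _, weight, _, _ in client_returns)
        with torch.no_grad():
            for name, param in global_params.items():
                pseudo_grad = sum(diff[name] * (weight / total_weight)
                                  for diff, weight, _, _ in client_returns)
                param.sub_(server_lr * pseudo_grad)   # descend along the averaged pseudo-gradient
        return global_params

Weighting by the query-set size matches how self.weight is set in split_dataset; a server learning rate of 1.0 reduces to plain weighted averaging of the client updates.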