Federated learning: aggregation operators
In this notebook, we provide an explanation of the implementation of the different federated aggregation operators provided in the framework. Before discussing the different aggregation operators, we must establish the federated configuration (for more information see notebook Federated learning basic concepts).
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import shfl
database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()
iid_distribution = shfl.data_distribution.IidDataDistribution(database)
nodes_federation, test_data, test_labels = iid_distribution.get_nodes_federation(num_nodes=5, percent=10)
def model_builder():
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1, input_shape=(28, 28, 1)))
model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
model.add(tf.keras.layers.Dropout(0.4))
model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1))
model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.1))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dense(10, activation='softmax'))
loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.RMSprop()
metrics = [tf.keras.metrics.categorical_accuracy]
return shfl.model.DeepLearningModel(model=model, loss=loss, optimizer=optimizer, metrics=metrics)
def reshape_data(labeled_data):
labeled_data.data = np.reshape(labeled_data.data, (labeled_data.data.shape[0], labeled_data.data.shape[1], labeled_data.data.shape[2],1))
def normalize_data(data, mean, std):
data.data = (data.data - mean) / std
nodes_federation.apply_data_transformation(reshape_data)
mean = np.mean(train_data.data)
std = np.std(train_data.data)
nodes_federation.apply_data_transformation(normalize_data, mean=mean, std=std);
test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2],1))
Once we have loaded and federated the data and established the learning model, the only step that remains is to establish the aggregation operator. At the moment, the framework has FedAvg and WeightedFedAvg implemented. The implementation of the federated aggregation operators are as follows.
NOTE: The aggregators are required to be callable. That is, we can use a simple function. Alternatively, if a more sophisticated implementation is needed in a class, this has simply to implement the method __call__
(see code in following links for examples).
Federated averaging operator
In this section, we detail the implementation of FedAvg
(see FedAvg) proposed by Google in this paper.
It is based on the arithmetic mean of the local weights trained in each of the local clients . That is, the weights of the global model after each round of training are
For its implementation, we create a class that implements the FederatedAggregator interface.
from shfl.federated_aggregator.fedavg_aggregator import FedAvgAggregator
fedavg_aggregator = FedAvgAggregator()
Weighted federated averaging operator
In this section, we detail the implementation of WeightedFedAvg
(see WeightedFedAvg). It is the weighted version of FedAvg
. The weight of each client is determined by the amount of client data with respect to total training data . That is, the parameters of the global model after each round of training are:
When all clients have the same amount of data, it is equivalent to FedAvg.
To implement it, we create a class that implements the FederatedAggregator
interface. For this purpose, we first weigh the local parameters by percentage and then sum the weighted parameters.
from shfl.federated_aggregator.weighted_fedavg_aggregator import WeightedFedAggregator
weighted_fedavg_aggregator = WeightedFedAggregator(30)
Finally, we are ready to establish the federated government with any of the implemented aggregation operators and start the federated learning process.
federated_government = shfl.federated_government.FederatedGovernment(model_builder(), nodes_federation, fedavg_aggregator)
2022-04-26 15:25:23.023614: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-04-26 15:25:23.302655: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 2138 MB memory: -> device: 0, name: NVIDIA GeForce RTX 3050 Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.6
federated_government.run_rounds(5, test_data, test_labels)
2022-04-26 15:25:25.222219: I tensorflow/stream_executor/cuda/cuda_dnn.cc:368] Loaded cuDNN version 8303
2022-04-26 15:25:26.098535: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.
Evaluation in round 0:
Collaborative model test -> loss: 10.470479965209961 categorical_accuracy: 0.8862749934196472
Evaluation in round 1:
Collaborative model test -> loss: 11.364419937133789 categorical_accuracy: 0.9259250164031982
Evaluation in round 2:
Collaborative model test -> loss: 11.860504150390625 categorical_accuracy: 0.9331750273704529
Evaluation in round 3:
Collaborative model test -> loss: 11.764689445495605 categorical_accuracy: 0.9387750029563904
Evaluation in round 4:
Collaborative model test -> loss: 11.562532424926758 categorical_accuracy: 0.9453999996185303
Cluster federated averaging operator
In this section, we detail the implementation of ClusterFedAvg
(see ClusterFedAvg).
Cluster Federated Averaging is based on the aggregation operator used for k-means clustering. When aggregating the centroids of a federated K-means clustering, we are faced with the problem of grouping the clusters for subsequent aggregation. Based on the hypothesis that the closest centroids will belong to the same cluster, we apply K-means over the centroids, in order to group the centroids that belong to the same cluster and to obtain the representation (aggregation) of each group. We choose the new centroids obtained as the aggregation.
This time, instead of a class, we implement a function (remember, the aggregator simply needs to be callable):
from shfl.federated_aggregator.cluster_fedavg_aggregator import cluster_fed_avg_aggregator
clustering_aggregator = cluster_fed_avg_aggregator
We create a federated government of clustering, in order to apply this aggregation operator.
c_database = shfl.data_base.Iris()
c_train_data, c_train_labels, c_test_data, c_test_labels = c_database.load_data()
c_iid_distribution = shfl.data_distribution.IidDataDistribution(c_database)
c_nodes_federation, c_test_data, c_test_labels = c_iid_distribution.get_nodes_federation(num_nodes=3, percent=50)
n_clusters = 3 # Set number of clusters
n_features = train_data.shape[1]
def clustering_model_builder():
model = shfl.model.KMeansModel(n_clusters=n_clusters, n_features = n_features)
return model
clustering_federated_government = shfl.federated_government.FederatedGovernment(clustering_model_builder(), c_nodes_federation, clustering_aggregator)
clustering_federated_government.run_rounds(5, c_test_data, c_test_labels)
Evaluation in round 0:
Collaborative model test -> homogeneity_score: 0.7402314858536988 completeness_score: 0.7244113753984411 v_measure_score: 0.7322359914034998 adjusted_rand_score: 0.5956497490239822
Evaluation in round 1:
Collaborative model test -> homogeneity_score: 0.6412965771037632 completeness_score: 0.6259364201792915 v_measure_score: 0.6335234082543412 adjusted_rand_score: 0.5191468565643872
Evaluation in round 2:
Collaborative model test -> homogeneity_score: 0.6412965771037632 completeness_score: 0.6259364201792915 v_measure_score: 0.6335234082543412 adjusted_rand_score: 0.5191468565643872
Evaluation in round 3:
Collaborative model test -> homogeneity_score: 0.6412965771037632 completeness_score: 0.6259364201792915 v_measure_score: 0.6335234082543412 adjusted_rand_score: 0.5191468565643872
Evaluation in round 4:
Collaborative model test -> homogeneity_score: 0.6412965771037632 completeness_score: 0.6259364201792916 v_measure_score: 0.6335234082543413 adjusted_rand_score: 0.5191468565643872