Aggregation Operators
In this notebook, we explain the implementation of the federated aggregation operators provided in the platform. Before discussing the aggregation operators, we must establish the federated configuration (for more information, see the A Simple Experiment notebook).
import matplotlib.pyplot as plt
import shfl
import tensorflow as tf
import numpy as np
database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()
iid_distribution = shfl.data_distribution.IidDataDistribution(database)
federated_data, test_data, test_labels = iid_distribution.get_federated_data(num_nodes=5, percent=10)
def model_builder():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu',
                                     strides=1, input_shape=(28, 28, 1)))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.4))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))
    model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])
    return shfl.model.DeepLearningModel(model)
class Reshape(shfl.private.FederatedTransformation):
    def apply(self, labeled_data):
        labeled_data.data = np.reshape(
            labeled_data.data,
            (labeled_data.data.shape[0], labeled_data.data.shape[1], labeled_data.data.shape[2], 1))
shfl.private.federated_operation.apply_federated_transformation(federated_data, Reshape())
class Normalize(shfl.private.FederatedTransformation):
    def __init__(self, mean, std):
        self.__mean = mean
        self.__std = std

    def apply(self, labeled_data):
        labeled_data.data = (labeled_data.data - self.__mean) / self.__std
mean = np.mean(train_data.data)
std = np.std(train_data.data)
shfl.private.federated_operation.apply_federated_transformation(federated_data, Normalize(mean, std))
test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2],1))
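The two transformations above amount to adding a trailing channel axis and standardizing with a global mean and standard deviation. The following is a minimal NumPy-only sketch of that arithmetic on a synthetic batch. The array `images` and its shape are assumptions made for illustration; it is not the EMNIST data, and no shfl calls are involved.

```python
import numpy as np

# Hypothetical stand-in for a batch of 28x28 grayscale images (not EMNIST).
rng = np.random.default_rng(0)
images = rng.integers(0, 256, size=(8, 28, 28)).astype(np.float64)

# Add a trailing channel axis, as the Reshape transformation does.
images_4d = np.reshape(images, (images.shape[0], images.shape[1], images.shape[2], 1))

# Standardize with the global mean and standard deviation, as Normalize does.
mean = np.mean(images_4d)
std = np.std(images_4d)
normalized = (images_4d - mean) / std

print(images_4d.shape)  # (8, 28, 28, 1)
print(np.mean(normalized), np.std(normalized))  # close to 0 and 1, up to rounding
```

In the notebook itself the statistics come from the training data and are then applied to the data held by every node, so each node is scaled with the same constants.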
Once we have loaded and federated the data and established the learning model, the only remaining step is to establish the aggregation operator. At the moment, the framework implements FedAvg and WeightedFedAvg, as well as ClusterFedAvg for clustering models. The implementation of each federated aggregation operator is as follows.
Federated Averaging (FedAvg) Operator
In this section, we detail the implementation of FedAvg (see FedAvg) proposed by Google in this paper.
It is based on the arithmetic mean of the local weights $W_i$ trained in each of the local clients $C_i$. That is, the weights $W$ of the global model after each round of training are
$$W = \frac{1}{|C|} \sum_{C_i \in C} W_i$$
where $|C|$ is the number of clients.
To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to compute the mean of the local weights of the clients, layer by layer.
import numpy as np
from shfl.federated_aggregator.federated_aggregator import FederatedAggregator

class FedAvgAggregator(FederatedAggregator):
    """
    Implementation of Federated Averaging Aggregator. It only uses a simple average of the parameters of all the models
    """

    def aggregate_weights(self, clients_params):
        clients_params_array = np.array(clients_params)
        num_clients = clients_params_array.shape[0]
        num_layers = clients_params_array.shape[1]
        aggregated_weights = np.array([np.mean(clients_params_array[:, layer], axis=0) for layer in range(num_layers)])
        return aggregated_weights
fedavg_aggregator = FedAvgAggregator()
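To see what the layer-wise mean does on concrete numbers, here is a small standalone sketch that does not use shfl. The helper `average_layerwise` and the toy parameters are assumptions made for illustration. It iterates over layers with `zip` instead of building a single array from all clients, so it also works when the layers have different shapes.

```python
import numpy as np

def average_layerwise(clients_params):
    """Average each layer across clients.

    clients_params is a list with one entry per client; each entry is a
    list of NumPy arrays, one per layer.
    """
    return [np.mean(np.stack(layer_group), axis=0) for layer_group in zip(*clients_params)]

# Two hypothetical clients, each with a 2x2 weight matrix and a bias vector of length 2.
client_a = [np.array([[1.0, 2.0], [3.0, 4.0]]), np.array([0.0, 1.0])]
client_b = [np.array([[3.0, 4.0], [5.0, 6.0]]), np.array([2.0, 3.0])]

global_params = average_layerwise([client_a, client_b])
print(global_params[0])
# [[2. 3.]
#  [4. 5.]]
print(global_params[1])
# [1. 2.]
```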
Weighted Federated Averaging (WeightedFedAvg) Operator
In this section, we detail the implementation of WeightedFedAvg (see WeightedFedAvg).
It is the weighted version of FedAvg. The weight of each client $C_i$ is determined by the amount of client data $n_i$ with respect to the total training data $n$. That is, the parameters $W$ of the global model after each round of training are:
$$W = \sum_{i=1}^{|C|} \frac{n_i}{n} W_i$$
where $|C|$ is the number of clients.
When all clients have the same amount of data, it is equivalent to FedAvg.
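This equivalence can be checked directly. Writing $|C|$ for the number of clients (a symbol assumed here for illustration), suppose every client holds the same amount of data, so that $n_i = n / |C|$ for all $i$. Substituting into the weighted sum gives:

```latex
$$
W = \sum_{i=1}^{|C|} \frac{n_i}{n} W_i
  = \sum_{i=1}^{|C|} \frac{n / |C|}{n} W_i
  = \frac{1}{|C|} \sum_{i=1}^{|C|} W_i
$$
```

The right-hand side is the arithmetic mean of the local weights, which is the FedAvg update.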
To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to compute the weighted mean of the local parameters of the clients.
For this purpose, we first weigh the local parameters by percentage and then sum the weighted parameters.
import numpy as np
from shfl.federated_aggregator.federated_aggregator import FederatedAggregator

class WeightedFedAvgAggregator(FederatedAggregator):
    """
    Implementation of Weighted Federated Averaging Aggregator. The aggregation of the parameters is based on the
    number of data in every node.
    """

    def aggregate_weights(self, clients_params):
        clients_params_array = np.array(clients_params)
        num_clients = clients_params_array.shape[0]
        num_layers = clients_params_array.shape[1]
        # Scale each client's parameters by its share of the data (self._percentage), then sum per layer.
        ponderated_weights = np.array([self._percentage[client] * clients_params_array[client, :] for client in range(num_clients)])
        aggregated_weights = np.array([np.sum(ponderated_weights[:, layer], axis=0) for layer in range(num_layers)])
        return aggregated_weights
weighted_fedavg_aggregator = WeightedFedAvgAggregator()
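The class above reads the per-client weights from `self._percentage`, which is not defined in the snippet itself. As a hedged illustration of the arithmetic only, the following standalone sketch derives those fractions from hypothetical per-client sample counts and applies the weighted sum layer by layer. The names `weighted_average_layerwise` and `sample_counts`, and the toy parameters, are assumptions and are not part of shfl.

```python
import numpy as np

def weighted_average_layerwise(clients_params, sample_counts):
    """Weighted mean of each layer, with weights n_i / n taken from sample_counts."""
    counts = np.asarray(sample_counts, dtype=float)
    fractions = counts / counts.sum()  # n_i / n for each client
    return [
        sum(frac * layer for frac, layer in zip(fractions, layer_group))
        for layer_group in zip(*clients_params)
    ]

client_a = [np.array([[1.0, 2.0], [3.0, 4.0]]), np.array([0.0, 1.0])]
client_b = [np.array([[5.0, 6.0], [7.0, 8.0]]), np.array([4.0, 5.0])]

# client_a holds 300 samples and client_b holds 100, so the weights are 0.75 and 0.25.
global_params = weighted_average_layerwise([client_a, client_b], [300, 100])
print(global_params[0])
# [[2. 3.]
#  [4. 5.]]
print(global_params[1])
# [1. 2.]
```

The result sits closer to `client_a` than a plain average would, because that client contributes three quarters of the data.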
Finally, we are ready to establish the federated government with any of the implemented aggregation operators and start the federated learning process.
federated_government = shfl.federated_government.FederatedGovernment(model_builder, federated_data, fedavg_aggregator)
federated_government.run_rounds(1, test_data, test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552650>: [30.405847549438477, 0.7931749820709229]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552890>: [40.98423385620117, 0.7247499823570251]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x1495529d0>: [40.2315788269043, 0.7540000081062317]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552b10>: [29.9063663482666, 0.8166999816894531]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552c50>: [28.207412719726562, 0.8036999702453613]
Global model test performance : [19.37423324584961, 0.7795500159263611]
Cluster Federated Averaging (ClusterFedAvg) Operator
In this section, we detail the implementation of ClusterFedAvg (see ClusterFedAvg).
Cluster Federated Averaging is the aggregation operator used for federated k-means clustering. When aggregating the centroids of a federated k-means clustering, we face the problem of matching the clusters across clients before they can be aggregated. Based on the hypothesis that the closest centroids belong to the same cluster, we apply k-means over the clients' centroids in order to group the centroids that belong to the same cluster and to obtain a representative (the aggregation) of each group. The new centroids obtained in this way are taken as the result of the aggregation.
To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to apply k-means to the clients' centroids.
from shfl.federated_aggregator.federated_aggregator import FederatedAggregator
import numpy as np
from sklearn.cluster import KMeans

class ClusterFedAvgAggregator(FederatedAggregator):
    """
    Implementation of Cluster Average Federated Aggregator.
    It adds another k-means to find the minimum distance of cluster centroids coming from each node.
    """

    def aggregate_weights(self, clients_params):
        clients_params_array = np.concatenate((clients_params))
        n_clusters = clients_params[0].shape[0]
        model_aggregator = KMeans(n_clusters=n_clusters, init='k-means++')
        model_aggregator.fit(clients_params_array)
        aggregated_weights = np.array(model_aggregator.cluster_centers_)
        return aggregated_weights
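A plain index-wise average is not enough for centroids because each client may number its clusters differently. The sketch below illustrates this on synthetic data: three hypothetical clients report centroids near the same three points but in different orders, and running k-means over the pooled centroids recovers one representative per group. The values in `true_centers`, the offsets, and the fixed `random_state` are assumptions made for illustration.

```python
import numpy as np
from sklearn.cluster import KMeans

true_centers = np.array([[0.0, 0.0], [5.0, 5.0], [10.0, 0.0]])

# Each hypothetical client reports the same centroids with a small offset
# and in its own arbitrary order.
client_1 = true_centers[[0, 1, 2]] + 0.1
client_2 = true_centers[[2, 0, 1]] - 0.1
client_3 = true_centers[[1, 2, 0]]
clients_params = [client_1, client_2, client_3]

# Averaging by index mixes centroids that belong to different clusters.
naive = np.mean(np.stack(clients_params), axis=0)

# Pooling the centroids and clustering them groups matching centroids first.
pooled = np.concatenate(clients_params)
kmeans = KMeans(n_clusters=true_centers.shape[0], init="k-means++", n_init=10, random_state=0)
aggregated = kmeans.fit(pooled).cluster_centers_

# The order of cluster_centers_ is arbitrary, so sort by the first coordinate to compare.
aggregated_sorted = aggregated[np.argsort(aggregated[:, 0])]

print(naive)
print(aggregated_sorted)
```

With these particular orderings every row of `naive` averages one centroid from each group, so all three rows collapse to the same point, while `aggregated_sorted` stays close to `true_centers`.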
To apply this aggregation operator, we create a federated government for clustering.
c_database = shfl.data_base.Iris()
c_train_data, c_train_labels, c_test_data, c_test_labels = c_database.load_data()
c_iid_distribution = shfl.data_distribution.IidDataDistribution(c_database)
c_federated_data, c_test_data, c_test_labels = c_iid_distribution.get_federated_data(num_nodes=3, percent=50)
n_clusters = 3 # Set number of clusters
n_features = c_train_data.shape[1] # Number of features, taken from the Iris training data
def clustering_model_builder():
    model = shfl.model.KMeansModel(n_clusters=n_clusters, n_features=n_features)
    return model
clustering_aggregator = ClusterFedAvgAggregator()
clustering_federated_government = shfl.federated_government.FederatedGovernment(clustering_model_builder, c_federated_data, clustering_aggregator)
clustering_federated_government.run_rounds(1, c_test_data, c_test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a210>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a290>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a2d0>: (0.8563305124226227, 0.8236365430725703, 0.8396653978091654, 0.7996608013567946)
Global model test performance : (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)