# Aggregation Operators

In this notebook, we provide an explanation of the implementation of the different federated aggregation operators provided in the platform. Before discussing the different aggregation operators, we must establish the federated configuration (for more information see the A Simple Experiment) notebook.

import matplotlib.pyplot as plt
import shfl
import tensorflow as tf
import numpy as np

database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()

federated_data, test_data, test_labels = iid_distribution.get_federated_data(num_nodes=5, percent=10)

def model_builder():
model = tf.keras.models.Sequential()

model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])

return shfl.model.DeepLearningModel(model)

class Reshape(shfl.private.FederatedTransformation):

def apply(self, labeled_data):
labeled_data.data = np.reshape(labeled_data.data, (labeled_data.data.shape[0], labeled_data.data.shape[1], labeled_data.data.shape[2],1))

shfl.private.federated_operation.apply_federated_transformation(federated_data, Reshape())

class Normalize(shfl.private.FederatedTransformation):

def __init__(self, mean, std):
self.__mean = mean
self.__std = std

def apply(self, labeled_data):
labeled_data.data = (labeled_data.data - self.__mean)/self.__std

mean = np.mean(train_data.data)
std = np.std(train_data.data)
shfl.private.federated_operation.apply_federated_transformation(federated_data, Normalize(mean, std))

test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2],1))

Once we have loaded and federated the data and established the learning model, the only step that remains is to establish the aggregation operator. At the moment, the framework has FedAvg and WeightedFedAvg implemented. The implementation of the federated aggregation operators are as follows.

## Federated Averaging (FedAvg) Operator

In this section, we detail the implementation of FedAvg (see FedAvg) proposed by Google in this paper.

It is based on the arithmetic mean of the local weights $W_i$ trained in each of the local clients $C_i$. That is, the weights $W$ of the global model after each round of training are

$W = \frac{1}{n_{\rm{C}}} \sum_{i=1}^{n_{\rm{C}}} W_i$

For its implementation, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overwritten by calculating the mean of the local weights of each client.

import numpy as np

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator

class FedAvgAggregator(FederatedAggregator):
"""
Implementation of Federated Averaging Aggregator. It only uses a simple average of the parameters of all the models
"""

def aggregate_weights(self, clients_params):
clients_params_array = np.array(clients_params)

num_clients = clients_params_array.shape[0]
num_layers = clients_params_array.shape[1]

aggregated_weights = np.array([np.mean(clients_params_array[:, layer], axis=0) for layer in range(num_layers)])

return aggregated_weights

fedavg_aggregator = FedAvgAggregator()

## Weighted Federated Averaging (WeightedFedAvg) Operator

In this section, we detail the implementation of WeightedFedAvg (see WeightedFedAvg). It is the weighted version of FedAvg. The weight of each client $C_i$ is determined by the amount of client data $n_i$ with respect to total training data $n$. That is, the parameters $W$ of the global model after each round of training are:

$W = \sum_{i=1}^n \frac{n_i}{n} W_i$

When all clients have the same amount of data, it is equivalent to FedAvg.

To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overwritten by calculating the weighted mean of the local parameters of each client. For this purpose, we first weigh the local parameters by percentage and then sum the weighted parameters.

import numpy as np

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator

class WeightedFedAvgAggregator(FederatedAggregator):
"""
Implementation of Weighted Federated Averaging Aggregator. The aggregation of the parameters is based in the number of data \
in every node.
"""

def aggregate_weights(self, clients_params):
clients_params_array = np.array(clients_params)

num_clients = clients_params_array.shape[0]
num_layers = clients_params_array.shape[1]

ponderated_weights = np.array([self._percentage[client] * clients_params_array[client, :] for client in range(num_clients)])
aggregated_weights = np.array([np.sum(ponderated_weights[:, layer], axis=0) for layer in range(num_layers)])

return aggregated_weights

weighted_fedavg_aggregator = WeightedFedAvgAggregator()

Finally, we are ready to establish the federated government with any of the implemented aggregation operators and start the federated learning process.

federated_government = shfl.federated_government.FederatedGovernment(model_builder, federated_data, fedavg_aggregator)
federated_government.run_rounds(1, test_data, test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552650>: [30.405847549438477, 0.7931749820709229]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552890>: [40.98423385620117, 0.7247499823570251]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x1495529d0>: [40.2315788269043, 0.7540000081062317]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552b10>: [29.9063663482666, 0.8166999816894531]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552c50>: [28.207412719726562, 0.8036999702453613]
Global model test performance : [19.37423324584961, 0.7795500159263611]

## Cluster Federated Averaging (ClusterFedAvg) Operator

In this section, we detail the implementation of ClusterFedAvg (see ClusterFedAvg).

Cluster Federated Averaging Model is based on the aggregation operator used for k-means clustering. When aggregating the centroids of a federated K-means clustering, we are faced with the problem of grouping the clusters for subsequent aggregation. Based on the hypothesis that the closest centroids will belong to the same cluster, we apply K-Means over the centroids, in order to group the centroids that belong to the same cluster and to obtain the representation (aggregation) of each group. We choose the new centroids obtained as the aggregation.

To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overwritten by applying K-means to the clients' centroids.

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator
import numpy as np
from sklearn.cluster import KMeans

class ClusterFedAvgAggregator(FederatedAggregator):
"""
Implementation of Cluster Average Federated Aggregator.
It adds another k-means to find the minimum distance of cluster centroids coming from each node.
"""

def aggregate_weights(self, clients_params):
clients_params_array = np.concatenate((clients_params))

n_clusters = clients_params[0].shape[0]
model_aggregator = KMeans(n_clusters=n_clusters, init='k-means++')
model_aggregator.fit(clients_params_array)
aggregated_weights = np.array(model_aggregator.cluster_centers_)
return aggregated_weights

We create a federated government of clustering, in order to apply this aggregation operator.

c_database = shfl.data_base.Iris()
c_train_data, c_train_labels, c_test_data, c_test_labels = c_database.load_data()

c_federated_data, c_test_data, c_test_labels = c_iid_distribution.get_federated_data(num_nodes=3, percent=50)

n_clusters = 3 # Set number of clusters
n_features = train_data.shape[1]
def clustering_model_builder():
model = shfl.model.KMeansModel(n_clusters=n_clusters, n_features = n_features)
return model

clustering_aggregator = ClusterFedAvgAggregator()

clustering_federated_government = shfl.federated_government.FederatedGovernment(clustering_model_builder, c_federated_data, clustering_aggregator)
clustering_federated_government.run_rounds(1, c_test_data, c_test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a210>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a290>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a2d0>: (0.8563305124226227, 0.8236365430725703, 0.8396653978091654, 0.7996608013567946)
Global model test performance : (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)