Federated learning: aggregation operators

In this notebook, we explain how the federated aggregation operators provided in the framework are implemented. Before discussing them, we must establish the federated configuration (for more information, see the notebook Federated learning basic concepts).

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

import shfl


database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()

iid_distribution = shfl.data_distribution.IidDataDistribution(database)
nodes_federation, test_data, test_labels = iid_distribution.get_nodes_federation(num_nodes=5, percent=10)

def model_builder():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1, input_shape=(28, 28, 1)))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.4))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))

    loss = tf.keras.losses.CategoricalCrossentropy()
    optimizer = tf.keras.optimizers.RMSprop()
    metrics = [tf.keras.metrics.categorical_accuracy]

    return shfl.model.DeepLearningModel(model=model, loss=loss, optimizer=optimizer, metrics=metrics)


def reshape_data(labeled_data):
    labeled_data.data = np.reshape(labeled_data.data, (labeled_data.data.shape[0], labeled_data.data.shape[1], labeled_data.data.shape[2],1))

def normalize_data(data, mean, std):
    data.data = (data.data - mean) / std

nodes_federation.apply_data_transformation(reshape_data)

mean = np.mean(train_data.data)
std = np.std(train_data.data)
nodes_federation.apply_data_transformation(normalize_data, mean=mean, std=std);

test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2],1))

Once we have loaded and federated the data and established the learning model, the only step that remains is to establish the aggregation operator. At the moment, the framework has FedAvg and WeightedFedAvg implemented, along with ClusterFedAvg for clustering, which is covered at the end of this notebook. The federated aggregation operators are implemented as follows.

NOTE: The aggregators are required to be callable, so a simple function is enough. Alternatively, if a more sophisticated implementation is needed, a class only has to implement the method __call__ (see the code in the following links for examples).
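To illustrate the two options in the note above, here is a minimal sketch of a function-style and a class-style aggregator. It assumes that the aggregator is called with a single list containing one numpy array per client. The names mean_aggregator and StatisticAggregator are hypothetical, and the call signature that the framework actually uses may differ.

```python
import numpy as np


def mean_aggregator(clients_params):
    """Function-style aggregator: element-wise mean over the clients."""
    return np.mean(np.stack(clients_params), axis=0)


class StatisticAggregator:
    """Class-style aggregator: keeps some configuration and exposes __call__.

    Hypothetical example, not part of the framework.
    """

    def __init__(self, statistic=np.mean):
        # Any numpy reduction that accepts an `axis` argument (np.mean, np.median, ...).
        self._statistic = statistic

    def __call__(self, clients_params):
        return self._statistic(np.stack(clients_params), axis=0)


clients_params = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]

function_result = mean_aggregator(clients_params)
class_result = StatisticAggregator(statistic=np.mean)(clients_params)
```

Both objects can be used interchangeably wherever a callable is expected. The class form is only worth the extra code when the aggregator needs configuration or state.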

Federated averaging operator

In this section, we detail the implementation of FedAvg (see FedAvg) proposed by Google in this paper.

It is based on the arithmetic mean of the local weights $W_i$ trained in each of the local clients $C_i$. That is, the weights $W$ of the global model after each round of training are

$$W = \frac{1}{n_{\rm{C}}} \sum_{i=1}^{n_{\rm{C}}} W_i$$

where $n_{\rm{C}}$ is the number of clients.
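As an illustration of the formula above, the following sketch computes the arithmetic mean layer by layer with numpy. It assumes that each client's weights are a list of arrays (one per layer) with matching shapes across clients. The function fed_avg is a hypothetical helper written for this example, not the framework's FedAvgAggregator.

```python
import numpy as np


def fed_avg(clients_weights):
    """Arithmetic mean of the clients' weights, computed layer by layer.

    clients_weights: list with one entry per client; each entry is a list
    of numpy arrays (one array per layer).
    """
    n_clients = len(clients_weights)
    # zip(*...) groups together the same layer of every client.
    return [sum(layer_group) / n_clients for layer_group in zip(*clients_weights)]


# Three clients, each with a "kernel" layer and a "bias" layer.
client_a = [np.array([[1.0, 2.0]]), np.array([0.0])]
client_b = [np.array([[3.0, 4.0]]), np.array([2.0])]
client_c = [np.array([[5.0, 6.0]]), np.array([4.0])]

global_weights = fed_avg([client_a, client_b, client_c])
# global_weights[0] is [[3., 4.]] and global_weights[1] is [2.]
```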

For its implementation, we create a class that implements the FederatedAggregator interface.

from shfl.federated_aggregator.fedavg_aggregator import FedAvgAggregator


fedavg_aggregator = FedAvgAggregator()

Weighted federated averaging operator

In this section, we detail the implementation of WeightedFedAvg (see WeightedFedAvg). It is the weighted version of FedAvg. The weight of each client $C_i$ is determined by the amount of client data $n_i$ relative to the total amount of training data $n$. That is, the parameters $W$ of the global model after each round of training are:

$$W = \sum_{i=1}^{n_{\rm{C}}} \frac{n_i}{n} W_i$$

When all clients have the same amount of data, it is equivalent to FedAvg.

To implement it, we create a class that implements the FederatedAggregator interface. For this purpose, we first weigh the local parameters by percentage and then sum the weighted parameters.
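The two steps described above (weigh, then sum) can be sketched with numpy as follows. This is an illustration under the same assumption as before, namely that each client's parameters are a list of arrays, one per layer. The function weighted_fed_avg and its n_samples argument are hypothetical and do not reproduce the framework's class.

```python
import numpy as np


def weighted_fed_avg(clients_weights, n_samples):
    """Weighted mean of the clients' weights, with weights n_i / n."""
    total = float(sum(n_samples))
    fractions = [n_i / total for n_i in n_samples]
    # Step 1: weigh every layer of each client by that client's data fraction.
    weighted = [
        [fraction * layer for layer in weights]
        for fraction, weights in zip(fractions, clients_weights)
    ]
    # Step 2: sum the weighted parameters layer by layer.
    return [sum(layer_group) for layer_group in zip(*weighted)]


client_a = [np.array([1.0, 2.0])]
client_b = [np.array([3.0, 6.0])]

# Client A holds three times as much data as client B.
skewed = weighted_fed_avg([client_a, client_b], n_samples=[30, 10])
# With equal amounts of data, the result matches the plain average.
balanced = weighted_fed_avg([client_a, client_b], n_samples=[20, 20])
```

In the skewed case the result is pulled towards client A, whereas the balanced case reproduces the FedAvg result.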

from shfl.federated_aggregator.weighted_fedavg_aggregator import WeightedFedAggregator


weighted_fedavg_aggregator = WeightedFedAggregator(30)

Finally, we are ready to establish the federated government with any of the implemented aggregation operators and start the federated learning process.

federated_government = shfl.federated_government.FederatedGovernment(model_builder(), nodes_federation, fedavg_aggregator)
federated_government.run_rounds(5, test_data, test_labels)


Evaluation in round 0:

Collaborative model test -> loss: 10.470479965209961  categorical_accuracy: 0.8862749934196472

Evaluation in round 1:

Collaborative model test -> loss: 11.364419937133789  categorical_accuracy: 0.9259250164031982

Evaluation in round 2:

Collaborative model test -> loss: 11.860504150390625  categorical_accuracy: 0.9331750273704529

Evaluation in round 3:

Collaborative model test -> loss: 11.764689445495605  categorical_accuracy: 0.9387750029563904

Evaluation in round 4:

Collaborative model test -> loss: 11.562532424926758  categorical_accuracy: 0.9453999996185303

Cluster federated averaging operator

In this section, we detail the implementation of ClusterFedAvg (see ClusterFedAvg).

Cluster Federated Averaging is based on the aggregation operator used for k-means clustering. When aggregating the centroids of a federated k-means clustering, we face the problem of deciding which centroids from different clients correspond to the same cluster before aggregating them. Based on the hypothesis that the closest centroids belong to the same cluster, we apply k-means over the centroids, in order to group the centroids that belong to the same cluster and to obtain the representation (aggregation) of each group. The new centroids obtained in this way are the result of the aggregation.
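The idea of running k-means over the clients' centroids can be sketched as follows. This is a simplified numpy illustration and not the framework's cluster_fed_avg_aggregator: the functions kmeans and cluster_aggregate are hypothetical, the k-means is a plain Lloyd's algorithm, and seeding it with the first client's centroids is an assumption made to keep the example deterministic.

```python
import numpy as np


def kmeans(points, n_clusters, n_iter=20):
    """Plain Lloyd's algorithm; the first n_clusters points seed the centroids."""
    centroids = points[:n_clusters].copy()
    for _ in range(n_iter):
        # Distance from every point to every centroid, shape (n_points, n_clusters).
        distances = np.linalg.norm(points[:, None, :] - centroids[None, :, :], axis=2)
        labels = np.argmin(distances, axis=1)
        for k in range(n_clusters):
            members = points[labels == k]
            if len(members) > 0:  # keep the old centroid if a group is empty
                centroids[k] = members.mean(axis=0)
    return centroids


def cluster_aggregate(clients_centroids, n_clusters):
    """Group all clients' centroids with k-means and return the group centers."""
    stacked = np.concatenate(clients_centroids, axis=0)
    return kmeans(stacked, n_clusters)


# Each client reports two centroids; client 3 lists them in a different order.
client_1 = np.array([[0.0, 0.0], [10.0, 10.0]])
client_2 = np.array([[0.2, 0.0], [10.0, 9.8]])
client_3 = np.array([[10.2, 10.0], [0.1, 0.3]])

aggregated = cluster_aggregate([client_1, client_2, client_3], n_clusters=2)
```

Because the grouping depends only on distances, the order in which each client lists its centroids does not matter, which is the reason a plain element-wise average would not work here.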

This time, instead of a class, we implement a function (remember, the aggregator simply needs to be callable):

from shfl.federated_aggregator.cluster_fedavg_aggregator import cluster_fed_avg_aggregator

clustering_aggregator = cluster_fed_avg_aggregator

To apply this aggregation operator, we create a federated government for clustering.

c_database = shfl.data_base.Iris()
c_train_data, c_train_labels, c_test_data, c_test_labels = c_database.load_data()

c_iid_distribution = shfl.data_distribution.IidDataDistribution(c_database)
c_nodes_federation, c_test_data, c_test_labels = c_iid_distribution.get_nodes_federation(num_nodes=3, percent=50)

n_clusters = 3  # Set number of clusters
n_features = c_train_data.shape[1]  # Number of features of the Iris data loaded above

def clustering_model_builder():
    model = shfl.model.KMeansModel(n_clusters=n_clusters, n_features=n_features)
    return model

clustering_federated_government = shfl.federated_government.FederatedGovernment(clustering_model_builder(), c_nodes_federation, clustering_aggregator)
clustering_federated_government.run_rounds(5, c_test_data, c_test_labels)
Evaluation in round 0:

Collaborative model test -> homogeneity_score: 0.7402314858536988  completeness_score: 0.7244113753984411  v_measure_score: 0.7322359914034998  adjusted_rand_score: 0.5956497490239822

Evaluation in round 1:

Collaborative model test -> homogeneity_score: 0.6412965771037632  completeness_score: 0.6259364201792915  v_measure_score: 0.6335234082543412  adjusted_rand_score: 0.5191468565643872

Evaluation in round 2:

Collaborative model test -> homogeneity_score: 0.6412965771037632  completeness_score: 0.6259364201792915  v_measure_score: 0.6335234082543412  adjusted_rand_score: 0.5191468565643872

Evaluation in round 3:

Collaborative model test -> homogeneity_score: 0.6412965771037632  completeness_score: 0.6259364201792915  v_measure_score: 0.6335234082543412  adjusted_rand_score: 0.5191468565643872

Evaluation in round 4:

Collaborative model test -> homogeneity_score: 0.6412965771037632  completeness_score: 0.6259364201792916  v_measure_score: 0.6335234082543413  adjusted_rand_score: 0.5191468565643872