Aggregation Operators

In this notebook, we explain the implementation of the different federated aggregation operators provided in the platform. Before discussing the aggregation operators themselves, we must establish the federated configuration (for more information, see the A Simple Experiment notebook).

import matplotlib.pyplot as plt
import shfl
import tensorflow as tf
import numpy as np


database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()

iid_distribution = shfl.data_distribution.IidDataDistribution(database)
federated_data, test_data, test_labels = iid_distribution.get_federated_data(num_nodes=5, percent=10)

def model_builder():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1, input_shape=(28, 28, 1)))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.4))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid'))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))

    model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])

    return shfl.model.DeepLearningModel(model)


class Reshape(shfl.private.FederatedTransformation):

    def apply(self, labeled_data):
        labeled_data.data = np.reshape(labeled_data.data, (labeled_data.data.shape[0], labeled_data.data.shape[1], labeled_data.data.shape[2], 1))

shfl.private.federated_operation.apply_federated_transformation(federated_data, Reshape())

class Normalize(shfl.private.FederatedTransformation):

    def __init__(self, mean, std):
        self.__mean = mean
        self.__std = std

    def apply(self, labeled_data):
        labeled_data.data = (labeled_data.data - self.__mean)/self.__std


mean = np.mean(train_data)
std = np.std(train_data)
shfl.private.federated_operation.apply_federated_transformation(federated_data, Normalize(mean, std))

test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2], 1))
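
As a quick sanity check (our addition, not part of the original notebook), the test set should now carry a trailing channel dimension:

print(test_data.shape)  # expected: (num_samples, 28, 28, 1) after adding the channel axis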

Once we have loaded and federated the data and established the learning model, the only step that remains is to choose the aggregation operator. At the moment, the framework implements FedAvg, WeightedFedAvg, and ClusterFedAvg. The implementations of these federated aggregation operators are as follows.

Federated Averaging (FedAvg) Operator

In this section, we detail the implementation of FedAvg (see FedAvg), proposed by Google in the paper Communication-Efficient Learning of Deep Networks from Decentralized Data (McMahan et al., 2017).

It is based on the arithmetic mean of the local weights $W_i$ trained in each of the local clients $C_i$. That is, the weights $W$ of the global model after each round of training are

$$W = \frac{1}{n_C} \sum_{i=1}^{n_C} W_i$$

where $n_C$ is the number of clients.

For its implementation, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to compute the mean of the local weights of each client.

import numpy as np

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator


class FedAvgAggregator(FederatedAggregator):
    """
    Implementation of the Federated Averaging aggregator.
    It computes a simple (unweighted) average of the model parameters of all the clients.
    """

    def aggregate_weights(self, clients_params):
        # One row per client, one column per layer of the model
        clients_params_array = np.array(clients_params)

        num_layers = clients_params_array.shape[1]

        # Average each layer's parameters across all clients
        aggregated_weights = np.array([np.mean(clients_params_array[:, layer], axis=0)
                                       for layer in range(num_layers)])

        return aggregated_weights


fedavg_aggregator = FedAvgAggregator()
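
As a quick sanity check (our own toy example, not part of the framework), we can call the aggregator on made-up parameters from two clients. Each client contributes the same two layers, so the result is the element-wise mean:

toy_params = [
    [np.array([0.0, 2.0]), np.array([1.0, 1.0])],  # hypothetical client 0: two layers
    [np.array([2.0, 4.0]), np.array([3.0, 3.0])],  # hypothetical client 1: two layers
]
print(fedavg_aggregator.aggregate_weights(toy_params))
# expected: [[1. 3.], [2. 2.]], the per-layer arithmetic mean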

Weighted Federated Averaging (WeightedFedAvg) Operator

In this section, we detail the implementation of WeightedFedAvg (see WeightedFedAvg). It is the weighted version of FedAvg. The weight of each client $C_i$ is determined by the amount of client data $n_i$ with respect to the total training data $n$. That is, the parameters $W$ of the global model after each round of training are:

$$W = \sum_{i=1}^{n_C} \frac{n_i}{n} W_i$$

When all clients have the same amount of data, it is equivalent to FedAvg.
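
A quick numeric check of this equivalence (our own illustration): with two clients holding the same amount of data, $n_i/n = 1/2$ for both, so the weighted sum collapses to the plain mean.

w = np.array([[0.0, 2.0], [4.0, 6.0]])           # one layer reported by two clients
print(np.sum([0.5 * w[0], 0.5 * w[1]], axis=0))  # weighted sum: [2. 4.]
print(np.mean(w, axis=0))                        # FedAvg mean:  [2. 4.]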

To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to compute the weighted mean of the local parameters of each client. For this purpose, we first weight each client's local parameters by its data percentage and then sum the weighted parameters.

import numpy as np

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator


class WeightedFedAvgAggregator(FederatedAggregator):
    """
    Implementation of the Weighted Federated Averaging aggregator.
    The aggregation of the parameters is weighted by the amount of data in every node.
    """

    def aggregate_weights(self, clients_params):
        clients_params_array = np.array(clients_params)

        num_clients = clients_params_array.shape[0]
        num_layers = clients_params_array.shape[1]

        # self._percentage holds each client's share of the total training data
        weighted_params = np.array([self._percentage[client] * clients_params_array[client, :]
                                    for client in range(num_clients)])
        # Sum the weighted parameters layer by layer
        aggregated_weights = np.array([np.sum(weighted_params[:, layer], axis=0)
                                       for layer in range(num_layers)])

        return aggregated_weights

weighted_fedavg_aggregator = WeightedFedAvgAggregator()
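
In the framework, the _percentage attribute is filled in with each client's share of the total training data before aggregation; for a standalone sanity check we set it by hand here (an assumption made purely for this toy demonstration):

weighted_fedavg_aggregator._percentage = [0.75, 0.25]  # normally provided by the framework; hand-set for the demo
toy_params = [
    [np.array([0.0, 4.0])],  # hypothetical client 0, holding 75% of the data
    [np.array([4.0, 8.0])],  # hypothetical client 1, holding 25% of the data
]
print(weighted_fedavg_aggregator.aggregate_weights(toy_params))
# expected: [[1. 5.]], i.e. 0.75 * [0, 4] + 0.25 * [4, 8]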

Finally, we are ready to establish the federated government with any of the implemented aggregation operators and start the federated learning process.

federated_government = shfl.federated_government.FederatedGovernment(model_builder, federated_data, fedavg_aggregator)
federated_government.run_rounds(1, test_data, test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552650>: [30.405847549438477, 0.7931749820709229]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552890>: [40.98423385620117, 0.7247499823570251]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x1495529d0>: [40.2315788269043, 0.7540000081062317]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552b10>: [29.9063663482666, 0.8166999816894531]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x149552c50>: [28.207412719726562, 0.8036999702453613]
Global model test performance : [19.37423324584961, 0.7795500159263611]

Cluster Federated Averaging (ClusterFedAvg) Operator

In this section, we detail the implementation of ClusterFedAvg (see ClusterFedAvg).

Cluster Federated Averaging is the aggregation operator used for federated k-means clustering. When aggregating the centroids trained by each client, we face the problem of matching up the clusters before averaging: one client's first centroid need not describe the same cluster as another client's first centroid. Based on the hypothesis that the closest centroids belong to the same cluster, we apply k-means over the clients' centroids themselves, grouping the centroids that describe the same cluster and obtaining a representative (the aggregation) of each group. The new centroids obtained are chosen as the aggregated global model.

To implement it, we create a class that implements the FederatedAggregator interface. The method aggregate_weights is overridden to apply k-means to the clients' centroids.

from shfl.federated_aggregator.federated_aggregator import FederatedAggregator
import numpy as np
from sklearn.cluster import KMeans


class ClusterFedAvgAggregator(FederatedAggregator):
    """
    Implementation of the Cluster Federated Averaging aggregator.
    It runs a second k-means over the cluster centroids coming from each node,
    grouping the closest centroids together.
    """

    def aggregate_weights(self, clients_params):
        # Stack every client's centroids into a single (num_clients * n_clusters, n_features) array
        clients_params_array = np.concatenate(clients_params)

        # Every client reports the same number of centroids
        n_clusters = clients_params[0].shape[0]
        model_aggregator = KMeans(n_clusters=n_clusters, init='k-means++')
        model_aggregator.fit(clients_params_array)

        # The new centroids are the aggregated global model
        aggregated_weights = np.array(model_aggregator.cluster_centers_)
        return aggregated_weights
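
To see why a plain average of centroids would be wrong here, consider two hypothetical clients that found the same two clusters but report their centroids in opposite order (a made-up example): the element-wise mean mixes the clusters, while k-means over the stacked centroids recovers them.

client_a = np.array([[0.0, 0.0], [10.0, 10.0]])  # centroids in one order
client_b = np.array([[10.1, 9.9], [0.1, -0.1]])  # same clusters, opposite order
print(np.mean([client_a, client_b], axis=0))     # naive mean: both rows end up near (5, 5)
toy_aggregator = ClusterFedAvgAggregator()
print(toy_aggregator.aggregate_weights([client_a, client_b]))
# expected: one centroid near (0, 0) and one near (10, 10)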

We create a federated government for clustering in order to apply this aggregation operator.

c_database = shfl.data_base.Iris()
c_train_data, c_train_labels, c_test_data, c_test_labels = c_database.load_data()

c_iid_distribution = shfl.data_distribution.IidDataDistribution(c_database)
c_federated_data, c_test_data, c_test_labels = c_iid_distribution.get_federated_data(num_nodes=3, percent=50)

n_clusters = 3  # The Iris dataset has 3 classes
n_features = c_train_data.shape[1]  # use the clustering (Iris) data, not the EMNIST data

def clustering_model_builder():
    model = shfl.model.KMeansModel(n_clusters=n_clusters, n_features=n_features)
    return model

clustering_aggregator = ClusterFedAvgAggregator()

clustering_federated_government = shfl.federated_government.FederatedGovernment(clustering_model_builder, c_federated_data, clustering_aggregator)
clustering_federated_government.run_rounds(1, c_test_data, c_test_labels)
Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a210>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a290>: (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x15152a2d0>: (0.8563305124226227, 0.8236365430725703, 0.8396653978091654, 0.7996608013567946)
Global model test performance : (0.7343645479042842, 0.7063271351615994, 0.7200730223994161, 0.5325418698325207)