Federated learning: using a TensorFlow model

In this notebook we provide a simple example of how to perform an experiment in a federated environment with a TensorFlow model using custom layers on the network, with the help of Sherpa.ai Federated Learning framework. To set up the federated learning experiment we will show the simple steps for loading the dataset and distribute it to a federated network of clients, and we will define the model that will be trained in the federated learning rounds.

The data

We are going to use a popular dataset: the framework provides some functions to load the Emnist digits dataset.

import matplotlib.pyplot as plt
import numpy as np
import shfl
from shfl.auxiliar_functions_for_notebooks.functionsFL import *
import tensorflow as tf

database = shfl.data_base.Emnist()
train_data, train_labels, test_data, test_labels = database.load_data()
2022-03-21 10:08:43.470736: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-03-21 10:08:43.470754: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.

Let's inspect some properties of the loaded data.

print(len(train_data))
print(len(test_data))
print(type(train_data[0]))
train_data[0].shape
240000
40000
<class 'numpy.ndarray'>





(28, 28)

So, as we have seen, our dataset is composed of a set of matrices that are 28 by 28. Before starting with the federated scenario, we can take a look at a sample of the training data.

plt.imshow(train_data[0])
<matplotlib.image.AxesImage at 0x7fb7557d7760>

png

We are going to simulate a federated learning scenario with a set of client nodes containing private data, and a central server that will be responsible for coordinating the different clients. But, first of all, we have to simulate the data contained in every client. In order to do that, we are going to use the previously loaded dataset. The assumption in this example is that the data is distributed as a set of independent and identically distributed random variables, with every node having approximately the same amount of data.

iid_distribution = shfl.data_distribution.IidDataDistribution(database)
nodes_federation, test_data, test_label = iid_distribution.get_nodes_federation(num_nodes=20, percent=10)

That's it! We have created federated data from the Emnist dataset using 20 nodes and 10 percent of the available data. This data is distributed to a set of data nodes in the form of private data.

The model

A federated learning algorithm is defined by a machine learning model, locally deployed in each node, that learns from the respective node's private data. Then, an aggregating mechanism aggregates the different model parameters uploaded by the client nodes to later redistribute them to the nodes again to continue the training, in a loop.

In this example, we will use a deep learning model using TensorFlow to build it. The framework provides classes on using TensorFlow custom models in a federated learning scenario, your only job is to create a function acting as model builder. Moreover, the framework allows introducing user defined layers into the model, adding more customization possibilities. In this example, we are defining a Dense layer and then using it on our model.

#If you want execute in GPU, you must uncomment this two lines.
# physical_devices = tf.config.experimental.list_physical_devices('GPU')
# tf.config.experimental.set_memory_growth(physical_devices[0], True)

class CustomDense(tf.keras.layers.Layer):
    """
    Implementation of Linear layer

    Attributes
    ----------
    units : int
        number of units for the output
    w : matrix
        Weights from the layer
    b : array
        Bias from the layer
    """

    def __init__(self, units=32, **kwargs):
        super(CustomDense, self).__init__(**kwargs)
        self._units = units
        
    def get_config(self):
        config = {'units': self._units}
        base_config = super(CustomDense, self).get_config()

        return dict(list(base_config.items()) + list(config.items()))

    def build(self, input_shape):
        """
        Method for build the params

        Parameters
        ----------
        input_shape: list
            size of inputs
        """
        self._w = self.add_weight(shape=(input_shape[-1], self._units),
                                  initializer='random_normal',
                                  trainable=True)

        self._b = self.add_weight(shape=(self._units,),
                                  initializer='random_normal',
                                  trainable=True)

    def call(self, inputs):
        """
        Apply linear layer

        Parameters
        ----------
        inputs: matrix
            Input data

        Return
        ------
        result : matrix
            the result of linear transformation of the data
        """
        return tf.nn.bias_add(tf.matmul(inputs, self._w), self._b)


def model_builder():
    inputs = tf.keras.Input(shape=(28, 28, 1))
    x = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1)(inputs)
    x = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid')(x)
    x = tf.keras.layers.Dropout(0.4)(x)
    x = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', strides=1)(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='valid')(x)
    x = tf.keras.layers.Flatten()(x)
    x = CustomDense(128)(x)
    x = tf.nn.relu(x)
    x = tf.keras.layers.Dropout(0.1)(x)
    x = CustomDense(64)(x)
    x = tf.nn.relu(x)
    x = CustomDense(10)(x)
    outputs = tf.nn.softmax(x)
    
    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    loss = tf.keras.losses.CategoricalCrossentropy()
    optimizer = tf.keras.optimizers.RMSprop()
    metrics = [tf.keras.metrics.categorical_accuracy]
    
    return shfl.model.DeepLearningModel(model=model, loss=loss, optimizer=optimizer, metrics=metrics)

Now, the only missing piece is the aggregation operator. Nevertheless, the framework provides some aggregation operators that we can use. In the following piece of code, we define the federated aggregation mechanism. Moreover, we define the federated government based on the TensorFlow model, the federated data, and the aggregation mechanism.

aggregator = shfl.federated_aggregator.FedAvgAggregator()
federated_government = shfl.federated_government.FederatedGovernment(model_builder(), nodes_federation, aggregator)
2022-03-21 10:08:46.682641: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-03-21 10:08:46.682670: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-03-21 10:08:46.682693: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (SH-083-WS): /proc/driver/nvidia/version does not exist
2022-03-21 10:08:46.682916: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.

Before running the algorithm, we want to apply a transformation to the data. A good practice is to define a federated operation that will ensure that the transformation is applied to the federated data in all the client nodes. We want to reshape the data, so we define the following Federated Transformation.

nodes_federation.apply_data_transformation(reshape_data_tf);
nodes_federation.apply_data_transformation(cast_to_float);

In addition, we want to normalize the data, as it is a good practice in order to make the model converge faster. We define a federated transformation using mean and standard deviation (std) parameters. We use the mean and std estimated from the training set in this example. Although the ideal parameters would be an aggregation of the mean and std of each client's training datasets, we use the mean and std of the global dataset as a simple approximation for this experiment.

mean = np.mean(train_data.data)
std = np.std(train_data.data)
nodes_federation.apply_data_transformation(normalize_data, mean=mean, std=std);

Run the federated learning experiment

test_data = np.reshape(test_data, (test_data.shape[0], test_data.shape[1], test_data.shape[2],1))
test_data = test_data.astype(np.float32)
test_data = (test_data - mean) / std
federated_government.run_rounds(3, test_data, test_label)
Evaluation in round 0:

Collaborative model test -> Loss: 0.9988124966621399, Accuracy: 0.7898250222206116
========================


Evaluation in round 1:

Collaborative model test -> Loss: 0.4348503053188324, Accuracy: 0.8767499923706055
========================


Evaluation in round 2:

Collaborative model test -> Loss: 0.3360821008682251, Accuracy: 0.9017000198364258
========================
;