diff --git a/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb b/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb index 283126666be..5f0022f4c7f 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "id": "14821d97", "metadata": {}, @@ -10,6 +11,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "bd059520", "metadata": {}, @@ -23,6 +25,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "39c3d86a", "metadata": {}, @@ -31,6 +34,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "a7989e72", "metadata": {}, @@ -39,6 +43,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "fc8e35da", "metadata": {}, @@ -47,6 +52,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "4dbb89b6", "metadata": {}, @@ -67,12 +73,13 @@ "!pip install torchvision\n", "\n", "# Uncomment this if running in Google Colab\n", - "#!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n", - "#import os\n", - "#os.environ[\"USERNAME\"] = \"colab\"" + "# !pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n", + "# import os\n", + "# os.environ[\"USERNAME\"] = \"colab\"" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "7237eac4", "metadata": {}, @@ -105,19 +112,29 @@ "torch.backends.cudnn.enabled = False\n", "torch.manual_seed(random_seed)\n", "\n", - "mnist_train = torchvision.datasets.MNIST('files/', train=True, download=True,\n", - " transform=torchvision.transforms.Compose([\n", - " torchvision.transforms.ToTensor(),\n", - " torchvision.transforms.Normalize(\n", - " (0.1307,), (0.3081,))\n", - " ]))\n", + "mnist_train = torchvision.datasets.MNIST(\n", + " \"./files/\",\n", + " train=True,\n", + " download=True,\n", + " transform=torchvision.transforms.Compose(\n", + " [\n", + " torchvision.transforms.ToTensor(),\n", + " torchvision.transforms.Normalize((0.1307,), (0.3081,)),\n", + " ]\n", + " ),\n", + ")\n", "\n", - "mnist_test = torchvision.datasets.MNIST('files/', train=False, download=True,\n", - " transform=torchvision.transforms.Compose([\n", - " torchvision.transforms.ToTensor(),\n", - " torchvision.transforms.Normalize(\n", - " (0.1307,), (0.3081,))\n", - " ]))\n", + "mnist_test = torchvision.datasets.MNIST(\n", + " \"./files/\",\n", + " train=False,\n", + " download=True,\n", + " transform=torchvision.transforms.Compose(\n", + " [\n", + " torchvision.transforms.ToTensor(),\n", + " torchvision.transforms.Normalize((0.1307,), (0.3081,)),\n", + " ]\n", + " ),\n", + ")\n", "\n", "class Net(nn.Module):\n", " def __init__(self):\n", @@ -156,6 +173,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "cd268911", "metadata": {}, @@ -181,14 +199,13 @@ "from openfl.experimental.placement import aggregator, collaborator\n", "\n", "\n", - "def FedAvg(models, weights=None):\n", + "def FedAvg(models):\n", " new_model = models[0]\n", " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", - " axis=0, \n", - " weights=weights))\n", + " state_dict[key] = np.sum(np.array([state[key]\n", + " for state in 
state_dicts], dtype=object), axis=0) / len(models)\n", " new_model.load_state_dict(state_dict)\n", " return new_model" ] @@ -219,7 +236,7 @@ "source": [ "class FederatedFlow(FLSpec):\n", "\n", - " def __init__(self, model = None, optimizer = None, rounds=3, **kwargs):\n", + " def __init__(self, model=None, optimizer=None, rounds=3, **kwargs):\n", " super().__init__(**kwargs)\n", " if model is not None:\n", " self.model = model\n", @@ -227,7 +244,7 @@ " else:\n", " self.model = Net()\n", " self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,\n", - " momentum=momentum)\n", + " momentum=momentum)\n", " self.rounds = rounds\n", "\n", " @aggregator\n", @@ -236,12 +253,12 @@ " self.collaborators = self.runtime.collaborators\n", " self.private = 10\n", " self.current_round = 0\n", - " self.next(self.aggregated_model_validation,foreach='collaborators',exclude=['private'])\n", + " self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])\n", "\n", " @collaborator\n", " def aggregated_model_validation(self):\n", " print(f'Performing aggregated model validation for collaborator {self.input}')\n", - " self.agg_validation_score = inference(self.model,self.test_loader)\n", + " self.agg_validation_score = inference(self.model, self.test_loader)\n", " print(f'{self.input} value of {self.agg_validation_score}')\n", " self.next(self.train)\n", "\n", @@ -252,32 +269,35 @@ " momentum=momentum)\n", " train_losses = []\n", " for batch_idx, (data, target) in enumerate(self.train_loader):\n", - " self.optimizer.zero_grad()\n", - " output = self.model(data)\n", - " loss = F.nll_loss(output, target)\n", - " loss.backward()\n", - " self.optimizer.step()\n", - " if batch_idx % log_interval == 0:\n", - " print('Train Epoch: 1 [{}/{} ({:.0f}%)]\\tLoss: {:.6f}'.format(\n", - " batch_idx * len(data), len(self.train_loader.dataset),\n", - " 100. * batch_idx / len(self.train_loader), loss.item()))\n", - " self.loss = loss.item()\n", - " torch.save(self.model.state_dict(), 'model.pth')\n", - " torch.save(self.optimizer.state_dict(), 'optimizer.pth')\n", + " self.optimizer.zero_grad()\n", + " output = self.model(data)\n", + " loss = F.nll_loss(output, target)\n", + " loss.backward()\n", + " self.optimizer.step()\n", + " if batch_idx % log_interval == 0:\n", + " print('Train Epoch: 1 [{}/{} ({:.0f}%)]\\tLoss: {:.6f}'.format(\n", + " batch_idx * len(data), len(self.train_loader.dataset),\n", + " 100. 
* batch_idx / len(self.train_loader), loss.item()))\n", + " self.loss = loss.item()\n", + " torch.save(self.model.state_dict(), 'model.pth')\n", + " torch.save(self.optimizer.state_dict(), 'optimizer.pth')\n", " self.training_completed = True\n", " self.next(self.local_model_validation)\n", "\n", " @collaborator\n", " def local_model_validation(self):\n", - " self.local_validation_score = inference(self.model,self.test_loader)\n", - " print(f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", + " self.local_validation_score = inference(self.model, self.test_loader)\n", + " print(\n", + " f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", " self.next(self.join, exclude=['training_completed'])\n", "\n", " @aggregator\n", - " def join(self,inputs):\n", - " self.average_loss = sum(input.loss for input in inputs)/len(inputs)\n", - " self.aggregated_model_accuracy = sum(input.agg_validation_score for input in inputs)/len(inputs)\n", - " self.local_model_accuracy = sum(input.local_validation_score for input in inputs)/len(inputs)\n", + " def join(self, inputs):\n", + " self.average_loss = sum(input.loss for input in inputs) / len(inputs)\n", + " self.aggregated_model_accuracy = sum(\n", + " input.agg_validation_score for input in inputs) / len(inputs)\n", + " self.local_model_accuracy = sum(\n", + " input.local_validation_score for input in inputs) / len(inputs)\n", " print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')\n", " print(f'Average training loss = {self.average_loss}')\n", " print(f'Average local model validation values = {self.local_model_accuracy}')\n", @@ -285,23 +305,25 @@ " self.optimizer = [input.optimizer for input in inputs][0]\n", " self.current_round += 1\n", " if self.current_round < self.rounds:\n", - " self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])\n", + " self.next(self.aggregated_model_validation,\n", + " foreach='collaborators', exclude=['private'])\n", " else:\n", " self.next(self.end)\n", - " \n", + "\n", " @aggregator\n", " def end(self):\n", - " print(f'This is the end of the flow') " + " print(f'This is the end of the flow')" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "2aabf61e", "metadata": {}, "source": [ - "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** that are exposed only throught he runtime. Each participant has it's own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task. \n", + "Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transferring from collaborator to aggregator, or vice versa. \n", "\n", - "Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland. Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. 
Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. " + "Private attributes can be set using a callback function when instantiating the participant; the parameters the callback requires are specified as arguments to the participant's constructor. In this example, the callback function `callable_to_initialize_collaborator_private_attributes` returns the collaborator's private attributes `train_loader` and `test_loader`, and its parameters `index`, `n_collaborators`, `batch_size`, `train_dataset`, and `test_dataset` are passed identically named arguments in the Collaborator constructor." ] }, { @@ -311,30 +333,43 @@ "metadata": {}, "outputs": [], "source": [ - "# Setup participants\n", - "aggregator = Aggregator()\n", - "aggregator.private_attributes = {}\n", + "# Aggregator\n", + "aggregator_ = Aggregator()\n", + "\n", + "collaborator_names = [\"Portland\", \"Seattle\", \"Chandler\", \"Bangalore\"]\n", "\n", - "# Setup collaborators with private attributes\n", - "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", - "for idx, collaborator in enumerate(collaborators):\n", - " local_train = deepcopy(mnist_train)\n", - " local_test = deepcopy(mnist_test)\n", - " local_train.data = mnist_train.data[idx::len(collaborators)]\n", - " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", - " local_test.data = mnist_test.data[idx::len(collaborators)]\n", - " local_test.targets = mnist_test.targets[idx::len(collaborators)]\n", - " collaborator.private_attributes = {\n", - " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", - " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", + "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", + " train = deepcopy(train_dataset)\n", + " test = deepcopy(test_dataset)\n", + " train.data = train_dataset.data[index::n_collaborators]\n", + " train.targets = train_dataset.targets[index::n_collaborators]\n", + " test.data = test_dataset.data[index::n_collaborators]\n", + " test.targets = test_dataset.targets[index::n_collaborators]\n", + "\n", + " return {\n", + " \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n", + " \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", + " }\n", "\n", - "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "# Setup collaborators private attributes via callable function\n", + "collaborators = []\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx, n_collaborators=len(collaborator_names),\n", + " train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=aggregator_, 
collaborators=collaborators,\n", + " backend=\"ray\")\n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "278ad46b", "metadata": {}, @@ -345,24 +380,25 @@ { "cell_type": "code", "execution_count": null, - "id": "16937a65", + "id": "a175b4d6", "metadata": {}, "outputs": [], "source": [ "model = None\n", "best_model = None\n", "optimizer = None\n", - "flflow = FederatedFlow(model,optimizer)\n", + "flflow = FederatedFlow(model, optimizer, checkpoint=True)\n", "flflow.runtime = local_runtime\n", "flflow.run()" ] }, { + "attachments": {}, "cell_type": "markdown", - "id": "c32e0844", + "id": "86b3dd2e", "metadata": {}, "source": [ - "Now that the flow has completed, let's get the final model and accuracy:" + "Now that the flow has completed, let's get the final model and accuracy" ] }, { @@ -378,6 +414,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "5dd1558c", "metadata": {}, @@ -394,12 +431,13 @@ "metadata": {}, "outputs": [], "source": [ - "flflow2 = FederatedFlow(model=flflow.model,optimizer=flflow.optimizer,checkpoint=True)\n", + "flflow2 = FederatedFlow(model=flflow.model, optimizer=flflow.optimizer, checkpoint=True)\n", "flflow2.runtime = local_runtime\n", "flflow2.run()" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "a61a876d", "metadata": {}, @@ -449,6 +487,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "b55ccb19", "metadata": {}, @@ -477,6 +516,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "e5efa1ff", "metadata": {}, @@ -495,6 +535,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "3292b2e0", "metadata": {}, @@ -533,6 +574,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "eb1866b7", "metadata": {}, @@ -561,6 +603,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "ef877a50", "metadata": {}, @@ -589,6 +632,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "9826c45f", "metadata": {}, @@ -607,6 +651,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "dd962ddc", "metadata": {}, @@ -625,6 +670,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "426f2395", "metadata": {}, @@ -642,9 +688,9 @@ ], "metadata": { "kernelspec": { - "display_name": "workflow-interface-py38", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "workflow-interface-py38" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -656,7 +702,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.17" } }, "nbformat": 4, diff --git a/openfl-tutorials/experimental/Workflow_Interface_102_Aggregator_Validation.ipynb b/openfl-tutorials/experimental/Workflow_Interface_102_Aggregator_Validation.ipynb index bd597dd6a49..aa787ab9ce5 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_102_Aggregator_Validation.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_102_Aggregator_Validation.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "id": "14821d97", "metadata": {}, @@ -10,6 +11,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "bd059520", "metadata": {}, @@ -18,6 +20,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "fc8e35da", "metadata": {}, @@ -26,6 +29,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "4dbb89b6", "metadata": {}, @@ -52,6 +56,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": 
"7237eac4", "metadata": {}, @@ -135,6 +140,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "cd268911", "metadata": {}, @@ -165,14 +171,13 @@ " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", - " axis=0, \n", - " weights=weights))\n", + " state_dict[key] = np.average(np.array([state[key] for state in state_dicts], dtype=object),axis=0,weights=weights)\n", " new_model.load_state_dict(state_dict)\n", " return new_model\n" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "b2e45614", "metadata": { @@ -281,13 +286,16 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "7a133f9f", "metadata": {}, "source": [ "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** that are exposed only throught he runtime. Each participant has it's own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task. \n", "\n", - "Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland. Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. " + "Below, we segment shards of the MNIST dataset for **four collaborators**: `Portland`, `Seattle`, `Chandler`, and `Portland`. Each has their own slice of the dataset that is accessible through the `train_loader` and `test_loader` attributes, which are set using the `callable_to_initialize_collaborator_private_attributes` callable function. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa.\n", + "\n", + "Private attributes can be set using callback function while instantiating the participant. Parameters required by the callback function are specified as arguments while instantiating the participant. In this example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator. Callback function, `callable_to_initialize_aggregator_private_attributes`, returns the private attribute `test_loader` of the Aggregator." 
] }, { @@ -297,37 +305,55 @@ "metadata": {}, "outputs": [], "source": [ - "# Setup participants\n", - "aggregator = Aggregator()\n", - "\n", - "# Setup collaborators with private attributes\n", "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", "\n", - "aggregator_test = deepcopy(mnist_test)\n", - "aggregator_test.targets = mnist_test.targets[len(collaborators)::len(collaborators)+1]\n", - "aggregator_test.data = mnist_test.data[len(collaborators)::len(collaborators)+1]\n", - "aggregator.private_attributes = {\n", - " 'test_loader': torch.utils.data.DataLoader(aggregator_test,batch_size=batch_size_train, shuffle=True)\n", - "}\n", + "def callable_to_initialize_aggregator_private_attributes(n_collaborators, test_dataset, batch_size_train):\n", + " aggregator_test = deepcopy(test_dataset)\n", + " aggregator_test.targets = test_dataset.targets[n_collaborators::n_collaborators+1]\n", + " aggregator_test.data = test_dataset.data[n_collaborators::n_collaborators+1]\n", + " return {\n", + " 'test_loader': torch.utils.data.DataLoader(aggregator_test,batch_size=batch_size_train, shuffle=True)\n", + " }\n", + "\n", + "# Setup Aggregator private attributes via callable function\n", + "aggregator = Aggregator(\n", + " name=\"agg\",\n", + " private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", + " n_collaborators=len(collaborator_names),\n", + " test_dataset=mnist_test, batch_size_train=batch_size_train\n", + ")\n", "\n", - "for idx, collaborator in enumerate(collaborators):\n", - " local_train = deepcopy(mnist_train)\n", - " local_test = deepcopy(mnist_test)\n", - " local_train.data = mnist_train.data[idx::len(collaborators)]\n", - " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", - " local_test.data = mnist_test.data[idx::len(collaborators)+1]\n", - " local_test.targets = mnist_test.targets[idx::len(collaborators)+1]\n", - " collaborator.private_attributes = {\n", + "# Setup collaborators private attributes via callable function\n", + "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, train_dataset, test_dataset, batch_size_train):\n", + " local_train = deepcopy(train_dataset)\n", + " local_test = deepcopy(test_dataset)\n", + " local_train.data = train_dataset.data[index::n_collaborators]\n", + " local_train.targets = train_dataset.targets[index::n_collaborators]\n", + " local_test.data = test_dataset.data[index::n_collaborators]\n", + " local_test.targets = test_dataset.targets[index::n_collaborators]\n", + " \n", + " return {\n", " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", " }\n", "\n", - "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "collaborators=[]\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx, n_collaborators=len(collaborator_names),\n", + " train_dataset=mnist_train, test_dataset=mnist_test, batch_size_train=batch_size_train,\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, 
backend='ray')\n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "0525eaa9", "metadata": {}, @@ -345,12 +371,13 @@ "model = None\n", "best_model = None\n", "optimizer = None\n", - "flflow = AggregatorValidationFlow(model,optimizer)\n", + "flflow = AggregatorValidationFlow(model, optimizer)\n", "flflow.runtime = local_runtime\n", "flflow.run()" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "8b9f8d25", "metadata": {}, @@ -368,9 +395,9 @@ ], "metadata": { "kernelspec": { - "display_name": "workflow-interface-py38", + "display_name": "new_test_env", "language": "python", - "name": "workflow-interface-py38" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -382,7 +409,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.16" } }, "nbformat": 4, diff --git a/openfl-tutorials/experimental/Workflow_Interface_103_Cyclic_Institutional_Incremental_Learning.ipynb b/openfl-tutorials/experimental/Workflow_Interface_103_Cyclic_Institutional_Incremental_Learning.ipynb index 2331a9d66f5..0b65c194195 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_103_Cyclic_Institutional_Incremental_Learning.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_103_Cyclic_Institutional_Incremental_Learning.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "id": "14821d97", "metadata": {}, @@ -28,6 +29,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "fc8e35da", "metadata": {}, @@ -36,6 +38,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "4dbb89b6", "metadata": {}, @@ -62,6 +65,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "7237eac4", "metadata": {}, @@ -145,6 +149,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "cd268911", "metadata": {}, @@ -175,14 +180,13 @@ " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", - " axis=0, \n", - " weights=weights))\n", + " state_dict[key] = np.average(np.array([state[key] for state in state_dicts],dtype=object),axis=0,weights=weights)\n", " new_model.load_state_dict(state_dict)\n", " return new_model\n" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "b2e45614", "metadata": { @@ -300,13 +304,16 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "7a133f9f", "metadata": {}, "source": [ "You'll notice in the `CyclicFlow` definition above that each collaborator performs **aggregated_model_validation**, **training**, and **local_model_validation** before passing it's model on to the next collaborator (through the aggregator). \n", "\n", - "Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland **equally and IID**. Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. 
" + "Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland **equally and IID**. Each has their own slice of the dataset that is accessible through the `train_loader` and `test_loader` attributes, which are set using the `callable_to_initialize_collaborator_private_attributes` callable function. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa.\n", + "\n", + "Private attributes can be set using callback function while instantiating the participant. Parameters required by the callback function are specified as arguments while instantiating the participant. In this example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator. Callback function, `callable_to_initialize_aggregator_private_attributes`, returns the private attribute `test_loader` of the Aggregator." ] }, { @@ -316,37 +323,58 @@ "metadata": {}, "outputs": [], "source": [ - "# Setup participants\n", - "agg = Aggregator()\n", + "collaborator_names = ['Portland', 'Seattle','Chandler','Bangalore']\n", "\n", - "# Setup collaborators with private attributes\n", - "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", - "\n", - "aggregator_test = deepcopy(mnist_test)\n", - "aggregator_test.targets = mnist_test.targets[len(collaborators)::len(collaborators)+1]\n", - "aggregator_test.data = mnist_test.data[len(collaborators)::len(collaborators)+1]\n", - "aggregator.private_attributes = {\n", - " 'test_loader': torch.utils.data.DataLoader(aggregator_test,batch_size=batch_size_train, shuffle=True)\n", - "}\n", - "\n", - "for idx, col in enumerate(collaborators):\n", - " local_train = deepcopy(mnist_train)\n", - " local_test = deepcopy(mnist_test)\n", - " local_train.data = mnist_train.data[idx::len(collaborators)]\n", - " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", - " local_test.data = mnist_test.data[idx::len(collaborators)+1]\n", - " local_test.targets = mnist_test.targets[idx::len(collaborators)+1]\n", - " col.private_attributes = {\n", - " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", - " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", + "def callable_to_initialize_aggregator_private_attributes(n_collaborators, test_dataset,\n", + " batch_size):\n", + " aggregator_test = deepcopy(test_dataset)\n", + " aggregator_test.targets = test_dataset.targets[n_collaborators::n_collaborators+1]\n", + " aggregator_test.data = test_dataset.data[n_collaborators::n_collaborators+1]\n", + "\n", + " return {\n", + " 'test_loader': torch.utils.data.DataLoader(aggregator_test, batch_size=batch_size, shuffle=True)\n", + " }\n", + "\n", + "# Setup Aggregator private attributes via callable function\n", + "agg = Aggregator(\n", + " name=\"agg\",\n", + " private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", + " n_collaborators=len(collaborator_names), test_dataset=mnist_test,\n", + " batch_size=batch_size_test\n", + ")\n", + "\n", + "def 
callable_to_initialize_collaborator_private_attributes(index, n_collaborators,\n", + " batch_size_train, train_dataset, test_dataset):\n", + " local_train = deepcopy(train_dataset)\n", + " local_test = deepcopy(test_dataset)\n", + " local_train.data = train_dataset.data[index::n_collaborators]\n", + " local_train.targets = train_dataset.targets[index::n_collaborators]\n", + " local_test.data = test_dataset.data[index::n_collaborators+1]\n", + " local_test.targets = test_dataset.targets[index::n_collaborators+1]\n", + "\n", + " return {\n", + " 'train_loader': torch.utils.data.DataLoader(local_train, batch_size=batch_size_train, shuffle=True),\n", + " 'test_loader': torch.utils.data.DataLoader(local_test, batch_size=batch_size_train, shuffle=True)\n", " }\n", "\n", - "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend='single_process')\n", + "# Setup collaborators private attributes via callable function\n", + "collaborators=[]\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator( \n", + " name=collaborator_name, num_cpus=0, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx, n_collaborators=len(collaborator_names), batch_size_train=batch_size_train,\n", + " train_dataset=mnist_train, test_dataset=mnist_test\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend='ray') \n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "0525eaa9", "metadata": {}, @@ -370,6 +398,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "ad93c508", "metadata": {}, @@ -484,6 +513,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "e58fe9cd", "metadata": {}, @@ -498,45 +528,50 @@ "metadata": {}, "outputs": [], "source": [ - "# Setup participants\n", - "agg = Aggregator()\n", - "\n", - "# Setup collaborators with private attributes\n", "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", - "\n", - "aggregator_test = deepcopy(mnist_test)\n", - "aggregator_test.targets = mnist_test.targets[len(collaborators)::len(collaborators)+1]\n", - "aggregator_test.data = mnist_test.data[len(collaborators)::len(collaborators)+1]\n", - "aggregator.private_attributes = {\n", - " 'test_loader': torch.utils.data.DataLoader(aggregator_test,batch_size=batch_size_train, shuffle=True)\n", - "}\n", - "\n", - "for idx, col in enumerate(collaborators):\n", - " local_train = deepcopy(mnist_train)\n", - " local_test = deepcopy(mnist_test)\n", - " local_train.data = mnist_train.data[idx::len(collaborators)]\n", - " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", - " if col.name == 'Portland':\n", + "\n", + "def callable_to_initialize_aggregator_private_attributes(n_collaborators, test_dataset, batch_size_test):\n", + " aggregator_test = deepcopy(test_dataset)\n", + " aggregator_test.targets = test_dataset.targets[n_collaborators::n_collaborators+1]\n", + " aggregator_test.data = test_dataset.data[n_collaborators::n_collaborators+1]\n", + "\n", + " return {\n", + " 'test_loader': torch.utils.data.DataLoader(aggregator_test,batch_size=batch_size_test, shuffle=True)\n", + " }\n", + "\n", + "# Setup Aggregator private attributes via callable function\n", + "agg = Aggregator(\n", + " 
name=\"agg\",\n", + " private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", + " n_collaborators = len(collaborator_names), test_dataset=mnist_test,\n", + " batch_size_test=batch_size_test\n", + ")\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size_train, train_dataset, test_dataset):\n", + " local_train = deepcopy(train_dataset)\n", + " local_test = deepcopy(test_dataset)\n", + " local_train.data = train_dataset.data[index::n_collaborators]\n", + " local_train.targets = train_dataset.targets[index::n_collaborators]\n", + " if collaborator_name == 'Portland':\n", " # Remove the 0 class from Portland\n", " mask = local_train.targets != 1\n", " local_train.data = local_train.data[mask]\n", " local_train.targets = local_train.targets[mask]\n", - " if col.name == 'Seattle':\n", + " if collaborator_name == 'Seattle':\n", " # Seattle has 500 samples of class 1 (exclusively)\n", " mask = local_train.targets == 1\n", " local_train.data = local_train.data[mask]\n", " local_train.targets = local_train.targets[mask]\n", " local_train.data = local_train.data[:500]\n", " local_train.targets = local_train.targets[:500]\n", - " if col.name == 'Chandler':\n", + " if collaborator_name == 'Chandler':\n", " # Chandler has 300 samples of class 2 (exclusively)\n", " mask = local_train.targets == 2\n", " local_train.data = local_train.data[mask]\n", " local_train.targets = local_train.targets[mask]\n", " local_train.data = local_train.data[:300]\n", " local_train.targets = local_train.targets[:300]\n", - " if col.name == 'Bangalore':\n", + " if collaborator_name == 'Bangalore':\n", " # Bangalore has 300 samples of class 3 (exclusively)\n", " mask = local_train.targets == 3\n", " local_train.data = local_train.data[mask]\n", @@ -544,14 +579,26 @@ " local_train.data = local_train.data[:500]\n", " local_train.targets = local_train.targets[:500]\n", " # Test data is left unchanged (all classes represented)\n", - " local_test.data = mnist_test.data[idx::len(collaborators)+1]\n", - " local_test.targets = mnist_test.targets[idx::len(collaborators)+1]\n", - " col.private_attributes = {\n", - " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", - " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", + " local_test.data = test_dataset.data[index::n_collaborators+1]\n", + " local_test.targets = test_dataset.targets[index::n_collaborators+1]\n", + " return {\n", + " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", + " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", " }\n", "\n", - "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend='single_process')\n", + "# Setup collaborators private attributes via callable function\n", + "collaborators=[]\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx, n_collaborators=len(collaborator_names), batch_size_train=batch_size_train,\n", + " train_dataset=mnist_train, test_dataset=mnist_test,\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend='ray')\n", "print(f'Local 
runtime collaborators = {local_runtime.collaborators}')" ] }, @@ -565,7 +612,7 @@ "model = None\n", "best_model = None\n", "optimizer = None\n", - "clflow2 = CyclicLearningFlow(model,optimizer,rounds=4)\n", + "clflow2 = CyclicLearningFlow(model, optimizer, rounds=4)\n", "clflow2.runtime = local_runtime\n", "clflow2.run()" ] @@ -580,12 +627,13 @@ "model = None\n", "best_model = None\n", "optimizer = None\n", - "flflow2 = FederatedFlow(model,optimizer,rounds=4)\n", + "flflow2 = FederatedFlow(model, optimizer, rounds=4)\n", "flflow2.runtime = local_runtime\n", "flflow2.run()" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "285d63a9", "metadata": {}, @@ -594,6 +642,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "8b9f8d25", "metadata": {}, @@ -611,9 +660,9 @@ ], "metadata": { "kernelspec": { - "display_name": "workflow-interface-py38", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "workflow-interface-py38" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -625,7 +674,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.16" } }, "nbformat": 4, diff --git a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb index 7312c8baa46..0df9bf8db2b 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb @@ -50,7 +50,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -77,13 +77,12 @@ "metadata": {}, "outputs": [], "source": [ - "from keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D\n", - "from keras.models import Sequential\n", - "from keras.datasets import mnist\n", + "from tensorflow.keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D\n", + "from tensorflow.keras.models import Sequential\n", + "from tensorflow.keras.datasets import mnist\n", "from tensorflow.keras.utils import to_categorical\n", "\n", "nb_classes = 10\n", - "batch_size=32\n", "(X_train, y_train), (X_test, y_test) = mnist.load_data()\n", "print(\"X_train original shape\", X_train.shape)\n", "print(\"y_train original shape\", y_train.shape)\n", @@ -98,8 +97,6 @@ "Y_train = to_categorical(y_train, nb_classes)\n", "Y_test = to_categorical(y_test, nb_classes)\n", "\n", - "train_dataset=(X_train, Y_train)\n", - "test_dataset=(X_test, Y_test)\n", "\n", "model = Sequential([\n", " Conv2D(filters=32, kernel_size=(3, 3), activation=\"relu\", input_shape=(28, 28, 1)),\n", @@ -112,7 +109,7 @@ " Dense(nb_classes, activation=\"softmax\"),\n", "])\n", "\n", - "model.compile(optimizer=\"SGD\", loss=\"categorical_crossentropy\", metrics=[\"accuracy\"])\n", + "model.compile(optimizer=\"adam\", loss=\"categorical_crossentropy\", metrics=[\"accuracy\"])\n", "print(model.summary())\n", "\n", "\n", @@ -142,7 +139,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -195,14 +192,14 @@ " self.collaborators = self.runtime.collaborators\n", " self.next(self.aggregated_model_validation, foreach='collaborators')\n", "\n", - " @collaborator\n", + " @collaborator(num_gpus=1)\n", " def aggregated_model_validation(self):\n", " print(f'Performing aggregated model validation for collaborator {self.input}')\n", " 
self.agg_validation_score = inference(self.model, self.test_loader, self.batch_size)\n", " print(f'{self.input} value of {self.agg_validation_score}')\n", " self.next(self.train)\n", "\n", - " @collaborator\n", + " @collaborator(num_gpus=1)\n", " def train(self):\n", " x_train, y_train = self.train_loader\n", " history = self.model.fit(\n", @@ -214,7 +211,7 @@ " self.loss = history.history[\"loss\"][0]\n", " self.next(self.local_model_validation)\n", "\n", - " @collaborator\n", + " @collaborator(num_gpus=1)\n", " def local_model_validation(self):\n", " self.local_validation_score = inference(self.model, self.test_loader, self.batch_size)\n", " print(\n", @@ -253,9 +250,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader`, `test_loader` and `batch_size` for each of the collaborators. These are **private_attributes** that are exposed only throught he runtime. Each participant has it's own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task. \n", + "Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transferring from collaborator to aggregator, or vice versa. \n", "\n", - "Below, we segment shards of the MNIST dataset for **two collaborators**: Portland and Seattle. Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. " + "Private attributes can be set using a callback function when instantiating the participant; the parameters the callback requires are specified as arguments to the participant's constructor. In this example, the callback function `callable_to_initialize_collaborator_private_attributes` returns the collaborator's private attributes `train_loader`, `test_loader`, and `batch_size`, and its parameters `index`, `n_collaborators`, `batch_size`, `train_dataset`, and `test_dataset` are passed identically named arguments in the Collaborator constructor."
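As an aside, the callback contract described above can be sketched in isolation: keyword arguments supplied to the `Collaborator` constructor are forwarded to the callable, and the dictionary it returns becomes that participant's private attributes. The helper name and toy data below are illustrative, not part of the tutorial:

```python
import numpy as np

def init_private_attributes(index, n_collaborators, train_dataset, test_dataset, batch_size):
    # Round-robin shard of the (features, labels) tuples for this collaborator.
    X_train, y_train = train_dataset
    X_test, y_test = test_dataset
    return {
        "train_loader": (X_train[index::n_collaborators], y_train[index::n_collaborators]),
        "test_loader": (X_test[index::n_collaborators], y_test[index::n_collaborators]),
        "batch_size": batch_size,
    }

# Shard 0 of 2 over a toy dataset of 10 samples.
X, y = np.arange(20).reshape(10, 2), np.arange(10)
attrs = init_private_attributes(0, 2, (X, y), (X, y), batch_size=4)
assert attrs["train_loader"][0].shape == (5, 2)
```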
] }, { @@ -264,13 +261,11 @@ "metadata": {}, "outputs": [], "source": [ - "# Aggregator\n", "agg = Aggregator()\n", "\n", - "# Setup collaborators with private attributes\n", "collaborator_names = [\"Portland\", \"Seattle\"]\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", - "for idx, collaborator in enumerate(collaborators):\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):\n", " from openfl.utilities.data_splitters import EqualNumPyDataSplitter\n", " train_splitter = EqualNumPyDataSplitter()\n", " test_splitter = EqualNumPyDataSplitter()\n", @@ -278,18 +273,30 @@ " X_train, y_train = train_dataset\n", " X_test, y_test = test_dataset\n", "\n", - " train_idx = train_splitter.split(y_train, len(collaborator_names))\n", - " valid_idx = test_splitter.split(y_test, len(collaborator_names))\n", + " train_idx = train_splitter.split(y_train, n_collaborators)\n", + " valid_idx = test_splitter.split(y_test, n_collaborators)\n", "\n", - " train_dataset = X_train[train_idx[idx]], y_train[train_idx[idx]]\n", - " test_dataset = X_test[valid_idx[idx]], y_test[valid_idx[idx]]\n", + " train_dataset = X_train[train_idx[index]], y_train[train_idx[index]]\n", + " test_dataset = X_test[valid_idx[index]], y_test[valid_idx[index]]\n", "\n", - " collaborator.private_attributes = {\n", + " return {\n", " \"train_loader\": train_dataset, \"test_loader\": test_dataset,\n", " \"batch_size\": batch_size\n", " }\n", "\n", - "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend='ray')\n", + "# Setup collaborators private attributes via callable function\n", + "collaborators = []\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " n_collaborators=len(collaborator_names), index=idx, train_dataset=(X_train, Y_train),\n", + " test_dataset=(X_test, Y_test), batch_size=32\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend=\"ray\")\n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, diff --git a/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb b/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb index e9148b0a750..509afd98dcf 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "id": "14821d97", "metadata": {}, @@ -10,6 +11,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "bd059520", "metadata": {}, @@ -18,6 +20,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "fc8e35da", "metadata": {}, @@ -26,6 +29,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "4dbb89b6", "metadata": {}, @@ -52,6 +56,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "7237eac4", "metadata": {}, @@ -140,6 +145,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "cd268911", "metadata": {}, @@ -165,28 +171,29 @@ "from openfl.experimental.placement import aggregator, collaborator\n", "\n", "\n", - "def FedAvg(models, weights=None):\n", + "def 
FedAvg(models):\n", " models = [model.to('cpu') for model in models]\n", " new_model = models[0]\n", " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", - " axis=0, \n", - " weights=weights))\n", + " state_dict[key] = np.sum(np.array(\n", + " [state[key] for state in state_dicts],\n", + " dtype=object\n", + " ), axis=0) / len(models)\n", " new_model.load_state_dict(state_dict)\n", - " return new_model\n", - "\n" + " return new_model" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "b2e45614", "metadata": { "scrolled": true }, "source": [ - "Now we come to the updated flow definition. Here we request `@collaborator(num_gpus=1)` as the placement decorator, which will require a dedicated GPU for each collaborator task. Tune this based on your use case, but because this uses Ray internally, you can also pass through a [fraction of a GPU](https://docs.ray.io/en/latest/ray-core/tasks/using-ray-with-gpus.html#fractional-gpus), which will allow more than one task to run on each GPU (i.e. `@collaborator(num_gpus=0.5)` would result in two tasks per GPU). " + "Now we come to the updated flow definition." ] }, { @@ -217,14 +224,14 @@ " self.current_round = 0\n", " self.next(self.aggregated_model_validation,foreach='collaborators',exclude=['private'])\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def aggregated_model_validation(self):\n", " print(f'Performing aggregated model validation for collaborator {self.input}')\n", " self.agg_validation_score = inference(self.model,self.test_loader)\n", " print(f'{self.input} value of {self.agg_validation_score}')\n", " self.next(self.train)\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def train(self):\n", " \"\"\"\n", " Train the model.\n", @@ -249,7 +256,7 @@ " self.loss = loss.item()\n", " self.next(self.local_model_validation)\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def local_model_validation(self):\n", " self.local_validation_score = inference(self.model,self.test_loader)\n", " print(f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", @@ -273,19 +280,18 @@ " \n", " @aggregator\n", " def end(self):\n", - " print(f'This is the end of the flow') " + " print(f'This is the end of the flow')" ] }, { + "attachments": {}, "cell_type": "markdown", - "id": "7a133f9f", + "id": "49c4afa8", "metadata": {}, "source": [ - "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** that are exposed only throught he runtime. Each participant has it's own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task. \n", + "In this step we define the entities necessary to run the flow, as described in the [quickstart](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb), and create a function which returns the dataset as private attributes of each collaborator.\n", "\n", - "Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland. 
Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. \n", - "\n", - "The LocalRuntime is now initialized **without** the backend argument. Now the LocalRuntime will default to `backend='ray'`, which allows passing through the `num_gpus` argument in the placement decorator" + "To request GPU(s) with the Ray backend, we specify `num_gpus=0.5` as an argument when instantiating a Collaborator; this reserves 0.5 GPU for each of the 2 collaborators and therefore requires one dedicated GPU for the experiment. Tune this based on your use case; for example, `num_gpus=0.5` in an experiment with 4 collaborators will require 2 dedicated GPUs. **NOTE:** A collaborator cannot span multiple GPUs; for example, `num_gpus=0.4` with 5 collaborators will require 3 dedicated GPUs. In this case collaborators 1 and 2 use GPU#1, collaborators 3 and 4 use GPU#2, and collaborator 5 uses GPU#3." ] }, { @@ -295,39 +301,52 @@ "metadata": {}, "outputs": [], "source": [ - "# Setup participants\n", + "# Setup Aggregator private attributes via callable function\n", "aggregator = Aggregator()\n", - "aggregator.private_attributes = {}\n", "\n", - "# Setup collaborators with private attributes\n", - "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", - "for idx, collaborator in enumerate(collaborators):\n", - " local_train = deepcopy(mnist_train)\n", - " local_test = deepcopy(mnist_test)\n", - " local_train.data = mnist_train.data[idx::len(collaborators)]\n", - " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", - " local_test.data = mnist_test.data[idx::len(collaborators)]\n", - " local_test.targets = mnist_test.targets[idx::len(collaborators)]\n", - " collaborator.private_attributes = {\n", - " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", - " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", + "collaborator_names = ['Portland', 'Seattle']\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators,\n", + " train_dataset, test_dataset, batch_size_train):\n", + " local_train = deepcopy(train_dataset)\n", + " local_test = deepcopy(test_dataset)\n", + " local_train.data = train_dataset.data[index::n_collaborators]\n", + " local_train.targets = train_dataset.targets[index::n_collaborators]\n", + " local_test.data = test_dataset.data[index::n_collaborators]\n", + " local_test.targets = test_dataset.targets[index::n_collaborators]\n", + "\n", + " return {\n", + " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", + " 'test_loader': torch.utils.data.DataLoader(local_test, batch_size=batch_size_train, shuffle=True)\n", + " }\n", "\n", + "# Setup collaborators private attributes via callable function\n", + "collaborators = []\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0.5,\n", + " 
private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx, n_collaborators=len(collaborator_names),\n", + " train_dataset=mnist_train, test_dataset=mnist_test, batch_size_train=batch_size_train\n", + " )\n", + " )\n", + "\n", "# The following is equivalent to\n", "# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, **backend='ray'**)\n", - "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')\n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "0525eaa9", "metadata": {}, "source": [ "Now that we have our flow and runtime defined, let's run the experiment! \n", "\n", - "(If you run this example on Google Colab with the GPU Runtime, you should see one task executing at a time.)" + "(If you run this example on Google Colab with the GPU Runtime, you should see two task executing at a time.)" ] }, { @@ -340,12 +359,13 @@ "model = None\n", "best_model = None\n", "optimizer = None\n", - "flflow = CollaboratorGPUFlow(model,optimizer,checkpoint=True)\n", + "flflow = CollaboratorGPUFlow(model, optimizer, checkpoint=True)\n", "flflow.runtime = local_runtime\n", "flflow.run()" ] }, { + "attachments": {}, "cell_type": "markdown", "id": "10616d60", "metadata": {}, @@ -366,6 +386,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "8e084b41", "metadata": {}, @@ -383,16 +404,6 @@ "run_id = flflow._run_id" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "composed-burst", - "metadata": {}, - "outputs": [], - "source": [ - "import metaflow" - ] - }, { "cell_type": "code", "execution_count": null, @@ -415,6 +426,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "f8f7d05f", "metadata": {}, @@ -443,6 +455,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "a206b36c", "metadata": {}, @@ -461,6 +474,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "bf4ec317", "metadata": {}, @@ -499,11 +513,12 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "0c5522b7", "metadata": {}, "source": [ - "Now we see **12** steps: **4** collaborators each performed **3** rounds of model training " + "Now we see **6** steps: **2** collaborators each performed **3** rounds of model training " ] }, { @@ -513,7 +528,7 @@ "metadata": {}, "outputs": [], "source": [ - "t = Task(f'FederatedFlow/{run_id}/train/9')" + "t = Task(f'CollaboratorGPUFlow/{run_id}/train/11')" ] }, { @@ -527,6 +542,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "efd5da76", "metadata": {}, @@ -555,6 +571,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "3e92fab0", "metadata": {}, @@ -573,6 +590,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "ced6e90e", "metadata": {}, @@ -591,6 +609,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "8b9f8d25", "metadata": {}, @@ -603,21 +622,13 @@ "- Differential Privacy\n", "- And More!" 
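For reference, a sketch of how the Metaflow client calls used above fit together once the checkpointed run has finished; the step and task indices depend on your particular run, and `flflow` is the flow object from the preceding cells:

```python
from metaflow import Step, Task

run_id = flflow._run_id  # run id recorded by the checkpointed flow

# With 2 collaborators and 3 rounds, the 'train' step holds 6 tasks.
train_step = Step(f"CollaboratorGPUFlow/{run_id}/train")
print(len(list(train_step)))

# Artifacts saved during one train task, e.g. the recorded loss.
t = Task(f"CollaboratorGPUFlow/{run_id}/train/11")
print(t.data.loss)
```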
] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "34fcbaa6", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { "kernelspec": { - "display_name": "workflow-interface-py38", + "display_name": "openfl_github_collab_private", "language": "python", - "name": "workflow-interface-py38" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -629,7 +640,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.16" }, "vscode": { "interpreter": { diff --git a/openfl/experimental/utilities/stream_redirect.py b/openfl/experimental/utilities/stream_redirect.py index ba657fecad3..d0afa47bf4e 100644 --- a/openfl/experimental/utilities/stream_redirect.py +++ b/openfl/experimental/utilities/stream_redirect.py @@ -44,6 +44,7 @@ def __init__(self, buffer, destination): self.__stdBuffer = buffer def write(self, message): + message = f"\33[94m{message}\33[0m"  # wrap the message in a bright-blue ANSI escape self.__stdDestination.write(message) self.__stdBuffer.write(message)
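A quick standalone demonstration of the escape sequence used in `write()` above: `\33[94m` (ESC `[94m`) switches the terminal foreground to bright blue and `\33[0m` resets it:

```python
# \33 is octal 27 (ESC); [94m selects bright blue, [0m resets attributes.
message = "hello from the flow"
print(f"\33[94m{message}\33[0m")
```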