我已经在我的单机上试用了TFF tutorial ( MNIST ),现在我正尝试使用MNIST数据执行多机过程。
显然,我不能使用create_tf_dataset_for_client,所以我使用GRPC来学习如何将数据从一台机器传递到另一台机器。
我的场景是,Server将初始模型(带零)分派给所有参与的客户端,在这些客户端上,模型将在本地数据上运行。每个客户端都会将新的权重分派到将执行federated_mean的服务器。
我正在考虑使用tff.learning.build_federated_averaging_process来定制next函数(第二个参数),但我失败了……我甚至不确定我们是否使用这种方法发送模型并从远程客户端获取权重。
然后我想我可以在@tff.federated_computation装饰器下使用tff.federated_mean。但是,由于权重是数组,并且我有一个它们的列表(因为我有许多客户端),所以我无法理解如何创建指向列表列表的tff.FederatedType。对分布式数据集进行联合建模的人提供的任何帮助都很容易理解。
问候你,戴夫。
发布于 2020-09-05 00:29:07
TFF计算被设计为平台/运行时不可知的;单个计算可以由几个不同的后端执行。
在这里,TFF的类型系统对于推断数据在计算中的预期流动方式很有帮助。有关TFF如何考虑类型的介绍,请参阅custom federated algorithms part 1 tutorial。
build_federated_averaging_process的结果需要一个放在客户端的数据集的参数;对于元素类型为T的数据集,在TFF通常的表示法中,这将被表示为{T*}@C。这个特征细节对于数据集如何到达客户端是不可知的,或者实际上客户端本身是如何表示的。
具体化数据和表示客户端实际上是运行时的工作。TFF在这里提供了一些所谓的native选项。
例如,在本地Python运行时中,客户端由本地计算机上的线程表示。Datasets只是急切的tf.data.Dataset对象,线程在训练期间从数据集中提取数据。
在远程Python运行时中,客户端由远程工作者(线程)表示,因此一个远程工作者可以运行多个客户端。在这种情况下,正如您所提到的,数据必须在远程员工上具体化,才能进行培训。
有几种方法可以实现这一点。
其一,TFF将实际为您处理跨此RPC连接的渴望数据集的序列化和反序列化,因此您可以使用与本地运行时中相同的指定数据模式,并且它应该“正常工作”。通过使用tf.raw_ops.DatasetToGraphV2,这种模式实际上在2021年3月变得更好了。
然而,也许更好地映射到联邦计算的概念是使用一些库函数来简单地实例化工作者上的数据集。
假设您有一个迭代流程ip,它接受state和data参数,其中data的类型为{T*}@C。进一步假设我们有一个TFF计算get_dataset_for_client_id,它接受一个字符串并返回一个适当类型的数据集(IE,它的TFF类型签名是tf.str -> T*)。
然后我们可以将这两个计算组合成另一个:
@tff.federated_computation(STATE_TYPE, tff.FederatedType(tf.string, tff.CLIENTS))
def new_next(state, client_ids):
datasets_on_clients = tff.federated_map(get_dataset_for_client_id, client_ids)
return ip.next(state, datasets_on_clients)new_next现在要求控制器只指定要训练的客户端的ids,并将指向数据存储的责任委托给代表客户端的任何人。
我认为这个模式很可能就是您想要的;TFF提供了一些帮助器,比如tff.simulation.ClientData和tff.simulation.compose_dataset_computation_with_iterative_process上的dataset_computation属性,它们或多或少会执行我们上面为您完成的连接。
发布于 2020-10-06 04:41:04
让我们一步一步地来做。如果下面的解释回答了你的问题,请让我们知道。
@tff.tf_computation(tff.SequenceType(tf.int32))
def process_data(ds):
return ds.reduce(np.int32(0), lambda x, y: x + y)此代码在输入时获取整数的数据集,并在输出时返回具有总和的单个整数。
您可以通过查看类型签名来确认这一点,如下所示:
str(process_data.type_signature)您应该看到以下内容:
(int32* -> int32)因此,process_data接受一组整数,并返回一个整数。
@tff.federated_computation(tff.FederatedType(tff.SequenceType(tf.int32), tff.CLIENTS))
def process_data_on_clients(federated_ds):
return tff.federated_map(process_data, federated_ds)如果您查看这个新计算的类型签名(就像上面一样),您将看到以下内容:
({int32*}@CLIENTS -> {int32}@CLIENTS)这意味着process_data_on_clients接受一组联合的整数(每个客户端一个),并返回一个联合的整数(一个整数与每个客户端的和)。
上面发生的事情是,process_data中的TF逻辑将在每个客户端上执行一次。这就是federated_map操作符的工作方式。
process_data_on_clients有点像你正在使用的迭代过程。它希望您提供一个联合数据集作为参数。让我们看看如何通过遵循与上面相同的模式来制作一个。
下面是一些TF代码,它创建一个包含整数的数据集,假设您提供了一个整数n,并希望创建一个从1到n的数据集,即{1,2,...,n}:
@tff.tf_computation(tf.int32)
def make_data(n):
return tf.data.Dataset.range(tf.cast(n, tf.int64)).map(lambda x: tf.cast(x + 1, tf.int32))这显然是一个愚蠢的例子,你可以做一些你需要的事情(例如,从一个由名称指定的文件中读取数据,等等)。
下面是它的类型签名:
(int32 -> int32*)您可以看到它与process_data的相似之处。
而且,就像处理数据一样,现在我们可以使用federated_map操作符在所有客户机上生成数据:
@tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
def make_data_on_clients(federated_n):
return tff.federated_map(make_data, federated_n)这是类型签名:
({int32}@CLIENTS -> {int32*}@CLIENTS)很好,就像process_data_on_clients想要的那样,make_data_on_clients接受一个联合整数(它告诉我们每个客户端要生成多少个数据项),并返回一个联合数据集。
您可以检查这两者是否按预期一起工作:
federated_n = [2, 3, 4]
federated_ds = make_data_on_clients(federated_n)
result = process_data_on_clients(federated_ds)
result您应该会得到参与此计算的3个客户端上的1+2、1+2+3和1+2+3+4之和(请注意,上面的联合整数中有3个数字,因此这里有3个客户端):
[<tf.Tensor: shape=(), dtype=int32, numpy=3>,
<tf.Tensor: shape=(), dtype=int32, numpy=6>,
<tf.Tensor: shape=(), dtype=int32, numpy=10>]请注意,到目前为止,您看到的所有TF代码,包括数据集创建和数据集缩减,都是在客户机上执行的(使用federated_map)。
@tff.federated_computation(tff.FederatedType(tf.int32, tff.CLIENTS))
def make_and_process_data_on_clients(federated_n):
federated_ds = make_data_on_clients(federated_n)
return process_data_on_clients(federated_ds)现在,您可以一次性调用make和process数据组合:
make_and_process_data_on_clients(federated_n)同样,这里的所有TF代码都是在客户机上执行的,就像上面一样。
回到基思的解释,您从TFF得到的迭代过程需要一个联邦数据集作为输入,就像process_data_on_clients一样。
Keith示例中的函数get_dataset_for_client_id类似于我们的make_data,因为假设它包含您希望在每个客户端上运行的TensorFlow代码,以便在该客户端上以物理方式构造一个dataset。
在一个愚蠢的例子中,数据集构造逻辑使用了range,但它可以是任何东西。例如,您可以从相同的本地文件my_data加载每个客户端上的数据,或者使用自定义的TF,或者通过任何其他方法。就像在我们示例中一样,您可以将参数传递给该函数,以提供更集中的控制(类似于上面对联邦整数所做的任何操作)。
Keith示例中的代码截取程序new_next与我们的make_and_process_data_on_clients类似,因为它组合了两个联邦计算:一个在客户机上生成联邦数据(由您提供,就像这里讨论的那样),另一个处理数据(来自tff.learning,即迭代过程)。
这有帮助吗?
如果仍然不清楚,我建议在您的分布式设置上尝试上面包含的示例,因为您已经有了一个示例。您可以将一些TF打印操作注入到该代码中,以确认您编写的TF代码正在系统中的客户机上执行。
一旦您了解了这一部分,就可以简单地调整一下,将make_data中愚蠢的数据集构造逻辑替换为从所使用的任何本地数据源加载每个客户端上的数据集的逻辑。
编辑:
Re:如何打印,出现在@tff.tf_computation主体中的任何TensorFlow代码都是在急切模式下执行的,您可以使用标准的TensorFlow机制(如tf.print )从TensorFlow中打印。
tensorflow.org/api_docs/python/tf/print
有关如何配置具有多个工作节点的多机系统的信息,请参阅Kubernetes教程。请注意,驱动流程的机器连接到工作节点,而不是反过来。
https://www.tensorflow.org/federated/tutorials/high_performance_simulation_with_kubernetes
https://stackoverflow.com/questions/63723518
复制相似问题