首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用tf.from_generator并行化tf.contrib.data.parallel_interleave

使用tf.from_generator并行化tf.contrib.data.parallel_interleave
EN

Stack Overflow用户
提问于 2018-09-05 07:50:00
回答 1查看 2.3K关注 0票数 7

我有一堆JSON数组文件(准确地说是AVRO),每个文件都会产生多个训练Keras模型的样本。利用@GPhilo@jsimsa的思想,我能够将输入管道并行化。无法解决如何设计generator(n)来划分处理文件的工作。代码在parse_file(f)内部失败,因为函数需要字符串文件路径,而不是Tensor

代码语言:javascript
复制
N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size )
  • 这里设计generator(n)的正确方法是什么?
  • 这是使用parallel_interleaveflat_map设计输入管道的优化方法吗?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-05 13:05:22

在我看来,你把你的生活和发电机搞得太复杂了。这就是我如何实现您的输入管道:

代码语言:javascript
复制
def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
it = dataset.make_one_shot_iterator()

为了测试它,我将虚拟parse_file定义为:

代码语言:javascript
复制
i=0
def parse_file(f):
    global i
    i += 1
    return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32) # mimicks variable-length examples_x, examples_y

我将其输入一个基本循环,该循环显示迭代器返回的内容:

代码语言:javascript
复制
sess = tf.Session()
try:
    while True:
        x, y = it.get_next()
        vx, vy = sess.run([x,y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass
sess.close()

运行上面的代码打印:

代码语言:javascript
复制
[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]

对管道的解释

本质上,我把并行化问题留给了map,在那里我可以传递它应该运行的线程数。不需要发电机迭代范围和那些额外的复杂。

我选择map而不是parallel_interleave,因为后者要求您为它返回的每一项生成一个Dataset实例,在您的示例中,这是没有意义的,因为在运行parse_file时,您已经在内存中加载了所有值。如果您缓慢地生成值(例如,通过将parallel_interleave应用于文件名列表),那么parallel_interleave是有意义的,但是如果您的数据集适合于内存,则选择map

关于tf.py_func的限制,它们不会影响您受过训练的网络,只会影响输入管道。理想情况下,您将有一个不同的管道为您的培训和您的最后使用的网络。您只需要在后面的过程中处理这些限制,而对于培训(除非您对分布式培训和/或跨机器移动培训做了非常具体的事情),您是相当安全的。

带生成器的版本

如果您的JSON文件非常大,并且它们的内容不能存储在内存中,则可以使用生成器,但与您开始使用的方法略有不同。其思想是,生成器一次遍历JSON文件和yield的一个记录。然后,生成器必须是您的parse_file函数。作为一个例子,让我们假设您有以下parse_file生成器:

代码语言:javascript
复制
i = 3
def parse_file(filename):
    global i
    i += 1
    ctr = 0
    while ctr < i:
        yield ctr, ctr

在这种情况下,管道如下所示:

代码语言:javascript
复制
def wrap_generator(filename):
    return tf.data.Dataset.from_generator(parse_file(filename), [tf.int32, tf.int32])

files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()

注意,这里我们需要使用parallel_interleave,因为我们将生成器转换为Dataset实例,从中提取值。其余的都保持不变。

将此输入到与上面的打印相同的示例循环:

代码语言:javascript
复制
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52179857

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档