我正在训练一个自动编码器网络,用于对大型数据集上的多变量时间序列进行编码。我上传了一个完整的工作示例作为gist。
即使它正在工作,我也必须在内存效率极低或速度缓慢的解决方案之间做出选择。我想更快地得到我的内存效率解决方案。
在我的内存低效设置中,我像在plain_dataset中一样准备训练集。即通过在整个数据集上实现滑动窗口。(在我的真实训练设置中,它们高度重叠)
相反,我希望将我的训练集定义为一个(dataset_index, row_index, size)元组列表,就像在idx_dataset中一样。在每个训练步骤之前,该引用被解析并且实际的训练示例被编译。
我将时间序列数据集转换为RaggedTensor rt,以便能够以图形模式访问数据。然后,作为custom_fit函数的一部分,我在resolve_index_batch中实现了索引解析。
我的希望是,与实际的训练步骤相比,编写这个训练示例相当便宜,但当使用索引训练集时,吞吐量几乎是一半。
有什么想法可以让resolve_index_batch函数更高效吗?
@tf.function
def resolve_index_batch(idx_batch):
"""
param idx_batch: (32x3) int32 tensor. Each row containing [time_series_idx, row_idx, window_size]
"""
return tf.map_fn(fn=lambda idx: tf.slice(rt[idx[0]], [idx[1], 0], [idx[2], -1]), elems=idx_batch, fn_output_signature=tf.float32)
@tf.function
def train_step(batch):
if mode == 'idx':
# apply the conversion from indexes batch to data_series batch
batch = resolve_index_batch(batch)
# train on reversed time series
batch_flip = tf.reverse(batch, [1])
with tf.GradientTape() as tape:
m = model(batch, training=True)
loss = loss_fun(batch_flip, m)
grad = tape.gradient(loss, model.trainable_weights)
opt.apply_gradients(zip(grad, model.trainable_weights))
return loss发布于 2021-08-17 04:37:25
要提高resolve_index_batch速度,请使用tf.gather取代tf.map_fn。tf.map_fn只是一个陷阱,很多人都落入了它的陷阱,而且它在图形处理器上的表现非常差。tf.gather是向量化您的操作的正确方法。
示例代码:
@tf.function
def resolve_index_batch_fast(idx_batch):
"""
note that window length should be a constant, which is the case in your gist codes WINDOW_LEN = 14
"""
batch_size = idx_batch.shape[0] #if batch size is constant, this line can be optimized as well
return tf.gather(tf.gather(rt,idx_batch[:,0]),tf.tile(tf.range(window_length)[None,:],[batch_size,1])+idx_batch[:,1:2],batch_dims=1)在我的实验中,它至少比图形处理器中的resolve_index_batch快几倍,这取决于批处理的大小。批处理大小越大,加速比越大。在CPU中,tf.map_fn运行得相当好。
为了比较普通方法和索引方法的性能,让数据如此之小,以至于整个rt都可以放入图形处理器内存中,这是没有意义的。在用于深度学习的实际问题大小中,整个rt应该更大,并且只能驻留在中央处理器内存甚至固态硬盘中。
因此,首先要做的是确保rt在CPU内存中,方法如下:
def series_ragged_tensor():
with tf.device('/CPU:0'):
rt = tf.RaggedTensor.from_row_lengths(tf.concat(data_series(), axis=0), [ser.shape[0] for ser in data_series()])
return rt第二,在CPU中异步进行数据准备。def idx_dataset()内部
return tf.data.Dataset.from_tensor_slices(arr).batch(32).map(resolve_index_batch).repeat().prefetch(tf.data.AUTOTUNE)第三,将resolve_index_batch定义和rt = series_ragged_tensor()移到正确的位置,并相应地使索引模式和普通模式的train_step相同。
https://stackoverflow.com/questions/68800775
复制相似问题