我想实现我自己的自定义数据生成器,用于在我从keras构建的使用函数api构建的多输入keras模型上。
我读过很多关于序列类以及如何扩展它的功能的文章。
我的数据集严重失衡,包含3个类。

我想要实现的是构建一个自定义的datagenerator,它使用的是来自datagenerator的流。此数据文件包含图像的路径。通过限制来自过度表示的类目录的图像路径的数量,我可以成功地欠采样,从而平衡数据集。
Dataframe结构:

然而,我遗漏的其余图像仍然包含丰富的信息,我希望我的模型学习。
是否可以使用类似于回调"onepochend“的方法来调用我的imagedatagenerator中的一个函数,该函数替换了dataframe中的旧路径并替换为随机选择的新路径?
回调角文件:https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/Callback
生成器类文档:https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence
勾勒出我的想法:

或者tensorflow/keras有什么能实现这一点的东西呢?
发布于 2021-12-10 13:53:08
如果有人在寻找这方面的解决方案,我已经通过从tensorflow扩展序列实现了一个自定义生成器:
class custom_generator(tf.keras.utils.Sequence):
def __init__(self, ecg_path, eeg_path, batch_size, img_shape, shuffle=True, X_col='filename', Y_col='class'):
self.batch_size = batch_size
self.img_shape = img_shape
self.shuffle = shuffle
self.X_col = X_col
self.Y_col = Y_col
self.class_mapping = {"sz": 1, "non-sz": 0}
self.ecg_path = ecg_path
self.eeg_path = eeg_path
self.eeg_df, self.ecg_df = self.__generate_data()
self.len = len(self.eeg_df)
self.n_name = self.ecg_df[self.Y_col].nunique()
def __generate_data(self):
eeg_class_dist = inspect_class_distribution(self.eeg_path)
ecg_class_dist = inspect_class_distribution(self.ecg_path)
max_n_images = get_lowest_distr(ecg_class_dist, eeg_class_dist)
balanced_ecg_data = limit_data(self.ecg_path, max_n_images).sort_values(by=[self.Y_col]).reset_index(drop=True)
balanced_eeg_data = limit_data(self.eeg_path, max_n_images).sort_values(by=[self.Y_col]).reset_index(drop=True)
return shuffle_order_dataframes(balanced_eeg_data, balanced_ecg_data)
def on_epoch_end(self):
if shuffle:
self.ecg_df, self.eeg_df = self.__generate_data()
def __get_input(self, path, target_size):
image = tf.keras.preprocessing.image.load_img(path)
image_arr = tf.keras.preprocessing.image.img_to_array(image)
image_arr = tf.image.resize(image_arr,(target_size[0], target_size[1])).numpy()
return image_arr/255.
def __get_output(self, label, num_classes):
categoric_label = self.class_mapping[label]
return tf.keras.utils.to_categorical(categoric_label, num_classes=num_classes)
def __get_data(self, x1_batches):
eeg_path_batch = x1_batches[self.X_col]
ecg_path_batch = x1_batches[self.X_col]
label_batch = x1_batches[self.Y_col]
x1_batch = np.asarray([self.__get_input(x, self.img_shape) for x in eeg_path_batch])
x2_batch = np.asarray([self.__get_input(x, self.img_shape) for x in ecg_path_batch])
y_batch = np.asarray([self.__get_output(y, self.n_name) for y in label_batch])
return tuple([x1_batch, x2_batch]), y_batch
def __getitem__(self, index):
n_batches = self.eeg_df[index * self.batch_size:(index + 1) * self.batch_size]
X, y = self.__get_data(n_batches)
return X, y
def __len__(self):
return self.len // self.batch_sizeon_epoch_end是这里的关键。
https://stackoverflow.com/questions/70057651
复制相似问题