V případě, že někdo hledá řešení jsem implementoval vlastní generátor rozšířením sekvence z tensorflow:
class custom_generator(tf.keras.utils.Sequence):
def __init__(self, ecg_path, eeg_path, batch_size, img_shape, shuffle=True, X_col='filename', Y_col='class'):
self.batch_size = batch_size
self.img_shape = img_shape
self.shuffle = shuffle
self.X_col = X_col
self.Y_col = Y_col
self.class_mapping = {"sz": 1, "non-sz": 0}
self.ecg_path = ecg_path
self.eeg_path = eeg_path
self.eeg_df, self.ecg_df = self.__generate_data()
self.len = len(self.eeg_df)
self.n_name = self.ecg_df[self.Y_col].nunique()
def __generate_data(self):
eeg_class_dist = inspect_class_distribution(self.eeg_path)
ecg_class_dist = inspect_class_distribution(self.ecg_path)
max_n_images = get_lowest_distr(ecg_class_dist, eeg_class_dist)
balanced_ecg_data = limit_data(self.ecg_path, max_n_images).sort_values(by=[self.Y_col]).reset_index(drop=True)
balanced_eeg_data = limit_data(self.eeg_path, max_n_images).sort_values(by=[self.Y_col]).reset_index(drop=True)
return shuffle_order_dataframes(balanced_eeg_data, balanced_ecg_data)
def on_epoch_end(self):
if shuffle:
self.ecg_df, self.eeg_df = self.__generate_data()
def __get_input(self, path, target_size):
image = tf.keras.preprocessing.image.load_img(path)
image_arr = tf.keras.preprocessing.image.img_to_array(image)
image_arr = tf.image.resize(image_arr,(target_size[0], target_size[1])).numpy()
return image_arr/255.
def __get_output(self, label, num_classes):
categoric_label = self.class_mapping[label]
return tf.keras.utils.to_categorical(categoric_label, num_classes=num_classes)
def __get_data(self, x1_batches):
eeg_path_batch = x1_batches[self.X_col]
ecg_path_batch = x1_batches[self.X_col]
label_batch = x1_batches[self.Y_col]
x1_batch = np.asarray([self.__get_input(x, self.img_shape) for x in eeg_path_batch])
x2_batch = np.asarray([self.__get_input(x, self.img_shape) for x in ecg_path_batch])
y_batch = np.asarray([self.__get_output(y, self.n_name) for y in label_batch])
return tuple([x1_batch, x2_batch]), y_batch
def __getitem__(self, index):
n_batches = self.eeg_df[index * self.batch_size:(index + 1) * self.batch_size]
X, y = self.__get_data(n_batches)
return X, y
def __len__(self):
return self.len // self.batch_size
on_epoch_end je klíčem zde.