Spaces:
Build error
Build error
| import os | |
| from itertools import cycle | |
| import matplotlib | |
| import tensorflow as tf | |
| # π¬ NOTE: Handle plots issues when running tests or displaying in notebooks | |
| try: | |
| get_ipython # Only exists in Jupyter | |
| matplotlib.use("module://matplotlib_inline.backend_inline") | |
| except Exception: | |
| matplotlib.use("Agg") # Fix error with tests | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import seaborn as sns | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| classification_report, | |
| confusion_matrix, | |
| f1_score, | |
| precision_score, | |
| recall_score, | |
| roc_auc_score, | |
| roc_curve, | |
| ) | |
| from sklearn.preprocessing import LabelEncoder | |
| from sklearn.utils.class_weight import compute_class_weight | |
| from tensorflow.keras import Input, Model | |
| from tensorflow.keras.callbacks import EarlyStopping | |
| from tensorflow.keras.layers import BatchNormalization, Concatenate, Dense, Dropout | |
| from tensorflow.keras.losses import CategoricalCrossentropy | |
| from tensorflow.keras.optimizers import SGD, Adam | |
| from tensorflow.keras.utils import Sequence | |
class MultimodalDataset(Sequence):
    """
    Keras ``Sequence`` serving batches of numeric text and/or image features.

    The feature columns are read from a DataFrame, cast to ``float32`` and kept
    in memory as NumPy arrays. Labels are integer-encoded with a
    ``LabelEncoder`` and then one-hot encoded. Each batch is returned as
    ``(inputs, labels)`` where ``inputs`` is a dict holding the key ``"text"``
    and/or ``"image"``, depending on which column lists were provided.

    Args:
        df (pd.DataFrame): DataFrame holding the feature and label columns.
        text_cols (list): Column names of the text features. May be None/empty
            when only image features are used.
        image_cols (list): Column names of the image features. May be
            None/empty when only text features are used.
        label_col (str): Column name of the target labels.
        encoder (LabelEncoder, optional): Pre-fitted encoder used to transform
            the labels. If None, a new LabelEncoder is fitted on ``df[label_col]``.
        batch_size (int, optional): Number of samples per batch. Default is 32.
        shuffle (bool, optional): Whether to shuffle the sample order on
            construction and at the end of each epoch. Default is True.

    Attributes:
        text_data (np.ndarray | None): ``float32`` text features, or None.
        image_data (np.ndarray | None): ``float32`` image features, or None.
        labels (np.ndarray): One-hot encoded labels, one row per sample.
        encoder (LabelEncoder): Encoder used to transform the labels.
        batch_size (int): Number of samples per batch.
        shuffle (bool): Whether the sample order is shuffled every epoch.
        indices (np.ndarray): Current sample order used to build batches.

    Raises:
        ValueError: If both ``text_cols`` and ``image_cols`` are None/empty, or
            if ``batch_size`` is not a positive number.
    """

    def __init__(
        self,
        df,
        text_cols,
        image_cols,
        label_col,
        encoder=None,
        batch_size=32,
        shuffle=True,
    ):
        """
        Extract features and labels from ``df`` and prepare the sample order.

        See the class docstring for the meaning of each argument.
        """
        # Keras 3 warns when a Sequence/PyDataset subclass skips the parent
        # constructor; on older Keras this resolves to a harmless no-op.
        super().__init__()

        # Validate before touching the DataFrame so bad arguments fail fast.
        if not text_cols and not image_cols:
            raise ValueError(
                "At least one of text_cols or image_cols must be provided."
            )
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer.")

        self.text_data = (
            df[text_cols].astype(np.float32).values if text_cols else None
        )
        self.image_data = (
            df[image_cols].astype(np.float32).values if image_cols else None
        )

        raw_labels = df[label_col].values
        if encoder is None:
            # No encoder supplied: fit a fresh one on this dataset's labels.
            self.encoder = LabelEncoder()
            encoded_labels = self.encoder.fit_transform(raw_labels)
        else:
            self.encoder = encoder
            encoded_labels = self.encoder.transform(raw_labels)

        # One-hot encode: row i of the identity matrix is the vector of class i.
        num_classes = len(self.encoder.classes_)
        self.labels = np.eye(num_classes)[encoded_labels]

        self.batch_size = batch_size
        self.shuffle = shuffle
        # Builds self.indices (and shuffles it when requested).
        self.on_epoch_end()

    def __len__(self):
        """
        Return the number of batches per epoch.

        The count is rounded up so the trailing partial batch is served too;
        rounding down would drop up to ``batch_size - 1`` samples and give zero
        batches for a dataset smaller than ``batch_size``.

        Returns:
            int: Number of batches per epoch.
        """
        return int(np.ceil(len(self.labels) / self.batch_size))

    def __getitem__(self, idx):
        """
        Return the batch at position ``idx``.

        Args:
            idx (int): Batch index. Negative values count from the last batch.

        Returns:
            tuple: ``(inputs, labels)``. ``inputs`` is a dict with the key
            ``"text"`` and/or ``"image"`` (only for the modalities that were
            provided); ``labels`` holds the matching one-hot label rows.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        num_batches = len(self)
        if idx < 0:
            idx += num_batches
        # Raising IndexError lets a plain ``for batch in dataset`` terminate
        # even when the base class does not define ``__iter__``.
        if not 0 <= idx < num_batches:
            raise IndexError("Batch index out of range.")

        start = idx * self.batch_size
        batch_indices = self.indices[start : start + self.batch_size]

        inputs = {}
        if self.text_data is not None:
            inputs["text"] = self.text_data[batch_indices]
        if self.image_data is not None:
            inputs["image"] = self.image_data[batch_indices]
        return inputs, self.labels[batch_indices]

    def on_epoch_end(self):
        """
        Reset the sample order, shuffling it in place when ``shuffle`` is set.

        Called once from ``__init__`` and intended to be invoked by Keras at the
        end of every epoch.
        """
        self.indices = np.arange(len(self.labels))
        if self.shuffle:
            np.random.shuffle(self.indices)
| # Early Fusion Model | |
def create_early_fusion_model(
    text_input_size, image_input_size, output_size, hidden=(128,), p=0.2
):
    """
    Build an (uncompiled) early-fusion classifier over text and/or image features.

    When both input sizes are given, the two feature vectors are concatenated
    before the hidden layers. When only one is given, that input feeds the
    hidden layers directly. Every hidden layer is a ReLU ``Dense`` layer
    followed by ``Dropout(p)`` and ``BatchNormalization``; the output layer is a
    softmax ``Dense`` layer named ``"output"``.

    Args:
        text_input_size (int | None): Length of the text feature vector, or
            None to build a model without a text input.
        image_input_size (int | None): Length of the image feature vector, or
            None to build a model without an image input.
        output_size (int): Number of classes (units of the softmax layer).
        hidden (int | sequence of int | None, optional): Hidden layer widths.
            An integer creates a single hidden layer; a list/tuple creates one
            layer per entry; None or an empty sequence creates no hidden
            layers. Default is a single layer with 128 units.
        p (float, optional): Dropout rate applied after each hidden layer.
            Default is 0.2.

    Returns:
        keras.Model: The model. It is NOT compiled; call ``model.compile`` on it.
        Its inputs are named ``"text"`` and ``"image"``. With both modalities
        the model takes the list ``[text, image]``; otherwise the single input.

    Raises:
        ValueError: If both ``text_input_size`` and ``image_input_size`` are None.

    Example:
        model = create_early_fusion_model(text_input_size=300, image_input_size=2048, output_size=10, hidden=[128, 64], p=0.3)
        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    """
    if text_input_size is None and image_input_size is None:
        raise ValueError(
            "At least one of text_input_size and image_input_size must be provided."
        )

    # Normalise `hidden` to a list of widths. Previously any non-list sequence
    # (e.g. a tuple) was silently ignored, yielding a model without hidden layers.
    if hidden is None:
        hidden_units = []
    elif isinstance(hidden, (int, np.integer)):
        hidden_units = [hidden]
    else:
        hidden_units = list(hidden)

    # One Input per requested modality, always in (text, image) order.
    inputs = []
    if text_input_size is not None:
        inputs.append(Input(shape=(text_input_size,), name="text"))
    if image_input_size is not None:
        inputs.append(Input(shape=(image_input_size,), name="image"))

    if len(inputs) == 2:
        # Early fusion: concatenate both feature vectors into one.
        x = Concatenate(name="fusion_layer")(inputs)
        model_inputs = inputs
    else:
        # Single modality: nothing to fuse, feed the lone input directly.
        x = inputs[0]
        model_inputs = inputs[0]

    for units in hidden_units:
        x = Dense(units, activation="relu")(x)
        x = Dropout(p)(x)
        x = BatchNormalization()(x)

    output = Dense(output_size, activation="softmax", name="output")(x)
    return Model(inputs=model_inputs, outputs=output)
def test_model(y_test, y_pred, y_prob=None, encoder=None):
    """
    Report classification metrics and plot a confusion matrix (and ROC curves).

    Steps performed:
        - Convert the one-hot ``y_test`` to class indices with ``argmax``.
        - If ``encoder`` is given, map true and predicted indices back to the
          original labels; otherwise the integer indices are used as labels.
        - Plot the confusion matrix as a Seaborn heatmap.
        - If ``y_prob`` is given, plot one ROC curve per class (one-vs-rest,
          using column ``i`` of ``y_test`` and ``y_prob``).
        - Print scikit-learn's classification report.

    Args:
        y_test (np.ndarray): One-hot encoded ground-truth labels.
        y_pred (np.ndarray): Predicted class indices (after argmax).
        y_prob (np.ndarray, optional): Predicted per-class probabilities, one
            column per class. Required only for the ROC plot. Default is None.
        encoder (LabelEncoder, optional): Fitted encoder used to map class
            indices back to the original labels. Default is None.

    Returns:
        tuple: ``(accuracy, precision, recall, f1)`` where precision, recall and
        F1 are weighted averages.

    Example:
        accuracy, precision, recall, f1 = test_model(y_test, y_pred, y_prob, encoder)
    """
    # Keep the one-hot matrix: the ROC curves need the per-class columns.
    y_test_binarized = np.asarray(y_test)
    true_indices = np.argmax(y_test_binarized, axis=1)
    pred_indices = np.asarray(y_pred)

    if encoder is not None:
        y_true_labels = encoder.inverse_transform(true_indices)
        y_pred_labels = encoder.inverse_transform(pred_indices)
    else:
        # `encoder` defaults to None, so fall back to the raw class indices
        # instead of failing on `None.inverse_transform`.
        y_true_labels, y_pred_labels = true_indices, pred_indices

    cm = confusion_matrix(y_true_labels, y_pred_labels)
    fig, ax = plt.subplots(figsize=(15, 15))
    sns.heatmap(cm, annot=True, cmap="Blues", fmt="g", ax=ax)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_title("Confusion Matrix")
    plt.show()
    # Release the figure; with a non-interactive backend `show` does not close it.
    plt.close(fig)

    if y_prob is not None:
        fig, ax = plt.subplots(figsize=(15, 15))
        colors = cycle(["aqua", "darkorange", "cornflowerblue"])
        for i, color in zip(range(y_prob.shape[1]), colors):
            fpr, tpr, _ = roc_curve(y_test_binarized[:, i], y_prob[:, i])
            ax.plot(fpr, tpr, color=color, lw=2, label=f"Class {i}")
        # Diagonal reference line (chance level).
        ax.plot([0, 1], [0, 1], "k--")
        ax.set_title("ROC Curve")
        ax.set_ylabel("True Positive Rate")
        ax.set_xlabel("False Positive Rate")
        ax.legend()
        plt.show()
        plt.close(fig)

    print(classification_report(y_true_labels, y_pred_labels))

    accuracy = accuracy_score(y_true_labels, y_pred_labels)
    precision = precision_score(y_true_labels, y_pred_labels, average="weighted")
    recall = recall_score(y_true_labels, y_pred_labels, average="weighted")
    f1 = f1_score(y_true_labels, y_pred_labels, average="weighted")
    return accuracy, precision, recall, f1
def train_mlp(
    train_loader,
    test_loader,
    text_input_size,
    image_input_size,
    output_size,
    num_epochs=50,
    report=False,
    lr=0.001,
    set_weights=True,
    adam=False,
    p=0.0,
    seed=1,
    patience=40,
    save_results=True,
    train_model=True,
    test_mlp_model=True,
):
    """
    Build, optionally train, and optionally evaluate an early fusion model.

    Args:
        train_loader (MultimodalDataset): Training data loader. Its ``labels``
            attribute (one-hot) is used for class weights and its ``encoder``
            is passed to ``test_model`` when ``report`` is True.
        test_loader (MultimodalDataset): Loader used both as validation data
            during training and as the evaluation set afterwards.
        text_input_size (int | None): Size of the text feature vector, or None.
        image_input_size (int | None): Size of the image feature vector, or None.
        output_size (int): Number of output classes.
        num_epochs (int, optional): Maximum number of epochs. Default is 50.
        report (bool, optional): If True, call ``test_model`` for a detailed
            report and plots. Default is False.
        lr (float, optional): Optimizer learning rate. Default is 0.001.
        set_weights (bool, optional): If True, compute balanced class weights
            from the training labels. Default is True.
        adam (bool, optional): Use Adam instead of SGD. Default is False.
        p (float, optional): Dropout rate forwarded to the model. Default is 0.0.
        seed (int | None, optional): Seed for NumPy and TensorFlow; None skips
            seeding. Default is 1.
        patience (int, optional): Early-stopping patience on ``val_loss``.
            Default is 40.
        save_results (bool, optional): If True, write predictions and true
            labels to ``results/{model_type}_results.csv``. Default is True.
        train_model (bool, optional): If False, skip ``model.fit``. Default is True.
        test_mlp_model (bool, optional): If False, skip evaluation, plotting and
            saving; the three metrics are then returned as None. Default is True.

    Returns:
        tuple: ``(model, test_accuracy, f1, macro_auc)`` where ``f1`` is the
        macro F1 score and ``macro_auc`` the macro one-vs-rest ROC AUC.

    Raises:
        ValueError: If evaluation is requested but ``test_loader`` yields no
            batches, or (from the model builder) if both input sizes are None.

    Side Effects (only when ``test_mlp_model`` is True):
        - Plots train/validation accuracy when a training history exists.
        - Writes the results CSV when ``save_results`` is True.
        - Saves the model under ``trained_models/`` and prints a confirmation.

    Example:
        train_mlp(train_loader, test_loader, text_input_size=300, image_input_size=2048, output_size=10, num_epochs=30, lr=0.001, adam=True, report=True)
    """
    if seed is not None:
        np.random.seed(seed)
        tf.random.set_seed(seed)

    # Forward the dropout rate: it used to be dropped here, so the model always
    # fell back to the builder's own default regardless of `p`.
    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, p=p
    )

    class_weights = None
    if set_weights:
        class_indices = np.argmax(train_loader.labels, axis=1)
        present_classes = np.unique(class_indices)
        weights = compute_class_weight(
            class_weight="balanced",
            classes=present_classes,
            y=class_indices,
        )
        # Key by the actual class index, not by position: the two differ as
        # soon as a class is missing from the training labels.
        class_weights = {
            int(cls): float(weight) for cls, weight in zip(present_classes, weights)
        }

    loss = CategoricalCrossentropy()
    optimizer = Adam(learning_rate=lr) if adam else SGD(learning_rate=lr)
    model.compile(optimizer=optimizer, loss=loss, metrics=["accuracy"])

    early_stopping = EarlyStopping(
        monitor="val_loss",
        patience=patience,
        restore_best_weights=True,
    )

    history = None
    if train_model:
        history = model.fit(
            train_loader,
            validation_data=test_loader,
            epochs=num_epochs,
            class_weight=class_weights,
            callbacks=[early_stopping],
            verbose=1,  # must be an int; the string "1" is not a valid mode
        )

    if not test_mlp_model:
        return model, None, None, None

    # Prefer explicit indexing: it does not depend on the loader defining
    # `__iter__` or raising IndexError past the last batch.
    if hasattr(test_loader, "__getitem__"):
        batches = (test_loader[i] for i in range(len(test_loader)))
    else:
        batches = test_loader

    label_batches, prob_batches = [], []
    for features, labels in batches:
        # Same input convention as the model: a single array for one modality,
        # the list [text, image] for both.
        arrays = [features[key] for key in ("text", "image") if key in features]
        model_inputs = arrays[0] if len(arrays) == 1 else arrays
        prob_batches.append(model.predict(model_inputs, verbose=0))
        label_batches.append(labels)

    if not label_batches:
        raise ValueError("test_loader produced no batches; cannot evaluate the model.")

    y_true = np.concatenate(label_batches)
    y_prob = np.concatenate(prob_batches)
    y_pred = np.argmax(y_prob, axis=1)
    true_indices = np.argmax(y_true, axis=1)

    test_accuracy = accuracy_score(true_indices, y_pred)
    f1 = f1_score(true_indices, y_pred, average="macro")
    macro_auc = roc_auc_score(y_true, y_prob, average="macro", multi_class="ovr")

    # The accuracy curves only exist when training actually ran.
    if history is not None:
        fig, ax = plt.subplots()
        ax.plot(history.history["accuracy"], label="Train Accuracy")
        ax.plot(history.history["val_accuracy"], label="Validation Accuracy")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Accuracy")
        ax.legend()
        plt.show()
        plt.close(fig)

    if report:
        test_model(y_true, y_pred, y_prob, encoder=train_loader.encoder)

    # The builder already rejected the case where both sizes are None.
    if text_input_size is not None and image_input_size is not None:
        model_type = "multimodal"
    elif text_input_size is not None:
        model_type = "text"
    else:
        model_type = "image"

    if save_results:
        results = pd.DataFrame({"Predictions": y_pred, "True Labels": true_indices})
        os.makedirs("results", exist_ok=True)
        results.to_csv(f"results/{model_type}_results.csv", index=False)

    # NOTE(review): the original indentation of this section was lost; it is
    # assumed to run whenever evaluation runs, independent of `save_results`
    # - confirm against the upstream file.
    models_dir = "trained_models"
    os.makedirs(models_dir, exist_ok=True)
    model_filename = os.path.join(models_dir, f"{model_type}_model")
    try:
        model.save(model_filename)
    except ValueError:
        # Keras 3 rejects paths without a `.keras`/`.h5` extension; retry with
        # the native format so saving works on both Keras generations.
        model.save(model_filename + ".keras")
    print(f"β {model_type} model saved successfully")

    return model, test_accuracy, f1, macro_auc