import warnings
warnings.simplefilter('ignore')

import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn import metrics
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizer, DistilBertModel

import logging
logging.basicConfig(level=logging.ERROR)
|
|
# Run on the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
|
def hamming_score(y_true, y_pred):
    """Mean per-sample label accuracy: intersection over union of the true
    and predicted label sets, with 1.0 when both sets are empty."""
    acc_list = []
    for i in range(y_true.shape[0]):
        set_true = set(np.where(y_true[i])[0])
        set_pred = set(np.where(y_pred[i])[0])
        if len(set_true) == 0 and len(set_pred) == 0:
            tmp_a = 1.0
        else:
            tmp_a = len(set_true.intersection(set_pred)) / \
                    float(len(set_true.union(set_pred)))
        acc_list.append(tmp_a)
    return np.mean(acc_list)
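
# Quick sanity check (illustrative): with y_true = [[1, 0, 1], [0, 1, 0]] and
# y_pred = [[1, 0, 0], [0, 1, 0]], the per-sample scores are 1/2 and 1,
# so hamming_score(y_true, y_pred) returns 0.75.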
|
|
# Load the training data.
data = pd.read_csv('Vulnerable code dataset 15_12_22 - Training.csv')

new_df = pd.DataFrame()
new_df['text'] = data['text']
new_df['labels'] = data['label']
print(new_df.head())
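
# NOTE (assumption): the classifier below emits 6 logits trained with
# BCEWithLogitsLoss, so each row's `labels` entry must be a length-6 multi-hot
# vector such as [0, 1, 0, 0, 1, 0]. If the CSV stores a single class id per
# row instead, convert it first, e.g.:
# new_df['labels'] = new_df['labels'].apply(
#     lambda y: [1.0 if i == int(y) else 0.0 for i in range(6)])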
|
|
|
|
# Hyperparameters.
MAX_LEN = 128
TRAIN_BATCH_SIZE = 4
VALID_BATCH_SIZE = 4
EPOCHS = 1
LEARNING_RATE = 1e-05

# Truncation and padding are applied per call in the dataset below;
# passing truncation=True to from_pretrained does not enable it at encode time.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased', do_lower_case=True)
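
# Illustrative: encoding a short snippet pads ids and attention mask to MAX_LEN.
# enc = tokenizer.encode_plus("int main() { return 0; }", max_length=MAX_LEN,
#                             padding='max_length', truncation=True)
# len(enc['input_ids'])  # -> 128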
|
|
class MultiLabelDataset(Dataset):
    """Wraps a dataframe with `text` and `labels` columns; `labels` is
    expected to hold a length-6 multi-hot vector per row."""

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.targets = self.data.labels
        self.max_len = max_len

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        # Collapse runs of whitespace to single spaces.
        text = " ".join(str(self.text[index]).split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',  # replaces the deprecated pad_to_max_length=True
            truncation=True,
        )

        # DistilBERT has no segment embeddings, so token_type_ids are not needed.
        return {
            'ids': torch.tensor(inputs['input_ids'], dtype=torch.long),
            'mask': torch.tensor(inputs['attention_mask'], dtype=torch.long),
            'targets': torch.tensor(self.targets[index], dtype=torch.float)
        }
|
|
# 80/20 train/test split.
train_size = 0.8
train_data = new_df.sample(frac=train_size, random_state=200)
test_data = new_df.drop(train_data.index).reset_index(drop=True)
train_data = train_data.reset_index(drop=True)

print("FULL Dataset: {}".format(new_df.shape))
print("TRAIN Dataset: {}".format(train_data.shape))
print("TEST Dataset: {}".format(test_data.shape))

training_set = MultiLabelDataset(train_data, tokenizer, MAX_LEN)
testing_set = MultiLabelDataset(test_data, tokenizer, MAX_LEN)
|
|
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

# Shuffling is unnecessary during evaluation.
test_params = {'batch_size': VALID_BATCH_SIZE,
               'shuffle': False,
               'num_workers': 0
               }

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
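
# Optional shape check (illustrative): one training batch should yield
# [4, 128] id and mask tensors and, per the assumption above, [4, 6] targets.
# sample = next(iter(training_loader))
# print(sample['ids'].shape, sample['mask'].shape, sample['targets'].shape)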
|
|
|
|
class DistilBERTClass(torch.nn.Module):
    """DistilBERT encoder with a small classification head; the final layer
    emits 6 logits, one per label."""

    def __init__(self):
        super().__init__()
        self.l1 = DistilBertModel.from_pretrained("distilbert-base-uncased")
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(768, 6)

    def forward(self, input_ids, attention_mask):
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = output_1[0]
        # Use the [CLS] token's hidden state as a pooled representation.
        pooler = hidden_state[:, 0]
        pooler = self.pre_classifier(pooler)
        pooler = torch.tanh(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)
        return output
|
|
model = DistilBERTClass()
model.to(device)


# BCEWithLogitsLoss applies a per-label sigmoid followed by binary cross
# entropy, the standard loss for multi-label classification.
def loss_fn(outputs, targets):
    return torch.nn.BCEWithLogitsLoss()(outputs, targets)
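
# Minimal sketch (illustrative): a [batch, 6] logit tensor pairs with a
# [batch, 6] multi-hot target and the loss reduces to a scalar, e.g.
# loss_fn(torch.zeros(2, 6),
#         torch.tensor([[1., 0., 0., 1., 0., 0.], [0.] * 6]))  # -> ~0.6931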
|
|
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
|
|
def train(epoch):
    model.train()
    for batch_idx, batch in enumerate(tqdm(training_loader)):
        ids = batch['ids'].to(device, dtype=torch.long)
        mask = batch['mask'].to(device, dtype=torch.long)
        targets = batch['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask)

        optimizer.zero_grad()
        loss = loss_fn(outputs, targets)
        if batch_idx % 5000 == 0:
            print(f'Epoch: {epoch}, Loss: {loss.item()}')

        loss.backward()
        optimizer.step()


for epoch in range(EPOCHS):
    train(epoch)
|
|
def validation(testing_loader):
    model.eval()
    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for batch in tqdm(testing_loader):
            ids = batch['ids'].to(device, dtype=torch.long)
            mask = batch['mask'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.float)
            outputs = model(ids, mask)
            fin_targets.extend(targets.cpu().numpy().tolist())
            # Sigmoid converts logits to per-label probabilities.
            fin_outputs.extend(torch.sigmoid(outputs).cpu().numpy().tolist())
    return fin_outputs, fin_targets


outputs, targets = validation(testing_loader)
|
|
# Binarise probabilities at 0.5 to get hard label predictions.
final_outputs = np.array(outputs) >= 0.5

val_hamming_loss = metrics.hamming_loss(targets, final_outputs)
val_hamming_score = hamming_score(np.array(targets), final_outputs)

print(f"Hamming Score = {val_hamming_score}")
print(f"Hamming Loss = {val_hamming_loss}")
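

# Illustrative inference sketch (an assumption, not part of the original
# script): predict multi-hot labels for a single snippet with the model and
# tokenizer trained above.
def predict(text, threshold=0.5):
    model.eval()
    enc = tokenizer(
        " ".join(str(text).split()),
        max_length=MAX_LEN,
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )
    with torch.no_grad():
        logits = model(enc['input_ids'].to(device), enc['attention_mask'].to(device))
    # Threshold per-label probabilities, matching the evaluation above.
    return (torch.sigmoid(logits) >= threshold).int().cpu().numpy()[0]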