| import streamlit.components.v1 as components |
| import streamlit as st |
| from random import randrange, uniform |
| import pandas as pd |
| import logging |
| import numpy as np |
| import random |
| from datetime import datetime, timedelta |
| from babel.numbers import format_currency |
|
|
| |
# Human-readable labels for the fields of one raw transaction row, in row
# order. NOTE(review): presumably the `col_names` argument handed to
# create_data_input_table — confirm against the caller.
COL_NAMES = [
    # What happened and when.
    "Transaction date", "Transaction type", "Amount transferred",
    # Balances before and after the transfer.
    "Sender's initial balance", "Sender's new balance",
    "Recipient's initial balance", "Recipient's new balance",
    # Flag-style features.
    "Sender exactly credited", "Receiver exactly credited",
    "Large amount", "Frequent receiver", "Merchant receiver",
    # Parties involved.
    "Sender ID", "Receiver ID",
]
|
|
| |
# Explanation text per feature position, keyed by integer index 0..17.
# The layout has no "transaction type" slot at position 1 and ends with five
# per-type entries, which matches a row whose type column was removed and
# replaced by trailing one-hot flags.
feature_texts = dict(
    enumerate(
        (
            "Date of transaction",
            "Amount transferred",
            "Initial balance of sender",
            "New balance of sender",
            "Initial balance of recipient",
            "New balance of recipient",
            "Sender's balance was exactly credited",
            "Receiver's balance was exactly credited",
            "Large amount",
            "Frequent receiver of transactions",
            "Receiver is merchant",
            "Sender ID",
            "Receiver ID",
            "Transaction type is Cash out",
            "Transaction type is Transfer",
            "Transaction type is Payment",
            "Transaction type is Cash in",
            "Transaction type is Debit",
        )
    )
)
|
|
| |
# Known transaction-type codes. The order fixes the order of the one-hot
# flags derived from this array.
CATEGORIES = np.array(
    [
        "CASH_OUT",
        "TRANSFER",
        "PAYMENT",
        "CASH_IN",
        "DEBIT",
    ]
)
|
|
|
|
| |
def transformation(input, categories):
    """One-hot encode the category stored at position 1 of a feature row.

    The element at index 1 of ``input`` is taken as the category. It is
    removed from the row, and one 0/1 flag per entry of ``categories`` is
    appended at the end, in ``categories`` order. A category that matches no
    entry produces all-zero flags.

    Args:
        input: Sequence of feature values with the category at index 1. The
            name shadows the builtin but is kept because callers pass it by
            keyword. The sequence itself is left unmodified.
        categories: NumPy array of the known category values.

    Returns:
        A new list of plain Python objects; NumPy scalars are unwrapped with
        ``.item()``.

    Raises:
        IndexError: If ``input`` has fewer than two elements.
    """
    # Copy first: the row is edited below and the caller's list must not be.
    row = list(input)
    category = np.array(row.pop(1))

    # One flag per known category instead of a hard-coded width.
    flags = np.zeros(len(categories), dtype=int)
    flags[np.where(categories == category)[0]] = 1
    row.extend(flags.tolist())

    return [
        value.item() if isinstance(value, np.generic) else value
        for value in row
    ]
|
|
|
|
| |
def get_request_body(datapoint):
    """Wrap the first row of ``datapoint`` in an ``{"instances": [[...]]}`` dict.

    Values that are ``np.int32`` or ``np.int64`` are converted to built-in
    ``int``; every other value is passed through as it comes out of
    ``Series.tolist()``.
    """
    first_row = datapoint.iloc[0].tolist()
    payload_row = []
    for item in first_row:
        if isinstance(item, (np.int32, np.int64)):
            item = int(item)
        payload_row.append(item)
    return {"instances": [payload_row]}
|
|
|
|
| |
def get_explainability_texts(shap_values, feature_texts):
    """Pick explanation texts for positively contributing features.

    Only positions whose SHAP value is strictly positive are considered. They
    are ranked from largest to smallest value, the two highest-ranked entries
    are dropped, and at most the next five are kept.

    Returns:
        A ``(texts, indices)`` pair of equally long lists, where ``texts[i]``
        is ``feature_texts[indices[i]]``.
    """
    contributions = {}
    for position, value in enumerate(shap_values):
        if value > 0:
            contributions[position] = value

    # All values are positive here, so ranking by value equals ranking by
    # magnitude; the sort is stable, so ties keep their positional order.
    ranked = sorted(contributions, key=contributions.get, reverse=True)

    # Resolve every ranked index before slicing so that an unknown index
    # fails the lookup regardless of where it ends up in the ranking.
    ranked_texts = [feature_texts[position] for position in ranked]

    # Skip the top two, keep up to five after that.
    window = slice(2, 7)
    return ranked_texts[window], ranked[window]
|
|
|
|
| |
| |
def random_past_date_from_last_year():
    """Return a random date from the last 365 days formatted as ``YYYY-MM-DD``."""
    window_start = datetime.now() - timedelta(days=365)
    # Whole days between the start of the window and the present moment.
    days_available = (datetime.now() - window_start).days
    offset = timedelta(days=random.randint(0, days_available))
    return (window_start + offset).strftime("%Y-%m-%d")
|
|
|
|
| |
def get_explainability_values(pos_indices, data):
    """Look up display values for the given feature positions.

    ``data`` is rounded (floats to two decimals) and passed through
    ``transformation`` with ``CATEGORIES``; ``pos_indices`` then index into
    that transformed row. Positions 6-10 and 13-17 are rendered as the
    strings ``"True"``/``"False"``; all other positions are returned as-is.
    """
    flag_positions = {*range(6, 11), *range(13, 18)}

    prepared = []
    for value in data:
        prepared.append(round(value, 2) if isinstance(value, float) else value)

    encoded = transformation(input=prepared, categories=CATEGORIES)

    return [
        str(bool(encoded[idx])).capitalize() if idx in flag_positions else encoded[idx]
        for idx in pos_indices
    ]
|
|
|
|
| |
def modify_datapoint(datapoint):
    """Turn the first row of ``datapoint`` into a list of display values.

    Position 0 is replaced by a random date string. If any value in positions
    2-6 exceeds 12000, those five values are divided by 100; if any of them
    still exceeds 120000 afterwards, they are divided by 10 again (zeros are
    kept as ``0``). Floats are rounded to two decimals and positions 2-6 are
    finally formatted as EUR amounts using the ``en_GB`` locale.
    """
    amounts = slice(2, 7)

    row = datapoint.iloc[0].tolist()
    row[0] = random_past_date_from_last_year()

    scaled = list(row)
    if any(amount > 12000 for amount in row[amounts]):
        scaled[amounts] = [
            amount / 100 if amount != 0 else 0 for amount in row[amounts]
        ]
    if any(amount > 120000 for amount in scaled[amounts]):
        scaled[amounts] = [
            amount / 10 if amount != 0 else 0 for amount in scaled[amounts]
        ]

    display = []
    for entry in scaled:
        display.append(round(entry, 2) if isinstance(entry, float) else entry)

    display[amounts] = [
        format_currency(amount, "EUR", locale="en_GB") for amount in display[amounts]
    ]
    return display
|
|
|
|
| |
def get_weights(shap_values, sorted_indices, target_sum=0.95):
    """Rescale the selected SHAP values so that they add up to ``target_sum``.

    Args:
        shap_values: Indexable collection of SHAP values.
        sorted_indices: Positions in ``shap_values`` to pick, in output order.
        target_sum: Total the returned weights should sum to.

    Returns:
        List of rescaled values, one per entry of ``sorted_indices``.
    """
    selected = [shap_values[position] for position in sorted_indices]
    if not selected:
        # Nothing to scale, and nothing to divide by.
        return []
    factor = target_sum / sum(selected)
    return [value * factor for value in selected]
|
|
|
|
| |
def get_fake_certainty():
    """Return a made-up certainty as a percentage string with two decimals.

    The underlying value is drawn uniformly between 0.75 and 0.99.
    """
    return f"{uniform(0.75, 0.99):.2%}"
|
|
|
|
| |
def get_random_suspicious_transaction(data):
    """Pick one random fraudulent transaction from ``data``.

    Args:
        data: DataFrame with an ``isFraud`` column; rows where it equals 1
            are the candidates.

    Returns:
        A one-row DataFrame holding the chosen transaction, without the
        ``isFraud`` column.

    Raises:
        ValueError: If ``data`` contains no row with ``isFraud == 1``.
    """
    suspicious = data[data["isFraud"] == 1]
    if suspicious.empty:
        raise ValueError("data contains no rows with isFraud == 1")

    # randrange yields 0..len-1, so the one-row window must start AT the drawn
    # position. Starting one before it gives an empty frame for position 0
    # and makes the last candidate unreachable.
    position = randrange(len(suspicious))
    chosen = suspicious.iloc[position : position + 1]
    return chosen.drop("isFraud", axis=1)
|
|
|
|
| |
def send_evaluation(
    client, deployment_id, request_log_id, prediction_log_id, evaluation_input
):
    """Send evaluation to Deeploy.

    Shows a spinner while ``client.evaluate`` runs. On failure the error is
    logged and reported in the Streamlit UI instead of being raised.

    Args:
        client: Object exposing
            ``evaluate(deployment_id, prediction_log_id, evaluation_input)``.
        deployment_id: Forwarded to ``client.evaluate``.
        request_log_id: Kept for interface compatibility; not forwarded.
        prediction_log_id: Forwarded to ``client.evaluate``.
        evaluation_input: Forwarded to ``client.evaluate``.

    Returns:
        ``True`` if the call completed without raising, ``False`` otherwise.
    """
    try:
        with st.spinner("Submitting response..."):
            client.evaluate(deployment_id, prediction_log_id, evaluation_input)
    except Exception as e:
        # UI boundary: report the problem to the user rather than crash the app.
        logging.exception("Failed to submit evaluation")
        st.error(
            "Failed to submit feedback. "
            + "Check whether you are using the right model URL and Token. "
            + "Contact Deeploy if the problem persists."
        )
        st.write(f"Error message: {e}")
        return False
    return True
|
|
|
|
| |
def get_model_url():
    """Get model url and retrieve workspace id and deployment id from it.

    The URL is read from a Streamlit text area and split on ``/``; the
    workspace id is segment 4 and the deployment id is segment 6. If the URL
    has too few segments, both ids are empty strings.

    Returns:
        Tuple ``(model_url, workspace_id, deployment_id)``.
    """
    model_url = st.text_area(
        "Model URL (default is the demo deployment)",
        "https://api.app.deeploy.ml/workspaces/708b5808-27af-461a-8ee5-80add68384c7/deployments/ac56dbdf-ba04-462f-aa70-5a0d18698e42/",
        height=125,
    )
    segments = model_url.split("/")
    if len(segments) > 6:
        workspace_id, deployment_id = segments[4], segments[6]
    else:
        # Either id missing means neither is reported.
        workspace_id = deployment_id = ""
    return model_url, workspace_id, deployment_id
|
|
|
|
| |
def get_comment_explanation(certainty, explainability_texts, explainability_values):
    """Compose a plain-text comment with the certainty and feature values.

    Colons are stripped from the texts, and each value is paired with the
    text at the same position as ``"<text> is <value>"``.

    Returns:
        A string with the certainty line, a blank line, a heading line and
        one line per value.
    """
    labels = [text.replace(":", "") for text in explainability_texts]

    lines = ["Important suspicious features: "]
    for position, value in enumerate(explainability_values):
        lines.append(f"{labels[position]} is {value}")

    body = "\n".join(lines)
    return f"Model certainty is {certainty}\n\n{body}"
|
|
|
|
| |
def create_data_input_table(data, col_names):
    """Render the transaction as a two-column "Feature name"/"Value" table.

    Positions 7-11 of ``data`` are converted to ``bool`` in place, so the
    caller's list is modified. Floats are rounded to two decimals for display.
    """
    st.subheader("Transaction details")

    data[7:12] = list(map(bool, data[7:12]))

    display_values = []
    for entry in data:
        display_values.append(round(entry, 2) if isinstance(entry, float) else entry)

    table = pd.DataFrame({"Feature name": col_names, "Value": display_values})
    # Height grows with the row count: 35 per row plus a fixed 38.
    table_height = 35 * len(table) + 38
    st.dataframe(table, hide_index=True, width=475, height=table_height)
|
|
|
|
| |
def create_table(texts, values, weights, title):
    """Render a titled table of feature explanations with weight bars.

    ``texts``, ``values`` and ``weights`` become the columns
    "Feature Explanation", "Value" and "Weight"; the weight column is shown
    as a progress bar ranging from 0 to 1.
    """
    columns = {
        "Feature Explanation": texts,
        "Value": values,
        "Weight": weights,
    }
    table = pd.DataFrame(columns)

    st.markdown(f"#### {title}")

    weight_bar = st.column_config.ProgressColumn(
        "Weight", width="small", format="%.2f", min_value=0, max_value=1
    )
    st.dataframe(
        table,
        hide_index=True,
        width=475,
        column_config={"Weight": weight_bar},
    )
|
|
|
|
| |
def ChangeButtonColour(widget_label, font_color, background_color="transparent"):
    """Recolour every button whose visible text equals ``widget_label``.

    Emits a zero-sized HTML component whose script walks all ``button``
    elements of the parent document and sets their text colour and
    background.

    NOTE(review): the arguments are interpolated into the script unescaped;
    pass trusted values only.
    """
    script = f"""
<script>
var elements = window.parent.document.querySelectorAll('button');
for (var i = 0; i < elements.length; ++i) {{
if (elements[i].innerText == '{widget_label}') {{
elements[i].style.color ='{font_color}';
elements[i].style.background = '{background_color}'
}}
}}
</script>
"""
    components.html(script, height=0, width=0)
|
|