Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import re | |
| from datetime import datetime, timedelta | |
| from argon2 import PasswordHasher | |
| from dotenv import load_dotenv | |
| from flask import Flask, request, jsonify, render_template | |
| from flask_sqlalchemy import SQLAlchemy | |
| from sqlalchemy.exc import IntegrityError | |
| # load environment vars from .env | |
| load_dotenv(".env") | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__, template_folder="assets", static_folder="assets") | |
| app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users.db" | |
| db = SQLAlchemy(app) | |
| hasher = PasswordHasher() | |
| HOST = os.environ.get("HOST", "0.0.0.0") | |
| PORT = int(os.environ.get("PORT", 5000)) | |
| DEBUG = True if str(os.environ.get("DEBUG", "true")).lower() == "true" else False | |
| MAX_LOGIN_ATTEMPTS = 10 | |
| LOCKOUT_DURATION = timedelta(seconds=60) | |
| COMMON_SWEAR_WORDS = [ | |
| "fuck", "shit", "bitch", "asshole", "bastard", "cunt", "dick", "cock", "pussy", | |
| "motherfucker", "wanker", "twat", "bollocks", "arsehole", "crap", "damn", "bugger", | |
| "bloody", "sod", "git", "idiot", "moron", "prick", "slut", "whore", "nigger", "retard" | |
| ] | |
| CHARACTER_SUBSTITUTIONS = { | |
| "0": "o", | |
| "1": "i", | |
| "3": "e", | |
| "4": "a", | |
| "5": "s", | |
| "6": "g", | |
| "7": "t", | |
| "8": "b", | |
| "9": "g", | |
| "@": "a", | |
| "$": "s", | |
| "!": "i" | |
| } | |
| def load_passwords_from_file(file_path: str) -> set: | |
| """Loads passwords from a file specified""" | |
| try: | |
| with open(file_path, 'r') as file: | |
| passwords = set(file.read().splitlines()) | |
| return passwords | |
| except Exception as e: | |
| logger.exception(f"Error occurred while retrieving passwords. {str(e)}") | |
| return set() | |
| WEAK_PASSWORDS = load_passwords_from_file('assets/weakpasswords.txt') | |
| BREACHED_PASSWORDS = load_passwords_from_file('assets/breachedpasswords.txt') | |
| BREACHED_PASSWORDS_LOWER = [p.lower() for p in BREACHED_PASSWORDS] | |
| # create a sqlite model for | |
| # storing user data | |
| class User(db.Model): | |
| id = db.Column(db.Integer, primary_key=True) | |
| username = db.Column(db.String(80), unique=True, nullable=False) | |
| hashed_password = db.Column(db.String(256), nullable=False) | |
| salt = db.Column(db.String(16), nullable=False) | |
| login_attempts = db.Column(db.Integer, default=0) | |
| last_login_time = db.Column(db.DateTime, default=datetime.utcnow) | |
| # create database tables | |
| with app.app_context(): | |
| db.create_all() | |
| # custom exception definitions | |
| class CredentialValidationError(Exception): | |
| def __init__(self, message): | |
| self.message = message | |
| super().__init__(message) | |
| def apply_character_substitutions(word): | |
| """Converts a char or number substituted | |
| word into its original representation""" | |
| for original, replacement in CHARACTER_SUBSTITUTIONS.items(): | |
| word = word.replace(original, replacement) | |
| return word | |
| def validate_username(username: str) -> None: | |
| """Validates the username by checking | |
| it against a pre-defined ruleset""" | |
| # 1. raise a validation exception if username | |
| # contains non-alphanumeric chars other than | |
| # the underscore | |
| if not re.match(r'^[a-zA-Z0-9_]+$', username): | |
| raise CredentialValidationError("Invalid username") | |
| # 2. raise a validation exception when the | |
| # username contains a commonly used swear word | |
| if any(word in username.lower() for word in COMMON_SWEAR_WORDS): | |
| raise CredentialValidationError("Username cannot contain swear words") | |
| # 3. raises when users attempt to bypass the | |
| # above rule using symbol and number substitutions | |
| if any(word in apply_character_substitutions(username.lower()) for word in COMMON_SWEAR_WORDS): | |
| raise CredentialValidationError( | |
| "Username contains a symbol or number substituted swear word" | |
| ) | |
| def validate_password(username: str, password: str) -> None: | |
| """Validates a user password according | |
| to the NISP password guidelines""" | |
| # 1. Length: At least 8 characters | |
| if len(password) < 8: | |
| raise CredentialValidationError( | |
| "The password must be at least 8 characters long" | |
| ) | |
| # 2. Complexity: Overly complex rules | |
| # will not be enforced | |
| # 3. Composition: Disallowing consequent | |
| # characters if consequent char count > 3 | |
| if bool(re.search(r'(.)\1\1+', password)): | |
| raise CredentialValidationError( | |
| "The password cannot contain 3 or more " | |
| "consequent repeated characters" | |
| ) | |
| # 4. Expiration: Password expiration is | |
| # not checked since it's not recommended | |
| # to frequently expire passwords | |
| # 5. Similarity to username: If exactly or | |
| # partially similar to the username, an | |
| # exception will be raised | |
| if username.lower() in password.lower(): | |
| raise CredentialValidationError( | |
| "The password cannot be similar to the username" | |
| ) | |
| if apply_character_substitutions(username.lower()) == apply_character_substitutions(password.lower()): | |
| raise CredentialValidationError( | |
| "The password cannot be similar to the username, " | |
| "even with character and number substitutions" | |
| ) | |
| # 6. Data Breaches and Weak Passwords: any weak | |
| # password or breached passwords are disallowed | |
| if password in WEAK_PASSWORDS: | |
| raise CredentialValidationError( | |
| "Password is too weak. Please try again with a strong password" | |
| ) | |
| if password in BREACHED_PASSWORDS: | |
| raise CredentialValidationError( | |
| "Found the password in an already breached password dictionary, " | |
| "thus not secure. Please try again with a strong password" | |
| ) | |
| if password.lower() in BREACHED_PASSWORDS_LOWER: | |
| raise CredentialValidationError( | |
| "Password is very similar to a password in an already breached " | |
| "password dictionary. Please try again with a strong password" | |
| ) | |
| def hash_password(password) -> tuple[str, ...]: | |
| try: | |
| salt = os.urandom(16) | |
| hashed_password = hasher.hash(password + salt.hex()) | |
| return hashed_password, salt.hex() | |
| except Exception as e: | |
| logger.exception("Couldn't hash the password") | |
| raise e | |
| def verify_password(hashed_password, password, salt) -> bool: | |
| try: | |
| hasher.verify(hashed_password, password + salt) | |
| return True | |
| except Exception as e: | |
| logger.exception(f"Couldn't verify the password. {str(e)}") | |
| return False | |
| def homepage(): | |
| return render_template("index.html") | |
| def enroll_page(): | |
| return render_template("enroll.html") | |
| def enroll(): | |
| """Enrolling new users""" | |
| try: | |
| data = request.get_json() | |
| username = data.get("username") | |
| password = data.get("password") | |
| validate_username(username=username) | |
| validate_password(username=username, password=password) | |
| # securely hashing the password | |
| # and storing it in the sqlite db | |
| hashed_password, salt = hash_password(password) | |
| user = User(username=username.lower(), hashed_password=hashed_password, salt=salt) | |
| db.session.add(user) | |
| db.session.commit() | |
| return jsonify({"message": "User enrolled successfully"}), 200 | |
| except CredentialValidationError as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": str(e)}), 400 | |
| except IntegrityError as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": "Username is already taken"}), 409 | |
| except Exception as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
| def authenticate(): | |
| """Authenticates a user based | |
| on user credentials""" | |
| try: | |
| data = request.get_json() | |
| username = data.get("username") | |
| password = data.get("password") | |
| # 2FA/MFA: NIST entertains 2FA or MFA, but here it | |
| # is not imposed due to its implementation complexity | |
| user = User.query.filter_by(username=username.lower()).first() | |
| if user is None: | |
| return jsonify({"error": "User not found"}), 404 | |
| # Retry attempts: Users are given 10 consequent | |
| # login attempts until they're locked out | |
| if user.login_attempts >= MAX_LOGIN_ATTEMPTS: | |
| lockout_time = user.last_login_time + LOCKOUT_DURATION | |
| remaining_duration = lockout_time - datetime.utcnow() | |
| if remaining_duration.total_seconds() > 0: | |
| remaining_seconds = int(remaining_duration.total_seconds()) | |
| minutes, seconds = divmod(remaining_seconds, 60) | |
| if minutes > 0: | |
| remaining_time_str = f"{minutes} minute(s) and {seconds} second(s)" | |
| else: | |
| remaining_time_str = f"{seconds} second(s)" | |
| return jsonify( | |
| { | |
| "error": f"Account locked out. Try again in {remaining_time_str}" | |
| } | |
| ), 401 | |
| else: | |
| user.login_attempts = 0 | |
| user.last_login_time = datetime.utcnow() | |
| db.session.commit() | |
| # Verify the password | |
| if not verify_password(user.hashed_password, password, user.salt): | |
| user.login_attempts += 1 | |
| user.last_login_time = datetime.utcnow() | |
| db.session.commit() | |
| if user.login_attempts >= MAX_LOGIN_ATTEMPTS: | |
| remaining_time = int(LOCKOUT_DURATION.total_seconds()) | |
| minutes, seconds = divmod(remaining_time, 60) | |
| if minutes > 0: | |
| remaining_time_str = f"{minutes} minute(s) and {seconds} second(s)" | |
| else: | |
| remaining_time_str = f"{seconds} second(s)" | |
| return jsonify({"error": f"Account locked out. Try again in {remaining_time_str}"}), 401 | |
| return jsonify({"error": "Invalid password"}), 401 | |
| # Reset if a valid login attempt | |
| user.login_attempts = 0 | |
| user.last_login_time = datetime.utcnow() | |
| db.session.commit() | |
| return jsonify({"message": f"Authentication successful. Welcome @{username.lower()}!"}), 200 | |
| except Exception as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
| def get_users(): | |
| """Retrieves the list of all users""" | |
| try: | |
| users = User.query.all() | |
| user_list = [{"id": user.id, "username": user.username} for user in users] | |
| return jsonify(user_list), 200 | |
| except Exception as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
| def delete_user(username): | |
| """Deletes a user by username""" | |
| try: | |
| user = User.query.filter_by(username=username.lower()).first() | |
| if user is None: | |
| return jsonify({"error": "User not found"}), 404 | |
| db.session.delete(user) | |
| db.session.commit() | |
| return jsonify({"message": "User deleted successfully"}), 200 | |
| except Exception as e: | |
| logger.exception(str(e)) | |
| return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
| if __name__ == "__main__": | |
| app.run(host=HOST, port=PORT, debug=DEBUG) | |