From 1fe86f4e606c096018bf02fb4bef39cdce9a8dcb Mon Sep 17 00:00:00 2001 From: 132ikl <132@ikl.sh> Date: Thu, 16 Jan 2020 14:35:43 -0500 Subject: [PATCH] Run Black formatting, isort, autoflake --- liteshort.py | 308 ++++++++++++++++++++++++++++++++------------------- wsgi.py | 1 - 2 files changed, 191 insertions(+), 118 deletions(-) diff --git a/liteshort.py b/liteshort.py index 2dae58b..37b5948 100644 --- a/liteshort.py +++ b/liteshort.py @@ -3,67 +3,109 @@ # This software is license under the MIT license. It should be included in your copy of this software. # A copy of the MIT license can be obtained at https://mit-license.org/ -from flask import Flask, current_app, flash, g, jsonify, make_response, redirect, render_template, request, send_from_directory, url_for -import bcrypt import os import random import sqlite3 import time import urllib + +import bcrypt import yaml +from flask import (Flask, current_app, flash, g, jsonify, make_response, + redirect, render_template, request, send_from_directory, + url_for) app = Flask(__name__) def load_config(): - new_config = yaml.load(open('config.yml')) - new_config = {k.lower(): v for k, v in new_config.items()} # Make config keys case insensitive + new_config = yaml.load(open("config.yml")) + new_config = { + k.lower(): v for k, v in new_config.items() + } # Make config keys case insensitive - req_options = {'admin_username': 'admin', 'database_name': "urls", 'random_length': 4, - 'allowed_chars': 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_', - 'random_gen_timeout': 5, 'site_name': 'liteshort', 'site_domain': None, 'show_github_link': True, - 'secret_key': None, 'disable_api': False, 'subdomain': '', 'latest': 'l', 'selflinks': False - } + req_options = { + "admin_username": "admin", + "database_name": "urls", + "random_length": 4, + "allowed_chars": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_", + "random_gen_timeout": 5, + "site_name": "liteshort", + "site_domain": None, + "show_github_link": True, + "secret_key": None, + "disable_api": False, + "subdomain": "", + "latest": "l", + "selflinks": False, + } - config_types = {'admin_username': str, 'database_name': str, 'random_length': int, - 'allowed_chars': str, 'random_gen_timeout': int, 'site_name': str, - 'site_domain': (str, type(None)), 'show_github_link': bool, 'secret_key': str, - 'disable_api': bool, 'subdomain': (str, type(None)), 'latest': (str, type(None)), 'selflinks': bool - } + config_types = { + "admin_username": str, + "database_name": str, + "random_length": int, + "allowed_chars": str, + "random_gen_timeout": int, + "site_name": str, + "site_domain": (str, type(None)), + "show_github_link": bool, + "secret_key": str, + "disable_api": bool, + "subdomain": (str, type(None)), + "latest": (str, type(None)), + "selflinks": bool, + } for option in req_options.keys(): - if option not in new_config.keys(): # Make sure everything in req_options is set in config + if ( + option not in new_config.keys() + ): # Make sure everything in req_options is set in config new_config[option] = req_options[option] for option in new_config.keys(): if option in config_types: matches = False if type(config_types[option]) is not tuple: - config_types[option] = (config_types[option],) # Automatically creates tuple for non-tuple types - for req_type in config_types[option]: # Iterates through tuple to allow multiple types for config options + config_types[option] = ( + config_types[option], + ) # Automatically creates tuple for non-tuple types + for req_type in config_types[ + option + ]: # Iterates through tuple to allow multiple types for config options if type(new_config[option]) is req_type: matches = True if not matches: raise TypeError(option + " is incorrect type") - if not new_config['disable_api']: - if 'admin_hashed_password' in new_config.keys() and new_config['admin_hashed_password']: - new_config['password_hashed'] = True - elif 'admin_password' in new_config.keys() and new_config['admin_password']: - new_config['password_hashed'] = False + if not new_config["disable_api"]: + if ( + "admin_hashed_password" in new_config.keys() + and new_config["admin_hashed_password"] + ): + new_config["password_hashed"] = True + elif "admin_password" in new_config.keys() and new_config["admin_password"]: + new_config["password_hashed"] = False else: - raise TypeError('admin_password or admin_hashed_password must be set in config.yml') + raise TypeError( + "admin_password or admin_hashed_password must be set in config.yml" + ) return new_config def authenticate(username, password): - return username == current_app.config['admin_username'] and check_password(password, current_app.config) + return username == current_app.config["admin_username"] and check_password( + password, current_app.config + ) def check_long_exist(long): - query = query_db('SELECT short FROM urls WHERE long = ?', (long,)) + query = query_db("SELECT short FROM urls WHERE long = ?", (long,)) for i in query: - if i and (len(i['short']) <= current_app.config["random_length"]) and i['short'] != current_app.config['latest']: # Checks if query if pre-existing URL is same as random length URL - return i['short'] + if ( + i + and (len(i["short"]) <= current_app.config["random_length"]) + and i["short"] != current_app.config["latest"] + ): # Checks if query if pre-existing URL is same as random length URL + return i["short"] return False @@ -72,23 +114,30 @@ def check_short_exist(short): # Allow to also check against a long link return True return False + def check_self_link(long): - if get_baseUrl().rstrip('/') in long: + if get_baseUrl().rstrip("/") in long: return True return False + def check_password(password, pass_config): - if pass_config['password_hashed']: - return bcrypt.checkpw(password.encode('utf-8'), pass_config['admin_hashed_password'].encode('utf-8')) - elif not pass_config['password_hashed']: - return password == pass_config['admin_password'] + if pass_config["password_hashed"]: + return bcrypt.checkpw( + password.encode("utf-8"), + pass_config["admin_hashed_password"].encode("utf-8"), + ) + elif not pass_config["password_hashed"]: + return password == pass_config["admin_password"] else: - raise RuntimeError('This should never occur! Bailing...') + raise RuntimeError("This should never occur! Bailing...") def delete_url(deletion): - result = query_db('SELECT * FROM urls WHERE short = ?', (deletion,), False, None) # Return as tuple instead of row - get_db().cursor().execute('DELETE FROM urls WHERE short = ?', (deletion,)) + result = query_db( + "SELECT * FROM urls WHERE short = ?", (deletion,), False, None + ) # Return as tuple instead of row + get_db().cursor().execute("DELETE FROM urls WHERE short = ?", (deletion,)) get_db().commit() return len(result) @@ -101,33 +150,35 @@ def dict_factory(cursor, row): def generate_short(rq): - timeout = time.time() + current_app.config['random_gen_timeout'] + timeout = time.time() + current_app.config["random_gen_timeout"] while True: if time.time() >= timeout: - return response(rq, None, 'Timeout while generating random short URL') - short = ''.join(random.choice(current_app.config['allowed_chars']) - for i in range(current_app.config['random_length'])) - if not check_short_exist(short) and short != app.config['latest']: + return response(rq, None, "Timeout while generating random short URL") + short = "".join( + random.choice(current_app.config["allowed_chars"]) + for i in range(current_app.config["random_length"]) + ) + if not check_short_exist(short) and short != app.config["latest"]: return short def get_long(short): - row = query_db('SELECT long FROM urls WHERE short = ?', (short,), True) - if row and row['long']: - return row['long'] + row = query_db("SELECT long FROM urls WHERE short = ?", (short,), True) + if row and row["long"]: + return row["long"] return None def get_baseUrl(): - if current_app.config['site_domain']: + if current_app.config["site_domain"]: # TODO: un-hack-ify adding the protocol here - return 'https://' + current_app.config['site_domain'] + '/' + return "https://" + current_app.config["site_domain"] + "/" else: return request.base_url def list_shortlinks(): - result = query_db('SELECT * FROM urls', (), False, None) + result = query_db("SELECT * FROM urls", (), False, None) result = nested_list_to_dict(result) return result @@ -140,9 +191,9 @@ def nested_list_to_dict(l): def response(rq, result, error_msg="Error: Unknown error"): - if rq.form.get('api') and not rq.form.get('format') == 'json': + if rq.form.get("api") and not rq.form.get("format") == "json": return "Format type HTML (default) not support for API" # Future-proof for non-json return types - if rq.form.get('format') == 'json': + if rq.form.get("format") == "json": # If not result provided OR result doesn't exist, send error # Allows for setting an error message with explicitly checking in regular code if result: @@ -154,30 +205,40 @@ def response(rq, result, error_msg="Error: Unknown error"): return jsonify(success=False, error=error_msg) else: if result and result is not True: - flash(result, 'success') + flash(result, "success") elif not result: - flash(error_msg, 'error') + flash(error_msg, "error") return render_template("main.html") def set_latest(long): - if app.config['latest']: - if query_db('SELECT short FROM urls WHERE short = ?', (current_app.config['latest'],)): - get_db().cursor().execute("UPDATE urls SET long = ? WHERE short = ?", - (long, current_app.config['latest'])) + if app.config["latest"]: + if query_db( + "SELECT short FROM urls WHERE short = ?", (current_app.config["latest"],) + ): + get_db().cursor().execute( + "UPDATE urls SET long = ? WHERE short = ?", + (long, current_app.config["latest"]), + ) else: - get_db().cursor().execute("INSERT INTO urls (long,short) VALUES (?, ?)", - (long, current_app.config['latest'])) + get_db().cursor().execute( + "INSERT INTO urls (long,short) VALUES (?, ?)", + (long, current_app.config["latest"]), + ) def validate_short(short): - if short == app.config['latest']: - return response(request, None, - 'Short URL cannot be the same as a special URL ({})'.format(short)) + if short == app.config["latest"]: + return response( + request, + None, + "Short URL cannot be the same as a special URL ({})".format(short), + ) for char in short: - if char not in current_app.config['allowed_chars']: - return response(request, None, - 'Character ' + char + ' not allowed in short URL') + if char not in current_app.config["allowed_chars"]: + return response( + request, None, "Character " + char + " not allowed in short URL" + ) return True @@ -185,16 +246,17 @@ def validate_long(long): # https://stackoverflow.com/a/36283503 token = urllib.parse.urlparse(long) return all([token.scheme, token.netloc]) + # Database connection functions def get_db(): - if 'db' not in g: + if "db" not in g: g.db = sqlite3.connect( - ''.join((current_app.config['database_name'], '.db')), - detect_types=sqlite3.PARSE_DECLTYPES + "".join((current_app.config["database_name"], ".db")), + detect_types=sqlite3.PARSE_DECLTYPES, ) - g.db.cursor().execute('CREATE TABLE IF NOT EXISTS urls (long,short)') + g.db.cursor().execute("CREATE TABLE IF NOT EXISTS urls (long,short)") return g.db @@ -208,104 +270,116 @@ def query_db(query, args=(), one=False, row_factory=sqlite3.Row): @app.teardown_appcontext def close_db(error): - if hasattr(g, 'sqlite_db'): + if hasattr(g, "sqlite_db"): g.sqlite_db.close() app.config.update(load_config()) # Add YAML config to Flask config -app.secret_key = app.config['secret_key'] -app.config['SERVER_NAME'] = app.config['site_domain'] +app.secret_key = app.config["secret_key"] +app.config["SERVER_NAME"] = app.config["site_domain"] -@app.route('/favicon.ico', subdomain=app.config['subdomain']) +@app.route("/favicon.ico", subdomain=app.config["subdomain"]) def favicon(): - return send_from_directory(os.path.join(app.root_path, 'static'), - 'favicon.ico', mimetype='image/vnd.microsoft.icon') + return send_from_directory( + os.path.join(app.root_path, "static"), + "favicon.ico", + mimetype="image/vnd.microsoft.icon", + ) -@app.route('/', subdomain=app.config['subdomain']) +@app.route("/", subdomain=app.config["subdomain"]) def main(): return response(request, True) -@app.route('/') +@app.route("/") def main_redir(url): long = get_long(url) if long: resp = make_response(redirect(long, 301)) else: - flash('Short URL "' + url + '" doesn\'t exist', 'error') - resp = make_response(redirect(url_for('main'))) - resp.headers.set('Cache-Control', 'no-store, must-revalidate') + flash('Short URL "' + url + "\" doesn't exist", "error") + resp = make_response(redirect(url_for("main"))) + resp.headers.set("Cache-Control", "no-store, must-revalidate") return resp -@app.route('/', methods=['POST'], subdomain=app.config['subdomain']) +@app.route("/", methods=["POST"], subdomain=app.config["subdomain"]) def main_post(): - if request.form.get('long'): - if not validate_long(request.form['long']): + if request.form.get("long"): + if not validate_long(request.form["long"]): return response(request, None, "Long URL is not valid") - if request.form.get('short'): + if request.form.get("short"): # Validate long as URL and short custom text against allowed characters - result = validate_short(request.form['short']) - if validate_short(request.form['short']) is True: - short = request.form['short'] + result = validate_short(request.form["short"]) + if validate_short(request.form["short"]) is True: + short = request.form["short"] else: return result - if get_long(short) == request.form['long']: - return response(request, get_baseUrl() + short, - 'Error: Failed to return pre-existing non-random shortlink') + if get_long(short) == request.form["long"]: + return response( + request, + get_baseUrl() + short, + "Error: Failed to return pre-existing non-random shortlink", + ) else: short = generate_short(request) if check_short_exist(short): - return response(request, None, - 'Short URL already taken') - long_exists = check_long_exist(request.form['long']) - if check_self_link(request.form['long']) and not current_app.config['selflinks']: - return response(request, None, - 'You cannot link to this site') - if long_exists and not request.form.get('short'): - set_latest(request.form['long']) + return response(request, None, "Short URL already taken") + long_exists = check_long_exist(request.form["long"]) + if ( + check_self_link(request.form["long"]) + and not current_app.config["selflinks"] + ): + return response(request, None, "You cannot link to this site") + if long_exists and not request.form.get("short"): + set_latest(request.form["long"]) get_db().commit() - return response(request, get_baseUrl() + long_exists, - 'Error: Failed to return pre-existing random shortlink') - get_db().cursor().execute('INSERT INTO urls (long,short) VALUES (?,?)', (request.form['long'], short)) - set_latest(request.form['long']) + return response( + request, + get_baseUrl() + long_exists, + "Error: Failed to return pre-existing random shortlink", + ) + get_db().cursor().execute( + "INSERT INTO urls (long,short) VALUES (?,?)", (request.form["long"], short) + ) + set_latest(request.form["long"]) get_db().commit() - return response(request, get_baseUrl() + short, - 'Error: Failed to generate') - elif request.form.get('api'): - if current_app.config['disable_api']: + return response(request, get_baseUrl() + short, "Error: Failed to generate") + elif request.form.get("api"): + if current_app.config["disable_api"]: return response(request, None, "API is disabled.") # All API calls require authentication - if not request.authorization \ - or not authenticate(request.authorization['username'], request.authorization['password']): + if not request.authorization or not authenticate( + request.authorization["username"], request.authorization["password"] + ): return response(request, None, "BaiscAuth failed") - command = request.form['api'] - if command == 'list' or command == 'listshort': + command = request.form["api"] + if command == "list" or command == "listshort": return response(request, list_shortlinks(), "Failed to list items") - elif command == 'listlong': + elif command == "listlong": shortlinks = list_shortlinks() shortlinks = {v: k for k, v in shortlinks.items()} return response(request, shortlinks, "Failed to list items") - elif command == 'delete': + elif command == "delete": deleted = 0 - if 'long' not in request.form and 'short' not in request.form: + if "long" not in request.form and "short" not in request.form: return response(request, None, "Provide short or long in POST data") - if 'short' in request.form: - deleted = delete_url(request.form['short']) + deleted - if 'long' in request.form: - deleted = delete_url(request.form['long']) + deleted + if "short" in request.form: + deleted = delete_url(request.form["short"]) + deleted + if "long" in request.form: + deleted = delete_url(request.form["long"]) + deleted if deleted > 0: return response(request, "Deleted " + str(deleted) + " URLs") else: return response(request, None, "Failed to delete URL") else: - return response(request, None, 'Command ' + command + ' not found') + return response(request, None, "Command " + command + " not found") else: - return response(request, None, 'Long URL required') + return response(request, None, "Long URL required") -if __name__ == '__main__': +if __name__ == "__main__": app.run() diff --git a/wsgi.py b/wsgi.py index 950c5ee..55a289f 100644 --- a/wsgi.py +++ b/wsgi.py @@ -2,4 +2,3 @@ from liteshort import app if __name__ == "__main__": app.run() -