diff --git a/liteshort.py b/liteshort.py index cfe7d1f..969e93a 100644 --- a/liteshort.py +++ b/liteshort.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019 Steven Spangler <132@ikl.sh> +# Copyright (c) 2020 Steven Spangler <132@ikl.sh> # This file is part of liteshort by 132ikl # This software is license under the MIT license. It should be included in your copy of this software. # A copy of the MIT license can be obtained at https://mit-license.org/ @@ -11,17 +11,28 @@ import urllib import bcrypt import yaml -from flask import (Flask, current_app, flash, g, jsonify, make_response, - redirect, render_template, request, send_from_directory, - url_for) +from flask import ( + Flask, + current_app, + flash, + g, + jsonify, + make_response, + redirect, + render_template, + request, + send_from_directory, + url_for, +) app = Flask(__name__) def load_config(): - new_config = yaml.load(open("config.yml")) - new_config = { - k.lower(): v for k, v in new_config.items() + with open("config.yml") as config: + configYaml = yaml.safe_load(config) + config = { + k.lower(): v for k, v in configYaml.items() } # Make config keys case insensitive req_options = { @@ -60,11 +71,11 @@ def load_config(): for option in req_options.keys(): if ( - option not in new_config.keys() + option not in config.keys() ): # Make sure everything in req_options is set in config - new_config[option] = req_options[option] + config[option] = req_options[option] - for option in new_config.keys(): + for option in config.keys(): if option in config_types: matches = False if type(config_types[option]) is not tuple: @@ -74,23 +85,20 @@ def load_config(): for req_type in config_types[ option ]: # Iterates through tuple to allow multiple types for config options - if type(new_config[option]) is req_type: + if type(config[option]) is req_type: matches = True if not matches: raise TypeError(option + " is incorrect type") - if not new_config["disable_api"]: - if ( - "admin_hashed_password" in new_config.keys() - and new_config["admin_hashed_password"] - ): - new_config["password_hashed"] = True - elif "admin_password" in new_config.keys() and new_config["admin_password"]: - new_config["password_hashed"] = False + if not config["disable_api"]: + if "admin_hashed_password" in config.keys() and config["admin_hashed_password"]: + config["password_hashed"] = True + elif "admin_password" in config.keys() and config["admin_password"]: + config["password_hashed"] = False else: raise TypeError( "admin_password or admin_hashed_password must be set in config.yml" ) - return new_config + return config def authenticate(username, password): @@ -139,7 +147,7 @@ def check_password(password, pass_config): raise RuntimeError("This should never occur! Bailing...") -def delete_url(deletion): +def delete_short(deletion): result = query_db( "SELECT * FROM urls WHERE short = ?", (deletion,), False, None ) # Return as tuple instead of row @@ -148,6 +156,18 @@ def delete_url(deletion): return len(result) +def delete_long(long): + if "//" in long: + long = long.split("//")[-1] + long = "%" + long + "%" + result = query_db( + "SELECT * FROM urls WHERE long LIKE ?", (long,), False, None + ) # Return as tuple instead of row + get_db().cursor().execute("DELETE FROM urls WHERE long LIKE ?", (long,)) + get_db().commit() + return len(result) + + def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): @@ -197,18 +217,13 @@ def nested_list_to_dict(l): def response(rq, result, error_msg="Error: Unknown error"): - if rq.form.get("api") and not rq.form.get("format") == "json": - return "Format type HTML (default) not support for API" # Future-proof for non-json return types - if rq.form.get("format") == "json": - # If not result provided OR result doesn't exist, send error - # Allows for setting an error message with explicitly checking in regular code - if result: - if result is True: # Allows sending with no result (ie. during deletion) - return jsonify(success=True) - else: - return jsonify(success=True, result=result) + if rq.form.get("api"): + if rq.accept_mimetypes.accept_json: + if result: + return jsonify(success=bool(result), result=result) + return jsonify(success=bool(result), message=error_msg) else: - return jsonify(success=False, error=error_msg) + return "Format type HTML (default) not supported for API" # Future-proof for non-json return types else: if result and result is not True: flash(result, "success") @@ -313,6 +328,39 @@ def main_redir(url): @app.route("/", methods=["POST"], subdomain=app.config["subdomain"]) def main_post(): + if request.form.get("api"): + if current_app.config["disable_api"]: + return response(request, None, "API is disabled.") + # All API calls require authentication + if not request.authorization or not authenticate( + request.authorization["username"], request.authorization["password"] + ): + return response(request, None, "BaiscAuth failed") + command = request.form["api"] + if command == "list" or command == "listshort": + return response(request, list_shortlinks(), "Failed to list items") + elif command == "listlong": + shortlinks = list_shortlinks() + shortlinks = {v: k for k, v in shortlinks.items()} + return response(request, shortlinks, "Failed to list items") + elif command == "delete": + deleted = 0 + if "long" not in request.form and "short" not in request.form: + return response(request, None, "Provide short or long in POST data") + if "short" in request.form: + deleted = delete_short(request.form["short"]) + deleted + if "long" in request.form: + deleted = delete_long(request.form["long"]) + deleted + if deleted > 0: + return response( + request, + "Deleted " + str(deleted) + " URL" + ("s" if deleted > 1 else ""), + ) + else: + return response(request, None, "URL not found") + else: + return response(request, None, "Command " + command + " not found") + if request.form.get("long"): if not validate_long(request.form["long"]): return response(request, None, "Long URL is not valid") @@ -349,37 +397,7 @@ def main_post(): ) set_latest(request.form["long"]) get_db().commit() - return response(request, get_baseUrl() + short, "Error: Failed to generate") - elif request.form.get("api"): - if current_app.config["disable_api"]: - return response(request, None, "API is disabled.") - # All API calls require authentication - if not request.authorization or not authenticate( - request.authorization["username"], request.authorization["password"] - ): - return response(request, None, "BaiscAuth failed") - command = request.form["api"] - if command == "list" or command == "listshort": - return response(request, list_shortlinks(), "Failed to list items") - elif command == "listlong": - shortlinks = list_shortlinks() - shortlinks = {v: k for k, v in shortlinks.items()} - return response(request, shortlinks, "Failed to list items") - elif command == "delete": - deleted = 0 - if "long" not in request.form and "short" not in request.form: - return response(request, None, "Provide short or long in POST data") - if "short" in request.form: - deleted = delete_url(request.form["short"]) + deleted - if "long" in request.form: - deleted = delete_url(request.form["long"]) + deleted - if deleted > 0: - return response(request, "Deleted " + str(deleted) + " URLs") - else: - return response(request, None, "Failed to delete URL") - else: - return response(request, None, "Command " + command + " not found") else: return response(request, None, "Long URL required")