1
0
mirror of https://git.ikl.sh/132ikl/liteshort.git synced 2023-08-10 21:13:04 +03:00

Use Accept header, fix long link deletion, misc. API changes

This commit is contained in:
132ikl 2020-03-30 01:40:01 -04:00
parent a3c4ad4010
commit 2ee8473a44

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 Steven Spangler <132@ikl.sh> # Copyright (c) 2020 Steven Spangler <132@ikl.sh>
# This file is part of liteshort by 132ikl # This file is part of liteshort by 132ikl
# This software is license under the MIT license. It should be included in your copy of this software. # This software is license under the MIT license. It should be included in your copy of this software.
# A copy of the MIT license can be obtained at https://mit-license.org/ # A copy of the MIT license can be obtained at https://mit-license.org/
@ -11,17 +11,28 @@ import urllib
import bcrypt import bcrypt
import yaml import yaml
from flask import (Flask, current_app, flash, g, jsonify, make_response, from flask import (
redirect, render_template, request, send_from_directory, Flask,
url_for) current_app,
flash,
g,
jsonify,
make_response,
redirect,
render_template,
request,
send_from_directory,
url_for,
)
app = Flask(__name__) app = Flask(__name__)
def load_config(): def load_config():
new_config = yaml.load(open("config.yml")) with open("config.yml") as config:
new_config = { configYaml = yaml.safe_load(config)
k.lower(): v for k, v in new_config.items() config = {
k.lower(): v for k, v in configYaml.items()
} # Make config keys case insensitive } # Make config keys case insensitive
req_options = { req_options = {
@ -60,11 +71,11 @@ def load_config():
for option in req_options.keys(): for option in req_options.keys():
if ( if (
option not in new_config.keys() option not in config.keys()
): # Make sure everything in req_options is set in config ): # Make sure everything in req_options is set in config
new_config[option] = req_options[option] config[option] = req_options[option]
for option in new_config.keys(): for option in config.keys():
if option in config_types: if option in config_types:
matches = False matches = False
if type(config_types[option]) is not tuple: if type(config_types[option]) is not tuple:
@ -74,23 +85,20 @@ def load_config():
for req_type in config_types[ for req_type in config_types[
option option
]: # Iterates through tuple to allow multiple types for config options ]: # Iterates through tuple to allow multiple types for config options
if type(new_config[option]) is req_type: if type(config[option]) is req_type:
matches = True matches = True
if not matches: if not matches:
raise TypeError(option + " is incorrect type") raise TypeError(option + " is incorrect type")
if not new_config["disable_api"]: if not config["disable_api"]:
if ( if "admin_hashed_password" in config.keys() and config["admin_hashed_password"]:
"admin_hashed_password" in new_config.keys() config["password_hashed"] = True
and new_config["admin_hashed_password"] elif "admin_password" in config.keys() and config["admin_password"]:
): config["password_hashed"] = False
new_config["password_hashed"] = True
elif "admin_password" in new_config.keys() and new_config["admin_password"]:
new_config["password_hashed"] = False
else: else:
raise TypeError( raise TypeError(
"admin_password or admin_hashed_password must be set in config.yml" "admin_password or admin_hashed_password must be set in config.yml"
) )
return new_config return config
def authenticate(username, password): def authenticate(username, password):
@ -139,7 +147,7 @@ def check_password(password, pass_config):
raise RuntimeError("This should never occur! Bailing...") raise RuntimeError("This should never occur! Bailing...")
def delete_url(deletion): def delete_short(deletion):
result = query_db( result = query_db(
"SELECT * FROM urls WHERE short = ?", (deletion,), False, None "SELECT * FROM urls WHERE short = ?", (deletion,), False, None
) # Return as tuple instead of row ) # Return as tuple instead of row
@ -148,6 +156,18 @@ def delete_url(deletion):
return len(result) return len(result)
def delete_long(long):
if "//" in long:
long = long.split("//")[-1]
long = "%" + long + "%"
result = query_db(
"SELECT * FROM urls WHERE long LIKE ?", (long,), False, None
) # Return as tuple instead of row
get_db().cursor().execute("DELETE FROM urls WHERE long LIKE ?", (long,))
get_db().commit()
return len(result)
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
for idx, col in enumerate(cursor.description): for idx, col in enumerate(cursor.description):
@ -197,18 +217,13 @@ def nested_list_to_dict(l):
def response(rq, result, error_msg="Error: Unknown error"): def response(rq, result, error_msg="Error: Unknown error"):
if rq.form.get("api") and not rq.form.get("format") == "json": if rq.form.get("api"):
return "Format type HTML (default) not support for API" # Future-proof for non-json return types if rq.accept_mimetypes.accept_json:
if rq.form.get("format") == "json":
# If not result provided OR result doesn't exist, send error
# Allows for setting an error message with explicitly checking in regular code
if result: if result:
if result is True: # Allows sending with no result (ie. during deletion) return jsonify(success=bool(result), result=result)
return jsonify(success=True) return jsonify(success=bool(result), message=error_msg)
else: else:
return jsonify(success=True, result=result) return "Format type HTML (default) not supported for API" # Future-proof for non-json return types
else:
return jsonify(success=False, error=error_msg)
else: else:
if result and result is not True: if result and result is not True:
flash(result, "success") flash(result, "success")
@ -313,6 +328,39 @@ def main_redir(url):
@app.route("/", methods=["POST"], subdomain=app.config["subdomain"]) @app.route("/", methods=["POST"], subdomain=app.config["subdomain"])
def main_post(): def main_post():
if request.form.get("api"):
if current_app.config["disable_api"]:
return response(request, None, "API is disabled.")
# All API calls require authentication
if not request.authorization or not authenticate(
request.authorization["username"], request.authorization["password"]
):
return response(request, None, "BaiscAuth failed")
command = request.form["api"]
if command == "list" or command == "listshort":
return response(request, list_shortlinks(), "Failed to list items")
elif command == "listlong":
shortlinks = list_shortlinks()
shortlinks = {v: k for k, v in shortlinks.items()}
return response(request, shortlinks, "Failed to list items")
elif command == "delete":
deleted = 0
if "long" not in request.form and "short" not in request.form:
return response(request, None, "Provide short or long in POST data")
if "short" in request.form:
deleted = delete_short(request.form["short"]) + deleted
if "long" in request.form:
deleted = delete_long(request.form["long"]) + deleted
if deleted > 0:
return response(
request,
"Deleted " + str(deleted) + " URL" + ("s" if deleted > 1 else ""),
)
else:
return response(request, None, "URL not found")
else:
return response(request, None, "Command " + command + " not found")
if request.form.get("long"): if request.form.get("long"):
if not validate_long(request.form["long"]): if not validate_long(request.form["long"]):
return response(request, None, "Long URL is not valid") return response(request, None, "Long URL is not valid")
@ -349,37 +397,7 @@ def main_post():
) )
set_latest(request.form["long"]) set_latest(request.form["long"])
get_db().commit() get_db().commit()
return response(request, get_baseUrl() + short, "Error: Failed to generate") return response(request, get_baseUrl() + short, "Error: Failed to generate")
elif request.form.get("api"):
if current_app.config["disable_api"]:
return response(request, None, "API is disabled.")
# All API calls require authentication
if not request.authorization or not authenticate(
request.authorization["username"], request.authorization["password"]
):
return response(request, None, "BaiscAuth failed")
command = request.form["api"]
if command == "list" or command == "listshort":
return response(request, list_shortlinks(), "Failed to list items")
elif command == "listlong":
shortlinks = list_shortlinks()
shortlinks = {v: k for k, v in shortlinks.items()}
return response(request, shortlinks, "Failed to list items")
elif command == "delete":
deleted = 0
if "long" not in request.form and "short" not in request.form:
return response(request, None, "Provide short or long in POST data")
if "short" in request.form:
deleted = delete_url(request.form["short"]) + deleted
if "long" in request.form:
deleted = delete_url(request.form["long"]) + deleted
if deleted > 0:
return response(request, "Deleted " + str(deleted) + " URLs")
else:
return response(request, None, "Failed to delete URL")
else:
return response(request, None, "Command " + command + " not found")
else: else:
return response(request, None, "Long URL required") return response(request, None, "Long URL required")