2019-02-24 10:30:57 +03:00
# Copyright (c) 2019 Steven Spangler <132@ikl.sh>
# This file is part of liteshort by 132ikl
# This software is licensed under the MIT license. It should be included in your copy of this software.
# A copy of the MIT license can be obtained at https://mit-license.org/
2019-04-21 02:28:01 +03:00
from flask import Flask , current_app , flash , g , jsonify , make_response , redirect , render_template , request , send_from_directory , url_for
2019-02-22 10:58:38 +03:00
import bcrypt
2019-02-25 07:31:39 +03:00
import os
2019-02-22 10:58:38 +03:00
import random
import sqlite3
import time
2019-02-24 08:07:01 +03:00
import urllib
2019-02-21 23:59:21 +03:00
import yaml
2019-02-24 08:07:01 +03:00
# Application object for this module; the route and teardown decorators in this file register against it.
app = Flask ( __name__ )
2019-02-21 23:59:21 +03:00
def load_config():
    """Read the YAML config file, apply defaults and validate option types.

    Keys read from the file are lower-cased, every option missing from the
    file receives its default from ``req_options``, and each option listed in
    ``config_types`` must have exactly one of the listed types.  When the API
    is not disabled, a ``password_hashed`` flag is added that records which of
    the two admin password options is in use.

    Returns:
        dict: The merged and validated configuration.

    Raises:
        TypeError: If an option has an unexpected type, or if the API is
            enabled and neither admin password option is set.
    """
    # safe_load only constructs plain Python objects, so the config file cannot
    # instantiate arbitrary classes (plain yaml.load without a Loader can, and
    # is rejected outright by recent PyYAML).  The context manager guarantees
    # the file handle is closed.
    with open(' config.yml ') as config_file:
        new_config = yaml.safe_load(config_file) or {}  # an empty file parses to None
    new_config = {k.lower(): v for k, v in new_config.items()}  # Make config keys case insensitive
    req_options = {
        ' admin_username ': ' admin ',
        ' database_name ': " urls ",
        ' random_length ': 4,
        ' allowed_chars ': ' ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_ ',
        ' random_gen_timeout ': 5,
        ' site_name ': ' liteshort ',
        ' site_domain ': None,
        ' show_github_link ': True,
        ' secret_key ': None,
        ' disable_api ': False,
        ' subdomain ': ' ',
        ' latest ': ' l ',
    }
    config_types = {
        ' admin_username ': str,
        ' database_name ': str,
        ' random_length ': int,
        ' allowed_chars ': str,
        ' random_gen_timeout ': int,
        ' site_name ': str,
        ' site_domain ': (str, type(None)),
        ' show_github_link ': bool,
        ' secret_key ': str,
        ' disable_api ': bool,
        ' subdomain ': (str, type(None)),
        ' latest ': (str, type(None)),
    }

    # Make sure everything in req_options is set in config
    for option, default in req_options.items():
        new_config.setdefault(option, default)

    for option, value in new_config.items():
        if option not in config_types:
            continue
        allowed_types = config_types[option]
        if not isinstance(allowed_types, tuple):
            allowed_types = (allowed_types,)  # single type -> one-element tuple
        # Exact type comparison on purpose: isinstance() would let a bool pass
        # where an int is required.
        if type(value) not in allowed_types:
            raise TypeError(option + " is incorrect type ")

    if not new_config[' disable_api ']:
        if new_config.get(' admin_hashed_password '):
            new_config[' password_hashed '] = True
        elif new_config.get(' admin_password '):
            new_config[' password_hashed '] = False
        else:
            raise TypeError(' admin_password or admin_hashed_password must be set in config.yml ')
    return new_config
2019-02-24 08:07:01 +03:00
def authenticate(username, password):
    """Return a truthy value only when both the admin username and password match.

    The password is checked (via ``check_password``) only after the username
    has matched the configured admin username.
    """
    settings = current_app.config
    if not username == settings[' admin_username ']:
        return False
    return check_password(password, settings)
def check_long_exist(long):
    """Find an existing short code for *long* that could have been randomly generated.

    A stored short code qualifies when it is no longer than the configured
    random length and is not the special "latest" code.

    Returns:
        The first qualifying short code, or ``False`` when there is none.
    """
    rows = query_db(' SELECT short FROM urls WHERE long = ? ', (long,))
    # Lazy generator: the config values are only looked up when a row is inspected.
    candidates = (
        row[' short ']
        for row in rows
        if row
        and len(row[' short ']) <= current_app.config[" random_length "]
        and row[' short '] != current_app.config[' latest ']
    )
    return next(candidates, False)
2019-02-24 09:36:10 +03:00
def check_short_exist(short):  # Allow to also check against a long link
    """Return ``True`` when ``get_long`` finds a long URL for *short*, else ``False``."""
    return bool(get_long(short))
2019-02-22 10:58:38 +03:00
def check_password(password, pass_config):
    """Check *password* against the admin password held in *pass_config*.

    Args:
        password: The password supplied by the client.
        pass_config: Mapping providing the ``password_hashed`` flag and the
            matching stored password entry (bcrypt hash or plain text).

    Returns:
        bool: ``True`` when the password matches, ``False`` otherwise.
    """
    import hmac  # local import: only this function needs it

    if pass_config[' password_hashed ']:
        return bcrypt.checkpw(password.encode(' utf-8 '),
                              pass_config[' admin_hashed_password '].encode(' utf-8 '))

    stored = pass_config[' admin_password ']
    if isinstance(password, str) and isinstance(stored, str):
        # Constant-time comparison so response timing does not reveal how much
        # of the password matched.  Compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(password.encode('utf-8'), stored.encode('utf-8'))
    # Non-string values (e.g. a numeric password in the YAML, or no password
    # supplied) keep the plain equality semantics.
    return password == stored
2019-02-24 08:07:01 +03:00
def delete_url(deletion):
    """Delete the rows whose short code equals *deletion*.

    Returns:
        int: Number of rows that matched before the delete was issued.
    """
    params = (deletion,)
    # row_factory=None: rows come back as plain tuples instead of sqlite3.Row
    matched = query_db(' SELECT * FROM urls WHERE short = ? ', params, False, None)
    get_db().cursor().execute(' DELETE FROM urls WHERE short = ? ', params)
    get_db().commit()
    return len(matched)
2019-02-22 10:58:38 +03:00
2019-02-24 08:07:01 +03:00
def dict_factory(cursor, row):
    """Build a ``{column_name: value}`` dict for *row*.

    Column names are taken from the first element of each entry in
    ``cursor.description``; values are taken from *row* by position.
    """
    return {column[0]: row[position] for position, column in enumerate(cursor.description)}
def generate_short(rq):
    """Generate a random short code that is not already taken.

    Characters are drawn from the configured ``allowed_chars`` and the code is
    built from ``random_length`` of them.  Candidates that already exist, or
    that equal the special "latest" code, are discarded and a new one is drawn.

    Args:
        rq: The current request; only used to build the error response.

    Returns:
        The new short code (``str``), or the result of ``response(...)`` when
        no free code was found within ``random_gen_timeout`` seconds.
    """
    config = current_app.config  # one config object for every lookup
    # Monotonic clock: the deadline cannot jump if the system clock is adjusted.
    deadline = time.monotonic() + config[' random_gen_timeout ']
    while True:
        if time.monotonic() >= deadline:
            return response(rq, None, ' Timeout while generating random short URL ')
        # NOTE(review): separator literal reproduced exactly as it appears in the
        # source; confirm whether an empty separator was intended.
        short = ' '.join(random.choice(config[' allowed_chars '])
                         for _ in range(config[' random_length ']))
        if not check_short_exist(short) and short != config[' latest ']:
            return short
2019-02-24 09:36:10 +03:00
def get_long(short):
    """Look up the long URL stored for *short*.

    Returns:
        The stored long URL, or ``None`` when no row matches or the stored
        value is falsy.
    """
    match = query_db(' SELECT long FROM urls WHERE short = ? ', (short,), True)
    if not match:
        return None
    return match[' long '] or None
2019-04-21 02:28:01 +03:00
def get_baseUrl():
    """Return the URL prefix that short codes are appended to.

    Built from the configured ``site_domain`` when one is set; otherwise the
    current request's ``base_url`` is used.
    """
    domain = current_app.config[' site_domain ']
    if not domain:
        return request.base_url
    # TODO: un-hack-ify adding the protocol here
    return ' https:// ' + domain + ' / '
2019-02-24 08:07:01 +03:00
def list_shortlinks():
    """Return every row of the ``urls`` table as a dict (first column -> second column)."""
    # row_factory=None so each row is a plain tuple that can be indexed by position
    all_rows = query_db(' SELECT * FROM urls ', (), False, None)
    return nested_list_to_dict(all_rows)
def nested_list_to_dict(l):
    """Turn a sequence of pairs into a dict mapping each item's element 0 to its element 1.

    When the same key appears more than once, the last occurrence wins.
    """
    return {pair[0]: pair[1] for pair in l}
def response(rq, result, error_msg=" Error: Unknown error "):
    """Build the reply for a request, as JSON or as the rendered main page.

    Args:
        rq: Request whose form fields (``api``, ``format``) select the output.
        result: ``True`` for success without a payload, any other truthy value
            for success with that payload, or a falsy value for failure.
        error_msg: Message reported when *result* is falsy.

    Returns:
        A plain string when an API call did not ask for JSON, a ``jsonify``
        response when JSON was requested, otherwise the rendered template
        (with the outcome flashed beforehand).
    """
    wants_json = rq.form.get(' format ') == ' json '

    if rq.form.get(' api ') and not wants_json:
        return " Format type HTML (default) not support for API "  # Future-proof for non-json return types

    if wants_json:
        # A falsy result means failure, so callers can pass an error message
        # without checking the result themselves.
        if not result:
            return jsonify(success=False, error=error_msg)
        if result is True:  # Success with nothing to report (ie. during deletion)
            return jsonify(success=True)
        return jsonify(success=True, result=result)

    if not result:
        flash(error_msg, ' error ')
    elif result is not True:
        flash(result, ' success ')
    return render_template(" main.html ")
2019-02-24 08:07:01 +03:00
2019-04-21 02:28:01 +03:00
def set_latest(long):
    """Point the special "latest" short code at *long*.

    Does nothing when no "latest" code is configured.  An existing row is
    updated, otherwise a new row is inserted.  No commit is issued here.
    """
    if not app.config[' latest ']:
        return
    latest = current_app.config[' latest ']
    already_stored = query_db(' SELECT short FROM urls WHERE short = ? ', (latest,))
    if already_stored:
        statement = " UPDATE urls SET long = ? WHERE short = ? "
    else:
        statement = " INSERT INTO urls (long,short) VALUES (?, ?) "
    get_db().cursor().execute(statement, (long, latest))
2019-02-24 08:07:01 +03:00
def validate_short(short):
    """Validate a user-supplied short code.

    Returns:
        ``True`` when the code is acceptable; otherwise the error produced by
        ``response(...)`` for the first problem found (the code equals the
        special "latest" code, or it contains a character outside
        ``allowed_chars``).
    """
    if short == app.config[' latest ']:
        message = ' Short URL cannot be the same as a special URL ( {} ) '.format(short)
        return response(request, None, message)

    # First offending character, if any; the config is only consulted per character.
    offending = next(
        (char for char in short if char not in current_app.config[' allowed_chars ']),
        None,
    )
    if offending is None:
        return True
    return response(request, None,
                    ' Character ' + offending + ' not allowed in short URL ')
2019-02-22 10:58:38 +03:00
2019-02-24 08:07:01 +03:00
def validate_long(long):  # https://stackoverflow.com/a/36283503
    """Return ``True`` when *long* parses as a URL with both a scheme and a network location.

    Malformed input that ``urlparse`` rejects with ``ValueError`` (for example
    an unclosed IPv6 bracket) is reported as invalid instead of raising.
    """
    # Import the submodule explicitly: a bare "import urllib" does not
    # guarantee that urllib.parse has been loaded.
    from urllib.parse import urlparse

    try:
        token = urlparse(long)
    except ValueError:
        return False
    return all([token.scheme, token.netloc])
2019-02-22 10:58:38 +03:00
2019-02-24 08:07:01 +03:00
# Database connection functions
2019-02-22 10:58:38 +03:00
def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    When the connection is first opened, the ``urls`` table is created if it
    does not exist yet.
    """
    # The membership test must name the attribute that is assigned below
    # ("db"); otherwise the cached connection is never found and a new one is
    # opened on every call.
    if 'db' not in g:
        # NOTE(review): separator and suffix literals reproduced exactly as they
        # appear in the source; confirm the intended database file name.
        g.db = sqlite3.connect(
            ' '.join((current_app.config[' database_name '], ' .db ')),
            detect_types=sqlite3.PARSE_DECLTYPES
        )
        g.db.cursor().execute(' CREATE TABLE IF NOT EXISTS urls (long,short) ')
    return g.db
2019-02-24 08:07:01 +03:00
def query_db(query, args=(), one=False, row_factory=sqlite3.Row):
    """Run *query* with *args* and return the fetched rows.

    Args:
        query: SQL text with ``?`` placeholders.
        args: Values bound to the placeholders.
        one: When true, return only the first row (or ``None`` if there is none).
        row_factory: Row factory set on the connection before executing;
            ``None`` yields plain tuples.

    Returns:
        A list of rows, or a single row / ``None`` when *one* is true.
    """
    get_db().row_factory = row_factory
    cursor = get_db().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None
2019-02-24 08:07:01 +03:00
@app.teardown_appcontext
def close_db(error):
    """Close the SQLite connection stored on ``g`` when the app context is torn down.

    Args:
        error: Value passed by Flask's teardown hook; not used here.
    """
    # get_db() stores the connection as g.db, so that is the attribute to
    # release; pop() also removes it from g so it cannot be reused after closing.
    connection = g.pop('db', None)
    if connection is not None:
        connection.close()
2019-02-23 09:45:47 +03:00
2019-02-24 08:07:01 +03:00
# Add YAML config to Flask config
app.config.update(load_config())
# Flask reads the secret key and server name from its own attributes/keys,
# so mirror the corresponding YAML options onto them.
app.secret_key = app.config[' secret_key ']
app.config.update({' SERVER_NAME ': app.config[' site_domain ']})
2019-02-21 23:59:21 +03:00
2019-02-25 07:31:39 +03:00
@app.route(' /favicon.ico ', subdomain=app.config[' subdomain '])
def favicon():
    """Serve the favicon file from the application's static directory."""
    static_dir = os.path.join(app.root_path, ' static ')
    return send_from_directory(static_dir, ' favicon.ico ',
                               mimetype=' image/vnd.microsoft.icon ')
2019-02-25 06:29:38 +03:00
@app.route(' / ', subdomain=app.config[' subdomain '])
def main():
    """Handle the index route by returning ``response(request, True)``."""
    # result=True signals success with nothing to report
    page = response(request, True)
    return page
@app.route(' /<url> ')
def main_redir(url):
    """Redirect a short code to its long URL, or back to the main page if unknown.

    Every reply carries a ``Cache-Control`` header that forbids storing it.
    """
    destination = get_long(url)
    if not destination:
        flash(' Short URL " ' + url + ' " doesn \' t exist ', ' error ')
        target = redirect(url_for(' main '))
    else:
        target = redirect(destination, 301)
    resp = make_response(target)
    resp.headers.set(' Cache-Control ', ' no-store, must-revalidate ')
    return resp
2019-02-22 10:58:38 +03:00
2019-02-25 06:29:38 +03:00
def _delete_by_long(long):
    """Delete every row whose long URL equals *long*; return how many rows matched."""
    matched = query_db('SELECT * FROM urls WHERE long = ?', (long,), False, None)
    get_db().cursor().execute('DELETE FROM urls WHERE long = ?', (long,))
    get_db().commit()
    return len(matched)


@app.route(' / ', methods=[' POST '], subdomain=app.config[' subdomain '])
def main_post():
    """Handle form posts: create a short link, or run an authenticated API command.

    * With a ``long`` field: validate it, use the supplied ``short`` code or
      generate a random one, store the mapping and report the short URL.  An
      already stored mapping is reported instead of being duplicated.
    * With an ``api`` field: after HTTP basic authentication, run ``list`` /
      ``listshort``, ``listlong`` or ``delete``.
    * Otherwise: report that a long URL is required.

    Returns:
        Whatever ``response(...)`` produces for the outcome.
    """
    form = request.form

    if form.get(' long '):
        long = form[' long ']
        if not validate_long(long):
            return response(request, None, " Long URL is not valid ")

        custom_short = form.get(' short ')
        if custom_short:
            # Validate the custom short text against the allowed characters.
            # Validated once; anything other than True is the error to send back.
            verdict = validate_short(custom_short)
            if verdict is not True:
                return verdict
            short = custom_short
            if get_long(short) == long:
                return response(request, get_baseUrl() + short,
                                ' Error: Failed to return pre-existing non-random shortlink ')
        else:
            short = generate_short(request)
            if not isinstance(short, str):
                # generate_short hands back an error response on timeout; pass
                # it on instead of treating it as a short code.
                return short

        if check_short_exist(short):
            return response(request, None,
                            ' Short URL already taken ')

        long_exists = check_long_exist(long)
        if long_exists and not custom_short:
            set_latest(long)
            get_db().commit()
            return response(request, get_baseUrl() + long_exists,
                            ' Error: Failed to return pre-existing random shortlink ')

        get_db().cursor().execute(' INSERT INTO urls (long,short) VALUES (?,?) ', (long, short))
        set_latest(long)
        get_db().commit()
        return response(request, get_baseUrl() + short,
                        ' Error: Failed to generate ')

    elif form.get(' api '):
        if current_app.config[' disable_api ']:
            return response(request, None, " API is disabled. ")
        # All API calls require authentication
        if not request.authorization \
                or not authenticate(request.authorization[' username '],
                                    request.authorization[' password ']):
            return response(request, None, " BaiscAuth failed ")

        command = form[' api ']
        if command == ' list ' or command == ' listshort ':
            return response(request, list_shortlinks(), " Failed to list items ")
        elif command == ' listlong ':
            shortlinks = list_shortlinks()
            shortlinks = {v: k for k, v in shortlinks.items()}
            return response(request, shortlinks, " Failed to list items ")
        elif command == ' delete ':
            deleted = 0
            if ' long ' not in form and ' short ' not in form:
                return response(request, None, " Provide short or long in POST data ")
            if ' short ' in form:
                deleted += delete_url(form[' short '])
            if ' long ' in form:
                # delete_url matches on the short column, so deleting by long
                # URL needs its own query.
                deleted += _delete_by_long(form[' long '])
            if deleted > 0:
                return response(request, " Deleted " + str(deleted) + " URLs ")
            else:
                return response(request, None, " Failed to delete URL ")
        else:
            return response(request, None, ' Command ' + command + ' not found ')

    else:
        return response(request, None, ' Long URL required ')
2019-02-21 23:59:21 +03:00
# Start Flask's built-in server only when this file is executed directly.
# The interpreter sets __name__ to exactly '__main__' in that case, so the
# comparison literal must not carry any surrounding whitespace.
if __name__ == '__main__':
    app.run()