mirror of
https://github.com/PhoenixTwoFive/karaoqueue.git
synced 2025-05-18 18:41:48 +02:00
391 lines
13 KiB
Python
391 lines
13 KiB
Python
from flask import Flask, render_template, abort, request, redirect, send_from_directory, jsonify
|
|
from flask.wrappers import Response
|
|
import helpers
|
|
import database
|
|
import data_adapters
|
|
import os
|
|
import json
|
|
from flask_basicauth import BasicAuth
|
|
from helpers import nocache
|
|
from werkzeug.utils import secure_filename
|
|
app = Flask(__name__, static_url_path='/static')
|
|
|
|
basic_auth = BasicAuth(app)
|
|
accept_entries = True
|
|
|
|
|
|
@app.route("/")
|
|
def home():
|
|
if basic_auth.authenticate():
|
|
return render_template('main_admin.html', list=database.get_list(), auth=basic_auth.authenticate(), debug=app.config['DEBUG'])
|
|
else:
|
|
return render_template('main.html', list=database.get_list(), auth=basic_auth.authenticate(), debug=app.config['DEBUG'])
|
|
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon():
|
|
return send_from_directory(os.path.join(app.root_path, 'static'),
|
|
'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
|
|
|
|
|
@app.route('/api/enqueue', methods=['POST'])
|
|
@nocache
|
|
def enqueue():
|
|
if not request.json:
|
|
abort(400)
|
|
client_id = request.json['client_id']
|
|
if not helpers.is_valid_uuid(client_id):
|
|
abort(400)
|
|
name = request.json['name'].strip()
|
|
song_id = request.json['id']
|
|
if request.authorization:
|
|
entry_id = database.add_entry(name, song_id, client_id)
|
|
return Response(f"""{{"status":"OK", "entry_id":{entry_id}}}""", mimetype='text/json')
|
|
else:
|
|
if helpers.get_accept_entries(app):
|
|
if not request.json:
|
|
abort(400)
|
|
client_id = request.json['client_id']
|
|
if not helpers.is_valid_uuid(client_id):
|
|
abort(400)
|
|
name = request.json['name']
|
|
song_id = request.json['id']
|
|
if database.check_queue_length() < int(app.config['MAX_QUEUE']):
|
|
if database.check_entry_quota(client_id) < int(app.config['ENTRY_QUOTA']):
|
|
entry_id = database.add_entry(name, song_id, client_id)
|
|
return Response(f"""{{"status":"OK", "entry_id":{entry_id}}}""", mimetype='text/json')
|
|
else:
|
|
return Response('{"status":"Du hast bereits ' + str(database.check_entry_quota(client_id)) + ' Songs eingetragen, dies ist das Maximum an Einträgen die du in der Warteliste haben kannst."}', mimetype='text/json', status=423)
|
|
else:
|
|
return Response('{"status":"Die Warteschlange enthält momentan ' + str(database.check_queue_length()) + ' Einträge und ist lang genug, bitte versuche es noch einmal wenn ein paar Songs gesungen wurden."}', mimetype='text/json', status=423)
|
|
else:
|
|
return Response('{"status":"Currently not accepting entries"}', mimetype='text/json', status=423)
|
|
|
|
|
|
@app.route("/list")
|
|
def songlist():
|
|
return render_template('songlist.html', list=database.get_song_list(), auth=basic_auth.authenticate(), debug=app.config['DEBUG'])
|
|
|
|
|
|
@app.route("/settings")
|
|
@nocache
|
|
@basic_auth.required
|
|
def settings():
|
|
return render_template('settings.html', app=app, auth=basic_auth.authenticate(), themes=helpers.get_themes(), debug=app.config['DEBUG'])
|
|
|
|
|
|
@app.route("/settings", methods=['POST'])
|
|
@nocache
|
|
@basic_auth.required
|
|
def settings_post():
|
|
entryquota = request.form.get("entryquota")
|
|
maxqueue = request.form.get("maxqueue")
|
|
theme = request.form.get("theme")
|
|
username = request.form.get("username")
|
|
password = request.form.get("password")
|
|
changed_credentials = False
|
|
if entryquota.isnumeric() and int(entryquota) > 0: # type: ignore
|
|
app.config['ENTRY_QUOTA'] = int(entryquota) # type: ignore
|
|
else:
|
|
abort(400)
|
|
if maxqueue.isnumeric and int(maxqueue) > 0: # type: ignore
|
|
app.config['MAX_QUEUE'] = int(maxqueue) # type: ignore
|
|
else:
|
|
abort(400)
|
|
if theme is not None and theme in helpers.get_themes():
|
|
helpers.set_theme(app, theme)
|
|
else:
|
|
abort(400)
|
|
if username != "" and username != app.config['BASIC_AUTH_USERNAME']:
|
|
app.config['BASIC_AUTH_USERNAME'] = username
|
|
changed_credentials = True
|
|
if password != "":
|
|
app.config['BASIC_AUTH_PASSWORD'] = password
|
|
changed_credentials = True
|
|
helpers.persist_config(app=app)
|
|
if changed_credentials:
|
|
return redirect("/")
|
|
else:
|
|
return render_template('settings.html', app=app, auth=basic_auth.authenticate(), themes=helpers.get_themes(), debug=app.config['DEBUG'])
|
|
|
|
|
|
@app.route("/api/queue")
|
|
@nocache
|
|
def queue_json():
|
|
list = data_adapters.dict_from_rows(database.get_list())
|
|
return Response(json.dumps(list, ensure_ascii=False).encode('utf-8'), mimetype='text/json')
|
|
|
|
|
|
@app.route("/plays")
|
|
@nocache
|
|
@basic_auth.required
|
|
def played_list():
|
|
return render_template('played_list.html', list=database.get_played_list(), auth=basic_auth.authenticate(), debug=app.config['DEBUG'])
|
|
|
|
|
|
@app.route("/api/songs")
|
|
@nocache
|
|
def songs():
|
|
list = database.get_song_list()
|
|
return Response(json.dumps(list, ensure_ascii=False).encode('utf-8'), mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/songs/update")
|
|
@nocache
|
|
@basic_auth.required
|
|
def update_songs():
|
|
database.delete_all_entries()
|
|
helpers.reset_current_event_id(app)
|
|
status = database.import_songs(
|
|
helpers.get_songs(helpers.get_catalog_url()))
|
|
print(status)
|
|
return Response('{"status": "%s" }' % status, mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/songs/compl") # type: ignore
|
|
@nocache
|
|
def get_song_completions(input_string=""):
|
|
input_string = request.args.get('search', input_string)
|
|
if input_string != "":
|
|
result = [list(x) for x in database.get_song_completions(input_string=input_string)]
|
|
return jsonify(result)
|
|
|
|
else:
|
|
return 400
|
|
|
|
|
|
@app.route("/api/songs/search")
|
|
@nocache
|
|
def query_songs_with_details(input_string=""):
|
|
input_string = request.args.get("q", input_string)
|
|
if input_string == "":
|
|
return Response(status=400)
|
|
result = []
|
|
for x in database.get_songs_with_details(input_string):
|
|
# Turn row into dict. Add field labels.
|
|
result.append(dict(zip(['karafun_id', 'title', 'artist', 'year', 'duo', 'explicit', 'styles', 'languages'], x)))
|
|
return jsonify(result)
|
|
|
|
|
|
@app.route("/api/songs/suggest")
|
|
@nocache
|
|
def query_songs_with_details_suggest(input_string=""):
|
|
input_string = request.args.get("count", input_string)
|
|
if input_string == "":
|
|
return Response(status=400)
|
|
result = []
|
|
if not input_string.isnumeric():
|
|
return Response(status=400)
|
|
count: int = int(input_string)
|
|
for x in database.get_song_suggestions(count):
|
|
# Turn row into dict. Add field labels.
|
|
result.append(dict(zip(['karafun_id', 'title', 'artist', 'year', 'duo', 'explicit', 'styles', 'languages'], x)))
|
|
return jsonify(result)
|
|
|
|
|
|
@app.route("/api/songs/stats")
|
|
@nocache
|
|
# Return the data from long_term_stats as json
|
|
def get_stats():
|
|
db_result = database.get_long_term_stats()
|
|
data = []
|
|
for row in db_result:
|
|
data.append(dict(zip(['id', 'count'], row)))
|
|
return jsonify(data)
|
|
|
|
|
|
@app.route("/api/songs/stats.csv")
|
|
@nocache
|
|
# Return data from long_term_stats as csv
|
|
def get_stats_csv():
|
|
db_result = database.get_long_term_stats()
|
|
csv = "Id,Playbacks\n"
|
|
for row in db_result:
|
|
csv += str(row[0]) + "," + str(row[1]) + "\n"
|
|
return Response(csv, mimetype='text/csv')
|
|
|
|
|
|
@app.route("/api/songs/stats.csv", methods=['POST'])
|
|
@nocache
|
|
@basic_auth.required
|
|
# Update long_term_stats from csv
|
|
def update_stats_csv():
|
|
if not request.files:
|
|
abort(400)
|
|
file = request.files['file']
|
|
if file.filename is None:
|
|
abort(400)
|
|
else:
|
|
filename = secure_filename(file.filename)
|
|
if filename == '':
|
|
abort(400)
|
|
if not filename.endswith('.csv'):
|
|
abort(400)
|
|
if file:
|
|
if database.import_stats(file):
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json', status=400)
|
|
else:
|
|
abort(400)
|
|
|
|
|
|
@app.route("/api/songs/details/<song_id>")
|
|
def get_song_details(song_id):
|
|
result = database.get_song_details(song_id)
|
|
if result is None:
|
|
abort(404)
|
|
else:
|
|
return jsonify(dict(zip(['karafun_id', 'title', 'artist', 'year', 'duo', 'explicit', 'styles', 'languages'], result[0])))
|
|
|
|
|
|
@app.route("/api/entries/delete/<entry_id>", methods=['GET'])
|
|
@nocache
|
|
@basic_auth.required
|
|
def delete_entry_admin(entry_id):
|
|
if database.delete_entry(entry_id):
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/entries/delete/<entry_id>", methods=['POST'])
|
|
@nocache
|
|
def delete_entry_user(entry_id):
|
|
if not request.json:
|
|
abort(400)
|
|
client_id = request.json['client_id']
|
|
if not helpers.is_valid_uuid(client_id):
|
|
abort(400)
|
|
if database.get_raw_entry(entry_id)[3] != client_id: # type: ignore
|
|
abort(403)
|
|
if database.delete_entry(entry_id):
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/entries/delete", methods=['POST'])
|
|
@nocache
|
|
@basic_auth.required
|
|
def delete_entries():
|
|
if not request.json:
|
|
abort(400)
|
|
return
|
|
updates = database.delete_entries(request.json)
|
|
if updates >= 0:
|
|
return Response('{"status": "OK", "updates": ' + str(updates) + '}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json', status=400)
|
|
|
|
|
|
@app.route("/api/entries/mark_sung/<entry_id>")
|
|
@nocache
|
|
@basic_auth.required
|
|
def mark_sung(entry_id):
|
|
if database.add_sung_song(entry_id):
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/entries/mark_transferred/<entry_id>")
|
|
@nocache
|
|
@basic_auth.required
|
|
def mark_transferred(entry_id):
|
|
if database.toggle_transferred(entry_id):
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/entries/accept/<value>")
|
|
@nocache
|
|
@basic_auth.required
|
|
def set_accept_entries(value):
|
|
if (value == '0' or value == '1'):
|
|
helpers.set_accept_entries(app, bool(int(value)))
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json', status=400)
|
|
|
|
|
|
@app.route("/api/entries/accept")
|
|
@nocache
|
|
def get_accept_entries():
|
|
accept_entries = helpers.get_accept_entries(app)
|
|
return Response('{"status": "OK", "value": ' + str(int(accept_entries)) + '}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/api/event/close")
|
|
@nocache
|
|
@basic_auth.required
|
|
def close_event():
|
|
try:
|
|
database.transfer_playbacks()
|
|
database.clear_played_songs()
|
|
database.delete_all_entries()
|
|
helpers.reset_current_event_id(app)
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
except Exception:
|
|
response = jsonify({"status": "FAIL", "message": "An error occured while closing the event."})
|
|
response.status_code = 400
|
|
return response
|
|
|
|
|
|
@app.route("/api/entries/delete_all")
|
|
@nocache
|
|
@basic_auth.required
|
|
def delete_all_entries():
|
|
if database.delete_all_entries():
|
|
helpers.reset_current_event_id(app)
|
|
return Response('{"status": "OK"}', mimetype='text/json')
|
|
else:
|
|
return Response('{"status": "FAIL"}', mimetype='text/json')
|
|
|
|
|
|
@app.route("/login")
|
|
@basic_auth.required
|
|
def admin():
|
|
return redirect("/", code=303)
|
|
|
|
|
|
@app.route("/api/events/current")
|
|
@nocache
|
|
def get_current_event():
|
|
return Response('{"status": "OK", "event": "' + helpers.get_current_event_id(app) + '"}', mimetype='text/json')
|
|
|
|
|
|
def activate_job():
|
|
with app.app_context():
|
|
helpers.load_dbconfig(app)
|
|
helpers.load_version(app)
|
|
database.create_schema()
|
|
database.create_entry_table()
|
|
database.create_song_table()
|
|
database.create_done_song_table()
|
|
database.create_list_view()
|
|
database.create_done_song_view()
|
|
database.create_config_table()
|
|
helpers.setup_config(app)
|
|
|
|
|
|
@app.after_request
|
|
def add_header(response):
|
|
"""
|
|
Add headers to both force latest IE rendering engine or Chrome Frame,
|
|
and also to cache the rendered page for 10 minutes.
|
|
"""
|
|
if 'Cache-Control' not in response.headers:
|
|
response.headers['Cache-Control'] = 'private, max-age=600, no-cache, must-revalidate'
|
|
return response
|
|
|
|
|
|
@app.context_processor
|
|
def inject_version():
|
|
return dict(karaoqueue_version=app.config['VERSION'])
|
|
|
|
|
|
# Perform setup here so it will be executed when the module is imported by the WSGI server.
|
|
activate_job()
|