304 lines
9.7 KiB
Python
304 lines
9.7 KiB
Python
# Module metadata.
__version__ = "pre-dev"
# Public names for ``from <module> import *``. Every entry must exist in this
# module, otherwise the star-import raises AttributeError; the Flask
# application object ``app`` is the one object this file exposes.
__all__ = ["app"]
__author__ = "SimolZimol"
|
||
|
||
from flask import Flask, redirect, request, session, url_for, render_template
|
||
import os
|
||
import spotipy
|
||
from spotipy.oauth2 import SpotifyOAuth
|
||
import random
|
||
from difflib import SequenceMatcher
|
||
import re
|
||
import json
|
||
|
||
# Flask application object. The key used to sign the session cookie is read
# from the SECRET_KEY environment variable.
app = Flask(__name__)
app.secret_key = os.environ.get("SECRET_KEY")

# OAuth scopes requested from Spotify (extended permissions for the Web Playback SDK).
SCOPE = "user-library-read playlist-read-private streaming user-read-email user-read-private"
|
||
|
||
def get_locale():
    """Return the locale code taken from the ``LANG`` environment variable.

    ``"de-DE"`` is returned when ``LANG`` is not set.
    """
    return os.environ.get("LANG", "de-DE")
|
||
|
||
def get_translations():
    """Load the translation table for the active locale.

    Reads ``locales/<locale>.json`` from the directory of this module, where
    ``<locale>`` is the value returned by ``get_locale()``. When that file
    cannot be opened or does not contain valid JSON, the German table
    ``locales/de-DE.json`` is loaded instead.

    Returns:
        The parsed JSON content of the selected locale file.

    Raises:
        OSError: if the German fallback file cannot be opened either.
        ValueError: if the German fallback file is not valid JSON.
    """
    locales_dir = os.path.join(os.path.dirname(__file__), "locales")
    lang = get_locale()
    try:
        with open(os.path.join(locales_dir, f"{lang}.json"), encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: malformed JSON
        # (json.JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
        # Anything else is a programming error and should surface.
        # Fall back to German.
        with open(os.path.join(locales_dir, "de-DE.json"), encoding="utf-8") as f:
            return json.load(f)
|
||
|
||
def get_spotify_client():
    """Create a Spotify API client from the token stored in the session.

    Returns:
        ``None`` when the session holds no ``token_info`` (the caller is
        expected to redirect to the login), otherwise a ``spotipy.Spotify``
        client authenticated with the session's access token. An expired
        token is refreshed first and the refreshed token info is written
        back to the session.
    """
    current_token = session.get("token_info", None)
    if not current_token:
        # No token; the redirect is handled elsewhere.
        return None

    oauth = SpotifyOAuth(
        client_id=os.getenv("SPOTIPY_CLIENT_ID"),
        client_secret=os.getenv("SPOTIPY_CLIENT_SECRET"),
        redirect_uri=os.getenv("SPOTIPY_REDIRECT_URI"),
        scope=SCOPE,
        cache_path=".cache",
    )

    # Check whether the token has expired and renew it if so.
    if oauth.is_token_expired(current_token):
        current_token = oauth.refresh_access_token(current_token['refresh_token'])
        session["token_info"] = current_token

    return spotipy.Spotify(auth=current_token['access_token'])
|
||
|
||
def similarity(a, b):
    """Return the case-insensitive similarity ratio of two strings.

    Both strings are lower-cased and compared with
    ``difflib.SequenceMatcher``; the result is its ``ratio()``, a float
    between 0.0 and 1.0.
    """
    matcher = SequenceMatcher(None, a.lower(), b.lower())
    return matcher.ratio()
|
||
|
||
def clean_title(title):
    """Normalise a track title for comparison.

    Removes every segment enclosed in ``()`` or ``[]`` together with the
    whitespace in front of it, drops apostrophes and quotation marks in
    their straight and typographic variants, and trims surrounding
    whitespace.
    """
    # Remove everything inside () or [].
    cleaned = re.sub(r"(\s*[\(\[][^)\]]*[\)\]])", "", title)

    # Unify apostrophe variants to the plain apostrophe.
    for variant in ("’", "‘", "`"):
        cleaned = cleaned.replace(variant, "'")

    # Drop double quotation marks of every flavour.
    for quote in ('"', "„", "“", "”"):
        cleaned = cleaned.replace(quote, '')

    # Finally remove all apostrophes as well.
    cleaned = cleaned.replace("'", "")
    return cleaned.strip()
|
||
|
||
def get_all_playlist_tracks(sp, playlist_id):
    """Collect the track objects of a playlist across all result pages.

    Calls ``sp.playlist_items`` in pages of 100 entries, advancing the
    offset until a page comes back empty or shorter than the page size.
    Entries whose ``track`` value is missing or falsy are skipped.

    Args:
        sp: Object providing ``playlist_items(playlist_id,
            additional_types=..., limit=..., offset=...)``.
        playlist_id: Identifier passed through to ``playlist_items``.

    Returns:
        A list with the ``track`` value of every kept entry, in page order.
    """
    page_size = 100
    collected = []
    position = 0
    while True:
        page = sp.playlist_items(playlist_id, additional_types=["track"], limit=page_size, offset=position)
        entries = page["items"]
        for entry in entries:
            if entry.get("track"):
                collected.append(entry["track"])
        # An empty or short page means there is nothing left to fetch.
        if len(entries) < page_size:
            break
        position += page_size
    return collected
|
||
|
||
@app.route("/")
def home():
    """Render the login page with the translations of the active locale."""
    translations = get_translations()
    return render_template("login.html", translations=translations)
|
||
|
||
@app.route("/login")
def login():
    """Redirect the browser to the Spotify authorization URL.

    The OAuth helper is configured from the ``SPOTIPY_CLIENT_ID``,
    ``SPOTIPY_CLIENT_SECRET`` and ``SPOTIPY_REDIRECT_URI`` environment
    variables and requests the scopes listed in ``SCOPE``.
    """
    oauth = SpotifyOAuth(
        client_id=os.getenv("SPOTIPY_CLIENT_ID"),
        client_secret=os.getenv("SPOTIPY_CLIENT_SECRET"),
        redirect_uri=os.getenv("SPOTIPY_REDIRECT_URI"),
        scope=SCOPE,
    )
    return redirect(oauth.get_authorize_url())
|
||
|
||
@app.route("/callback")
def callback():
    """Handle the OAuth redirect from Spotify and start a fresh session.

    Clears the current session, exchanges the ``code`` query parameter for
    a token, stores the token info under ``session["token_info"]`` and the
    current user's profile under ``session["user"]``, then redirects to
    ``/playlists``.

    When the request carries no ``code`` (e.g. the user declined the consent
    screen) nothing is exchanged and the browser is sent back to the start
    page instead.
    """
    sp_oauth = SpotifyOAuth(
        client_id=os.getenv("SPOTIPY_CLIENT_ID"),
        client_secret=os.getenv("SPOTIPY_CLIENT_SECRET"),
        redirect_uri=os.getenv("SPOTIPY_REDIRECT_URI"),
        scope=SCOPE
    )
    session.clear()

    code = request.args.get('code')
    if not code:
        # Without an authorization code there is nothing to exchange.
        return redirect("/")

    # check_cache=False: always exchange the code that was just received.
    # Otherwise spotipy may hand back a still-valid token from its token
    # cache, which is shared by all visitors of this server process, and a
    # new visitor would be logged in with someone else's token.
    token_info = sp_oauth.get_access_token(code, check_cache=False)
    session["token_info"] = token_info

    # Fetch the user's profile and keep it in the session.
    sp = spotipy.Spotify(auth=token_info['access_token'])
    session["user"] = sp.current_user()

    return redirect("/playlists")
|
||
|
||
@app.route("/playlists")
def playlists():
    """Render the playlist overview of the logged-in user.

    Redirects to ``/login`` when no Spotify client can be built from the
    session. Otherwise the items of ``current_user_playlists()`` are passed
    to ``playlists.html`` together with the translations, the user stored in
    the session and the current access token.
    """
    # get_spotify_client() returns None when the session has no token, so
    # this single check covers the "not logged in" case.
    sp = get_spotify_client()
    if not sp:
        return redirect('/login')

    # Read the token only now: get_spotify_client() may have refreshed it and
    # replaced session["token_info"], so a copy taken earlier could carry an
    # expired access token into the template.
    token_info = session.get('token_info') or {}
    access_token = token_info.get('access_token')

    user_playlists = sp.current_user_playlists()["items"]
    user = session.get('user')
    return render_template(
        "playlists.html",
        playlists=user_playlists,
        translations=get_translations(),
        user=user,
        access_token=access_token,
    )
|
||
|
||
def _track_summary(item):
    """Reduce a Spotify track object to the fields handed to the quiz page."""
    # "album" or its "release_date" may be missing or None; only slice a
    # real date down to its year so the placeholder is not cut to "Unbe".
    release_date = (item.get("album") or {}).get("release_date")
    return {
        "id": item["id"],
        "name": item["name"],
        "artist": item["artists"][0]["name"],
        "uri": item["uri"],
        "release_date": release_date[:4] if release_date else "Unbekannt",
    }


@app.route("/quiz/<playlist_id>")
def quiz(playlist_id):
    """Render the next quiz question for a playlist.

    The game mode comes from the ``mode`` query parameter (default
    ``'artist'``). A random track that has not been played yet is chosen;
    once every track has been played the played-list starts over while the
    score is kept. Played track ids and the score live in the session under
    ``played_tracks_<playlist_id>`` and ``score_<playlist_id>``.

    Redirects to ``/login`` when the session holds no usable token, and
    returns a plain message when the playlist yields no tracks.
    """
    game_mode = request.args.get('mode', 'artist')

    # Check the login before talking to Spotify: without a token
    # get_spotify_client() returns None and any API call would fail.
    sp = get_spotify_client()
    token_info = session.get('token_info', None)
    if not sp or not token_info:
        return redirect('/login')
    access_token = token_info['access_token']

    tracks = get_all_playlist_tracks(sp, playlist_id)
    if not tracks:
        return "Keine Tracks verfügbar!"

    played_tracks = list(session.get(f'played_tracks_{playlist_id}', []))
    score = session.get(f'score_{playlist_id}', 0)

    # When every song has been played, reset played_tracks; the score stays.
    available_tracks = [t for t in tracks if t["id"] not in played_tracks]
    if not available_tracks:
        played_tracks = []
        available_tracks = tracks

    track = random.choice(available_tracks)
    played_tracks.append(track["id"])
    session[f'played_tracks_{playlist_id}'] = played_tracks
    session[f'score_{playlist_id}'] = score

    # Number of questions already answered: everything played except the
    # track that was just picked.
    answered = len(played_tracks) - 1

    all_tracks = [_track_summary(item) for item in tracks]

    return render_template(
        "quiz.html",
        track=track,
        access_token=access_token,
        playlist_id=playlist_id,
        game_mode=game_mode,
        all_tracks=all_tracks,
        question_number=len(played_tracks),
        total_questions=len(tracks),
        score=score,
        answered=answered,
        translations=get_translations()
    )
|
||
|
||
@app.route("/search_track", methods=["POST"])
def search_track():
    """Return the tracks from the request that fuzzily match a query.

    Expects a JSON object with ``query`` (search text) and ``all_tracks``
    (list of dicts with ``id``, ``name``, ``artist`` and ``uri``). A track
    matches when its cleaned title or its artist is at least 80% similar to
    the query.

    Returns:
        ``{"results": [...]}`` sorted by descending similarity, an empty
        result list when the query or track list is empty, or an error
        object with status 400 when the body is not a JSON object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"error": "Invalid JSON body"}, 400

    query = str(data.get('query') or '').lower()
    all_tracks = data.get('all_tracks') or []

    if not query or not all_tracks:
        return {"results": []}

    # The cleaned query is the same for every track, so compute it once.
    cleaned_query = clean_title(query)

    # Search by cleaned title and by artist.
    results = []
    for track in all_tracks:
        name_similarity = similarity(cleaned_query, clean_title(track["name"]))
        artist_similarity = similarity(query, track["artist"])
        best = max(name_similarity, artist_similarity)

        # Keep the track when title or artist matches at 80% or better.
        if best >= 0.8:
            results.append({
                "id": track["id"],
                "name": track["name"],
                "artist": track["artist"],
                "uri": track["uri"],
                "similarity": best
            })

    # Best matches first.
    results.sort(key=lambda x: x["similarity"], reverse=True)

    return {"results": results}
|
||
|
||
@app.route("/check_answer", methods=["POST"])
def check_answer():
    """Compare a guess with the correct answer and update the score.

    Expects a JSON object with ``guess``, ``correct_answer``, ``game_mode``
    (default ``'artist'``) and ``playlist_id``. Both texts are lower-cased;
    in ``'title'`` mode they are additionally passed through
    ``clean_title``. ``'year'`` mode requires an exact match, every other
    mode a similarity of at least 0.9. A correct answer increments
    ``score_<playlist_id>`` in the session.

    Returns:
        ``{"correct": bool, "correct_answer": str}`` (the answer in its
        normalised form), or an error object with status 400 when the body
        is not a JSON object.
    """
    # NOTE(review): the correct answer is taken from the request itself, so
    # the score relies on the client being honest.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"error": "Invalid JSON body"}, 400

    # str(... or '') tolerates null and numeric values (e.g. a year sent as
    # a JSON number) instead of failing on .lower().
    guess = str(data.get('guess') or '').lower()
    correct_answer = str(data.get('correct_answer') or '').lower()
    game_mode = data.get('game_mode', 'artist')
    playlist_id = data.get('playlist_id')

    if game_mode == 'title':
        guess = clean_title(guess)
        correct_answer = clean_title(correct_answer)

    if game_mode == 'year':
        is_correct = guess == correct_answer
    else:
        is_correct = similarity(guess, correct_answer) >= 0.9

    # Increase the score on a correct answer.
    if is_correct and playlist_id:
        key = f'score_{playlist_id}'
        session[key] = session.get(key, 0) + 1

    return {
        "correct": is_correct,
        "correct_answer": correct_answer
    }
|
||
|
||
@app.route("/play_track", methods=["POST"])
def play_track():
    """Start playback of one track on a given device.

    Reads ``device_id``, ``track_uri`` and the optional ``position_ms``
    (default 0) from the query string of the POST request.

    Returns:
        ``{"success": True}`` after playback was requested, an error object
        with status 400 when ``device_id`` or ``track_uri`` is missing, or
        an error object with status 401 when the session has no token.
    """
    device_id = request.args.get('device_id')
    track_uri = request.args.get('track_uri')
    # type=int makes a non-numeric value fall back to the default instead of
    # raising ValueError and turning a bad parameter into a server error.
    position_ms = request.args.get('position_ms', 0, type=int)

    if not device_id or not track_uri:
        return {"error": "Missing device_id or track_uri"}, 400

    sp = get_spotify_client()
    if not sp:
        # get_spotify_client() returns None when nobody is logged in.
        return {"error": "Not authenticated"}, 401

    sp.start_playback(device_id=device_id, uris=[track_uri], position_ms=position_ms)

    return {"success": True}
|
||
|
||
@app.route("/toggle_playback", methods=["POST"])
def toggle_playback():
    """Pause or resume playback on the given device.

    Expects a JSON object with ``device_id``. Playback is paused when the
    current playback belongs to that device and is playing; in every other
    case playback is started on the device.

    Returns:
        ``{"success": True}`` after the request was sent, an error object
        with status 400 when the body is not a JSON object or ``device_id``
        is missing, or an error object with status 401 when the session has
        no token.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"error": "Invalid JSON body"}, 400

    device_id = data.get('device_id')
    if not device_id:
        return {"error": "Missing device_id"}, 400

    sp = get_spotify_client()
    if not sp:
        # get_spotify_client() returns None when nobody is logged in.
        return {"error": "Not authenticated"}, 401

    # Check the playback state for the right device.
    current_playback = sp.current_playback()
    is_playing = False
    if current_playback:
        # "device" may be absent or None; treat both as "no device".
        active_device = current_playback.get('device') or {}
        if active_device.get('id') == device_id:
            is_playing = current_playback.get('is_playing', False)

    if is_playing:
        sp.pause_playback(device_id=device_id)
    else:
        sp.start_playback(device_id=device_id)

    return {"success": True}
|
||
|
||
@app.route('/logout')
def logout():
    """Drop everything stored in the session and go back to the start page."""
    session.clear()
    start_page = url_for('home')
    return redirect(start_page)
|
||
|
||
# NOTE(review): the view ``home`` earlier in this file is registered for "/"
# as well, so presumably only one of the two is ever served for that URL.
# Confirm which one is intended.
@app.route('/')
def index():
    """Render ``index.html`` with the session's user (if any) and the translations."""
    # The user info is None when nobody is logged in.
    current_user = session.get('user')
    translations = get_translations()
    return render_template('index.html', user=current_user, translations=translations)
|
||
|
||
@app.route("/reset_quiz/<playlist_id>")
def reset_quiz(playlist_id):
    """Forget the quiz progress of a playlist and redirect.

    Removes the played-track list and the score of ``playlist_id`` from the
    session. With a ``next_mode`` query parameter the browser is sent to
    the quiz of the same playlist in that mode, otherwise to the playlist
    overview.
    """
    for key in (f'played_tracks_{playlist_id}', f'score_{playlist_id}'):
        session.pop(key, None)

    next_mode = request.args.get('next_mode')
    if not next_mode:
        return redirect(url_for('playlists'))
    return redirect(url_for('quiz', playlist_id=playlist_id, mode=next_mode))
|
||
|
||
if __name__ == "__main__":
    # The server listens on all interfaces, so the debugger must not be on
    # by default: debug mode is meant for local development only. Opt in
    # explicitly with FLASK_DEBUG=1 (also accepts true/yes/on).
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)
|