Files
hoi4botdc/app.py
2025-10-30 22:21:55 +01:00

1645 lines
66 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import discord
from discord.ext import commands
import os
import asyncio
import logging
from dotenv import load_dotenv
import aiomysql
import json
from datetime import datetime
from typing import Optional, List, Dict
# Load environment variables (python-dotenv reads a local .env file if present)
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
# Database configuration
# A complete DATABASE_URL takes precedence; the DB_* parts are only a fallback.
DATABASE_URL = os.getenv('DATABASE_URL')
DB_HOST = os.getenv('DB_HOST')
# The bot connects through aiomysql, so the fallback must be the MySQL port
# (3306). The previous default of 5432 is PostgreSQL's port and made every
# connection fail whenever DB_PORT was left unset.
DB_PORT = os.getenv('DB_PORT', '3306')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Build DATABASE_URL from individual components if not provided
if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
    DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    print("📝 Built DATABASE_URL from individual environment variables")
# Parse MySQL connection details from DATABASE_URL
def parse_database_url(url):
    """Parse a MySQL connection URL into aiomysql connection parameters.

    Expected shape: ``mysql://user:password@host:port/database``. The
    ``mysql://`` prefix is optional.

    Args:
        url: Connection URL string, or a falsy value.

    Returns:
        A dict with the keys ``host``, ``port``, ``user``, ``password`` and
        ``db``, or ``None`` when ``url`` is empty or lacks the ``@`` or ``/``
        separator. A missing password becomes ``''``; a missing or
        non-numeric port falls back to 3306.
    """
    if not url:
        return None
    # Remove mysql:// prefix
    if url.startswith('mysql://'):
        url = url[len('mysql://'):]
    if '@' not in url:
        return None
    # Split on the LAST '@': a host name cannot contain '@' but a password
    # can, and splitting on the first one cut such passwords in half.
    auth, host_db = url.rsplit('@', 1)
    user, _, password = auth.partition(':')
    if '/' not in host_db:
        return None
    host_port, database = host_db.split('/', 1)
    host, _, port_text = host_port.partition(':')
    try:
        port = int(port_text)
    except ValueError:
        # Covers both "no port given" (empty text) and a non-numeric port.
        port = 3306
    return {
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'db': database
    }
# Global database connection pool; stays None until init_database() assigns it
db_pool = None
# Bot configuration
# Start from discord.py's default intents and explicitly switch on the
# message-content, guilds and members intents.
intents = discord.Intents.default()
intents.message_content = True
intents.guilds = True
intents.members = True
# Text commands use the '!' prefix; the command tree is synced in on_ready()
bot = commands.Bot(command_prefix='!', intents=intents)
@bot.event
async def on_ready():
    """Announce startup, initialise the database, set the presence and sync commands."""
    startup_banner = (
        f'{bot.user} is online and ready!',
        f'Bot ID: {bot.user.id}',
        f'Discord.py Version: {discord.__version__}',
        '------',
    )
    for banner_line in startup_banner:
        print(banner_line)
    # Database first, so commands find a usable pool
    await init_database()
    # Presence shown in the member list
    playing = discord.Game(name="Hearts of Iron IV ELO")
    await bot.change_presence(activity=playing, status=discord.Status.online)
    # Publish the command tree; a failure is reported but does not stop the bot
    try:
        registered = await bot.tree.sync()
        print(f'Synced {len(registered)} hybrid commands')
    except Exception as e:
        print(f'Failed to sync commands: {e}')
@bot.event
async def on_guild_join(guild):
    """Print the name and ID of a server the bot has just joined."""
    server_name, server_id = guild.name, guild.id
    print(f'Bot joined server "{server_name}" (ID: {server_id})')
@bot.event
async def on_guild_remove(guild):
    """Print the name and ID of a server the bot has just left."""
    server_name, server_id = guild.name, guild.id
    print(f'Bot left server "{server_name}" (ID: {server_id})')
# Owner Configuration
# Discord user ID that is_owner() compares ctx.author.id against
OWNER_ID = 253922739709018114
# ELO Configuration
# Starting rating. Not referenced in the visible part of this file; the
# players table hard-codes the same 800 default for both ELO columns.
STARTING_ELO = 800
# Scale applied to (actual score - expected score) in calculate_elo_change()
K_FACTOR = 32
# Multiplier applied to the base ELO change per T level;
# calculate_elo_change() falls back to 1.0 for levels not listed here
T_LEVEL_MULTIPLIERS = {
    1: 0.8, # T1 countries get less points
    2: 1.0, # T2 countries get normal points
    3: 1.2 # T3 countries get more points
}
# Preferred team emoji overrides (will be used if no guild/file match is found)
# Keys are the override keys passed around in get_team_emoji(); values are
# custom-emoji mention strings in <:name:id> form. "default" is the fallback
# for teams that match none of the known factions.
TEAM_EMOTE_OVERRIDES: Dict[str, str] = {
    "axis": "<:fascism:1432391685127536762>",
    "allies": "<:democracy:1432391686612586528>",
    "comintern": "<:communism:1432391682267025479>",
    "cuf": "<:ChineseUnitedFront:1432422469985112156>",
    "default": "<:neutrality:1432391681059197102>",
}
# Role mapping for ELO tiers (IDs provided by user)
# Assumption: Top tier is >= 900 ELO rather than strictly >900,
# to avoid leaving 900 unassigned. Adjust if you want 900 handled differently.
# Keys name the ELO band and are looked up by _role_id_for_elo(). Note that
# "lt_700" is also what _role_id_for_elo() returns for exactly 700, because
# the band above it starts at 701.
# Rank roles for the 'standard' category
STANDARD_ELO_ROLE_IDS = {
    "gte_900": 1432368177014374497,
    "851_899": 1432368177014374496,
    "801_850": 1432368177014374495,
    "eq_800": 1432368177014374494,
    "751_799": 1432368177014374493,
    "701_750": 1432368177014374492,
    "lt_700": 1432368177014374491,
}
# Rank roles for the 'competitive' category (same band keys as above)
COMPETITIVE_ELO_ROLE_IDS = {
    "gte_900": 1432368177030893672,
    "851_899": 1432368177030893671,
    "801_850": 1432368177030893670,
    "eq_800": 1432368177030893669,
    "751_799": 1432368177030893668,
    "701_750": 1432368177014374499,
    "lt_700": 1432368177014374498,
}
def _role_id_for_elo(elo: int, category: str) -> Optional[int]:
    """Map an ELO value to the rank-role ID of the given category.

    Any category other than 'standard' is treated as competitive. A value
    that matches none of the explicit bands (including exactly 700) falls
    through to the lowest-tier role.
    """
    role_ids = COMPETITIVE_ELO_ROLE_IDS
    if category == "standard":
        role_ids = STANDARD_ELO_ROLE_IDS
    if elo >= 900:
        return role_ids["gte_900"]
    # (lowest, highest, key) for the closed bands below the top tier
    bands = (
        (851, 899, "851_899"),
        (801, 850, "801_850"),
        (800, 800, "eq_800"),
        (751, 799, "751_799"),
        (701, 750, "701_750"),
    )
    for lowest, highest, key in bands:
        if lowest <= elo <= highest:
            return role_ids[key]
    # Everything else, i.e. 700 and below
    return role_ids["lt_700"]
def _category_role_ids(category: str) -> List[int]:
    """Return every rank-role ID of a category ('standard', otherwise competitive)."""
    if category == "standard":
        return [*STANDARD_ELO_ROLE_IDS.values()]
    return [*COMPETITIVE_ELO_ROLE_IDS.values()]
async def update_member_elo_role(member: discord.Member, elo: int, category: str, reason: Optional[str] = None):
    """Give the member the rank role matching their ELO and drop the category's other rank roles.

    - category: 'standard' or 'competitive'
    - reason: forwarded to the role add/remove calls

    Does nothing when the member or its guild is missing, or when the
    target role does not exist in the guild. Permission and HTTP errors are
    logged as warnings instead of being raised.
    """
    if member is None or member.guild is None:
        return
    try:
        wanted_id = _role_id_for_elo(elo, category)
        if not wanted_id:
            return
        wanted_role = member.guild.get_role(int(wanted_id))
        if not wanted_role:
            # The role is not defined in this guild
            return
        # Strip every other rank role that belongs to the same category
        rank_ids = set(_category_role_ids(category))
        outdated = [
            role for role in member.roles
            if role.id != wanted_role.id and role.id in rank_ids
        ]
        if outdated:
            await member.remove_roles(*outdated, reason=reason)
        # Grant the matching role unless the member already has it
        if wanted_role not in member.roles:
            await member.add_roles(wanted_role, reason=reason)
    except discord.Forbidden:
        # Missing permissions or role hierarchy issue
        logging.warning(f"Insufficient permissions to modify roles for {member} in category {category}")
    except discord.HTTPException as e:
        logging.warning(f"Failed to update roles for {member}: {e}")
# Emoji and formatting helpers
def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]:
    """Collect the current guild's emojis followed by every emoji the bot can see.

    Either source degrades to an empty list if reading it raises.
    """
    def _listed(source):
        # Materialise one emoji source, swallowing any lookup failure
        try:
            return list(source())
        except Exception:
            return []

    from_guild = _listed(lambda: ctx.guild.emojis if ctx.guild else [])
    from_bot = _listed(lambda: bot.emojis)
    return from_guild + from_bot
def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]:
    """Look up a custom emoji whose name contains one of the keywords (case-insensitive).

    Keywords are tried in order against the live emojis first. If none
    matches, EMOTE_MAP is consulted per keyword: exact name first, then
    substring. Returns the emoji's string form / mention, or None.
    """
    candidates = _all_accessible_emojis(ctx)
    for needle in (kw.lower() for kw in keyword_variants):
        for emoji in candidates:
            try:
                emoji_name = (emoji.name or '').lower()
                if needle in emoji_name:
                    return str(emoji)
            except Exception:
                continue
    # Fallback to markdown-defined emojis if available
    try:
        known = EMOTE_MAP
    except NameError:
        # EMOTE_MAP not defined yet
        return None
    if known:
        for kw in keyword_variants:
            needle = kw.lower()
            # Exact name match
            if needle in known:
                return known[needle]
            # Substring match
            for lowered_name, mention in known.items():
                if needle in lowered_name:
                    return mention
    return None
def get_t_emoji(ctx: commands.Context, t_level: int) -> str:
    """Pick the emoji for a T level: a matching custom emoji if one exists, else a unicode marker."""
    unicode_markers = {1: "🔹", 2: "🔸", 3: "🔺"}
    keyword_sets = {
        level: [f"hoi4_t{level}", f"t{level}", f"tier{level}"]
        for level in (1, 2, 3)
    }
    match = find_custom_emoji(ctx, keyword_sets.get(t_level, []))
    if match:
        return match
    # Unknown levels share the T1 marker
    return unicode_markers.get(t_level, "🔹")
def get_team_emoji(ctx: commands.Context, team_name: str) -> str:
    """Resolve the emoji for a team name.

    Known factions are recognised by substring. For those, faction keywords
    are searched first, then a few generic HOI4 icons, and finally the entry
    in TEAM_EMOTE_OVERRIDES. Unrecognised teams get a neutral/HOI4 emoji if
    one is found, otherwise the "default" override.
    """
    lowered = (team_name or "").lower()
    generic_icons = ["eagle_hoi", "peace_hoi", "navy_hoi", "secretweapon_hoi"]

    def resolve(keywords: List[str], override_key: str) -> str:
        # Faction keywords first, generic icons second, override mapping last
        for search_terms in (keywords, generic_icons):
            hit = find_custom_emoji(ctx, search_terms)
            if hit:
                return hit
        return TEAM_EMOTE_OVERRIDES.get(override_key, TEAM_EMOTE_OVERRIDES.get("default", "🎖️"))

    # (name fragments, emoji keywords, override key), checked in this order
    factions = (
        (("axis", "achse"), ["fascism", "axis", "hoi4_axis"], "axis"),
        (("allies", "alliierten", "ally"), ["democracy", "allies", "hoi4_allies"], "allies"),
        (("comintern", "ussr", "soviet"), ["communism", "comintern", "hoi4_comintern"], "comintern"),
    )
    for fragments, keywords, override_key in factions:
        if any(fragment in lowered for fragment in fragments):
            return resolve(keywords, override_key)
    # Chinese United Front (optional special case)
    mentions_cuf = (
        "chinese united front" in lowered
        or ("chinese" in lowered and "front" in lowered)
        or "cuf" in lowered
    )
    if mentions_cuf:
        return resolve(["ChineseUnitedFront", "chineseunitedfront", "cuf"], "cuf")
    # Default/other
    neutral = find_custom_emoji(ctx, ["neutrality", "hoi4", "hearts_of_iron", "iron"])
    if neutral:
        return neutral
    return TEAM_EMOTE_OVERRIDES.get("default", "🎖️")
def _flag_from_iso2(code: str) -> Optional[str]:
    """Return the unicode flag for a 2-letter ISO code (e.g., 'DE' -> 🇩🇪).

    Args:
        code: Two ASCII letters in any case.

    Returns:
        The two regional-indicator characters forming the flag, or None when
        ``code`` is empty, not exactly two characters long, or contains
        anything other than ASCII letters.
    """
    if not code or len(code) != 2:
        return None
    # Only A-Z maps onto the regional-indicator block. Digits or punctuation
    # used to be shifted into unrelated code points and returned as a "flag".
    if not (code.isascii() and code.isalpha()):
        return None
    base = 0x1F1E6  # REGIONAL INDICATOR SYMBOL LETTER A
    return "".join(chr(base + ord(letter) - ord('A')) for letter in code.upper())
# Emotes markdown loader and map
def load_emote_markdown(path: Optional[str] = None) -> Dict[str, str]:
    """Parse emotes.markdown into a mapping of lowercased emoji names to mention strings.

    Expected line format: ``<:Name:123456789012345678>``. Anything after the
    closing ``>`` is ignored. Lines that do not start with ``<:``, have an
    empty name, or have a non-numeric ID are skipped.

    Args:
        path: File to read; defaults to ``emotes.markdown`` next to this module.

    Returns:
        ``{name.lower(): "<:Name:ID>"}``. Empty when the file is missing
        (silently) or unreadable (after printing a warning).
    """
    if path is None:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, 'emotes.markdown')
    mapping: Dict[str, str] = {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line.startswith('<:'):
                    continue
                # Keep only what sits between '<:' and the first '>', so
                # trailing notes no longer end up inside the mention.
                inner = line[2:].partition('>')[0]
                name, colon, emoji_id = inner.partition(':')
                name = name.strip()
                emoji_id = emoji_id.strip()
                if not colon or not name:
                    continue
                # A mention with a non-numeric ID would never render
                if not (emoji_id.isascii() and emoji_id.isdigit()):
                    continue
                mapping[name.lower()] = f"<:{name}:{emoji_id}>"
    except FileNotFoundError:
        # Silent if not present
        pass
    except Exception as e:
        print(f"⚠️ Failed to load emotes.markdown: {e}")
    return mapping
# Load emotes mapping at import
# Lowercased emoji name -> mention string; find_custom_emoji() falls back to
# this map when no live emoji matches. Empty when emotes.markdown is absent.
EMOTE_MAP: Dict[str, str] = load_emote_markdown()
if EMOTE_MAP:
    print(f"😀 Loaded {len(EMOTE_MAP)} custom emojis from emotes.markdown")
def load_country_tags(path: Optional[str] = None) -> Dict[str, str]:
    """Load the HOI4 country tag mapping from tags.txt.

    Supported formats per line (one entry per line):
        TAG=Country Name
        TAG:Country Name
        TAG,Country Name
        TAG;Country Name
        TAG Country Name
    The tag is everything before the first separator or whitespace, so the
    country name itself may contain commas or colons
    (``KOR:Korea, Republic of``). A line holding a single word maps the
    upper-cased word to itself. Blank lines and lines starting with # are
    ignored.

    Args:
        path: File to read; defaults to ``tags.txt`` next to this module.

    Returns:
        Dict like ``{'GER': 'Germany', ...}``. Empty when the file is missing
        or unreadable (a message is printed in both cases).
    """
    if path is None:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, 'tags.txt')
    separators = "=;,:"
    mapping: Dict[str, str] = {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith('#'):
                    continue
                # Cut at the EARLIEST separator or whitespace. Trying the
                # separators in a fixed order split "KOR:Korea, Republic of"
                # at the comma and produced the tag "KOR:KOREA".
                cut = next(
                    (i for i, ch in enumerate(line) if ch in separators or ch.isspace()),
                    None,
                )
                if cut is None:
                    # Bare word: use it as both tag and label
                    tag, name = line.upper(), line
                else:
                    tag = line[:cut].upper()
                    name = line[cut:].strip()
                    # Drop the separator itself ("GER = Germany" -> "Germany")
                    if name and name[0] in separators:
                        name = name[1:].strip()
                if tag and name:
                    mapping[tag] = name
    except FileNotFoundError:
        print(" tags.txt not found; proceeding without country tag labels")
    except Exception as e:
        print(f"⚠️ Failed to load tags.txt: {e}")
    return mapping
# Loaded at import
# Upper-cased HOI4 tag -> country name; read by get_country_label().
# Empty when tags.txt is missing or could not be read.
COUNTRY_TAGS: Dict[str, str] = load_country_tags()
if COUNTRY_TAGS:
    print(f"🗺️ Loaded {len(COUNTRY_TAGS)} HOI4 country tags")
def get_country_label(country_tag: Optional[str]) -> Optional[str]:
    """Build the display label for a tag: "[GER] Germany" when the tag is known, "[GER]" otherwise.

    Returns None for an empty or missing tag.
    """
    if not country_tag:
        return None
    normalized = country_tag.strip().upper()
    known_name = COUNTRY_TAGS.get(normalized)
    if known_name:
        return f"[{normalized}] {known_name}"
    return f"[{normalized}]"
def get_country_emoji(ctx: commands.Context, country: Optional[str]) -> str:
    """Return an emoji for a country argument, or "" when nothing fits.

    A custom emoji matching the tag (e.g. ger, hoi4_ger, country_ger) wins.
    Failing that, a two-character argument is rendered as a unicode flag.
    """
    if not country:
        return ""
    trimmed = country.strip()
    lowered = trimmed.lower()
    # Try custom emoji lookups using tag variants
    hit = find_custom_emoji(ctx, [trimmed, lowered, f"hoi4_{lowered}", f"country_{lowered}"])
    if hit:
        return hit
    # If user passed ISO2, render unicode flag
    flag = _flag_from_iso2(trimmed) if len(trimmed) == 2 else None
    # Otherwise, no emoji fallback to avoid noisy globes
    return flag if flag else ""
# Database Functions
async def init_database():
    """Open the MySQL connection pool (once) and make sure the schema exists.

    Safe to call repeatedly: the pool is only created while ``db_pool`` is
    still None, so a second call (``on_ready`` can fire again after a
    reconnect) does not open another pool. The table and column statements
    are re-run on every call.

    Errors are printed with a traceback and not re-raised. If the pool could
    not be created, ``db_pool`` stays None.
    """
    global db_pool
    try:
        if db_pool is None:
            # Parse DATABASE_URL for MySQL connection
            db_config = parse_database_url(DATABASE_URL)
            if not db_config:
                raise ValueError("Invalid DATABASE_URL format")
            print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}")
            # Create MySQL connection pool
            db_pool = await aiomysql.create_pool(
                host=db_config['host'],
                port=db_config['port'],
                user=db_config['user'],
                password=db_config['password'],
                db=db_config['db'],
                charset='utf8mb4',
                autocommit=True,
                maxsize=10
            )
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # Create players table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS players (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        discord_id BIGINT UNIQUE NOT NULL,
                        username VARCHAR(255) NOT NULL,
                        standard_elo INT DEFAULT 800,
                        competitive_elo INT DEFAULT 800,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                    )
                ''')
                # Create games table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS games (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_name VARCHAR(255) NOT NULL,
                        game_type VARCHAR(50) NOT NULL,
                        status VARCHAR(50) DEFAULT 'setup',
                        players JSON NOT NULL,
                        winner_team VARCHAR(255),
                        notification_message_id BIGINT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        finished_at TIMESTAMP NULL
                    )
                ''')
                # Add notification_message_id column if it doesn't exist (for existing databases)
                await _add_column_if_missing(
                    cursor, "games", "notification_message_id BIGINT NULL"
                )
                # Create game_results table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS game_results (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_id INT,
                        discord_id BIGINT NOT NULL,
                        team_name VARCHAR(255) NOT NULL,
                        t_level INT NOT NULL,
                        old_elo INT NOT NULL,
                        new_elo INT NOT NULL,
                        elo_change INT NOT NULL,
                        won BOOLEAN NOT NULL,
                        result_type VARCHAR(10) DEFAULT 'loss',
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (game_id) REFERENCES games(id)
                    )
                ''')
                # Add result_type column if it doesn't exist (for existing databases)
                await _add_column_if_missing(
                    cursor, "game_results", "result_type VARCHAR(10) DEFAULT 'loss'"
                )
        print("✅ Database initialized successfully")
    except Exception as e:
        print(f"❌ Database initialization failed: {e}")
        import traceback
        traceback.print_exc()


async def _add_column_if_missing(cursor, table: str, column_definition: str) -> None:
    """Run ``ALTER TABLE <table> ADD COLUMN <column_definition>``, tolerating an existing column.

    ``table`` and ``column_definition`` are interpolated into the statement,
    so they must be constants from this module, never user input.
    """
    try:
        await cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column_definition}")
    except aiomysql.Error as e:
        # 1060 = ER_DUP_FIELDNAME ("Duplicate column name"): the column is
        # already there, which is the normal case after the first start.
        if e.args and e.args[0] == 1060:
            return
        # Still best-effort (initialisation continues), but a real failure
        # is no longer swallowed silently as it was with the bare except.
        logging.warning(f"Could not add column to {table} ({column_definition}): {e}")
async def get_or_create_player(discord_id: int, username: str) -> Dict:
    """Fetch the player row for a Discord ID, inserting it first if it does not exist.

    For an existing player the stored username is refreshed; the returned
    dict is the row as it was read before that refresh.
    """
    select_sql = "SELECT * FROM players WHERE discord_id = %s"
    async with db_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            await cursor.execute(select_sql, (discord_id,))
            row = await cursor.fetchone()
            if row:
                # Known player: keep the stored username current
                await cursor.execute(
                    "UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                    (username, discord_id)
                )
                return dict(row)
            # Unknown player: insert, then read back the row with its defaults
            await cursor.execute(
                "INSERT INTO players (discord_id, username) VALUES (%s, %s)",
                (discord_id, username)
            )
            await cursor.execute(select_sql, (discord_id,))
            row = await cursor.fetchone()
            return dict(row)
def calculate_elo_change(player_elo: int, opponent_avg_elo: int, result: str, t_level: int) -> int:
    """Calculate the ELO change using the standard ELO formula with a T-level multiplier.

    Args:
        player_elo: The player's current rating.
        opponent_avg_elo: Average rating of the opposition. Callers pass the
            result of a division, so this may be a float.
        result: 'win', 'loss' or 'draw'; any other value counts as a loss.
        t_level: Key into T_LEVEL_MULTIPLIERS; unknown levels use 1.0.

    Returns:
        The rounded rating change (positive or negative).

    In draws:
    - If you're expected to win (higher ELO), you lose points for drawing
    - If you're expected to lose (lower ELO), you gain points for drawing
    - The bigger the ELO difference, the bigger the swing
    """
    expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400))
    if result == 'win':
        actual_score = 1.0
    elif result == 'draw':
        actual_score = 0.5  # Draw = 50% score
    else:  # loss
        actual_score = 0.0
    # Calculate the base ELO change
    base_change = K_FACTOR * (actual_score - expected_score)
    # Apply T-level multiplier
    t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0)
    final_change = base_change * t_multiplier
    # For debugging/logging purposes, let's see what happens in draws
    if result == 'draw':
        elo_diff = player_elo - opponent_avg_elo
        expected_percentage = expected_score * 100
        print(f"📊 Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}")
        # elo_diff is a float whenever the opponent average is one; the 'd'
        # format code used before raises ValueError on floats, which made
        # every draw fail. '.0f' prints ints and floats alike.
        print(f" ELO Difference: {elo_diff:+.0f} | Expected win chance: {expected_percentage:.1f}%")
        print(f" ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})")
    return round(final_change)
# Owner only decorator
def is_owner():
    """Build a command check that passes only when the invoking user is OWNER_ID."""
    return commands.check(lambda ctx: ctx.author.id == OWNER_ID)
# Owner Commands
@bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)')
@is_owner()
async def reload_bot(ctx):
    """Re-sync the command tree and report the outcome in an embed (Owner only).

    A progress embed is sent first and replaced with the result once the
    sync has finished. Any failure is printed with a traceback and reported
    in a separate error embed.
    """
    try:
        invoker = ctx.author
        print(f"🔄 Reload command started by {invoker} (ID: {invoker.id})")
        # Progress notice, edited in place once the sync is done
        progress = discord.Embed(
            title="🔄 Bot Reload",
            description="Reloading bot and syncing commands...",
            color=discord.Color.yellow()
        )
        message = await ctx.send(embed=progress)
        print("📤 Initial reload message sent")
        # Sync slash commands
        print("🔄 Starting command sync...")
        synced = await bot.tree.sync()
        synced_count = len(synced)
        print(f"✅ Synced {synced_count} commands successfully")
        # Success report
        report = discord.Embed(
            title="✅ Bot Reloaded Successfully",
            description=f"Bot has been reloaded!\nSynced {synced_count} slash commands.",
            color=discord.Color.green()
        )
        report.add_field(name="Servers", value=len(bot.guilds), inline=True)
        report.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True)
        avatar = ctx.author.avatar
        report.set_footer(text=f"Reloaded by {ctx.author}", icon_url=avatar.url if avatar else None)
        await message.edit(embed=report)
        print("✅ Reload completed successfully")
    except Exception as e:
        error_type = type(e).__name__
        print(f"❌ Reload failed with error: {error_type}: {e}")
        import traceback
        traceback.print_exc()
        failure = discord.Embed(
            title="❌ Reload Failed",
            description=f"**Error Type:** {error_type}\n**Error:** {str(e)[:1500]}",
            color=discord.Color.red()
        )
        await ctx.send(embed=failure)
# Game notification channel
# ID of the channel in which update_game_notification() posts and edits the lobby embeds
GAME_NOTIFICATION_CHANNEL_ID = 1432368177685332030
async def create_game_notification_embed(game_data: Dict, players_data: List[Dict]) -> discord.Embed:
    """Build the lobby embed: one field per team listing its players, plus a summary footer.

    Reads 'game_name' and 'game_type' from game_data. Each player dict must
    provide 'team_name', 'username' and 'current_elo'; 't_level' (default 2)
    and 'country' are optional. With no players, a single placeholder field
    is added and no footer is set.
    """
    embed = discord.Embed(
        title=f"🎮 {game_data['game_name']}",
        description=f"**Type:** {game_data['game_type'].title()}\n**Status:** Setup Phase",
        color=discord.Color.blue()
    )
    if not players_data:
        embed.add_field(name="Players", value="No players yet", inline=False)
        return embed
    # Group players by team, keeping first-seen team order
    roster = {}
    for entry in players_data:
        roster.setdefault(entry['team_name'], []).append(entry)
    tier_markers = {1: "🔹", 2: "🔸", 3: "🔺"}
    for team_name, members in roster.items():
        avg_elo = sum(member['current_elo'] for member in members) / len(members) if members else 0
        # Simple fallback emoji since we don't have ctx here
        heading = f"🎖️ {team_name} (avg {avg_elo:.0f})"
        # Highest T level first, then alphabetically by username
        ordered = sorted(members, key=lambda member: (-member.get('t_level', 2), member['username']))
        rows = []
        for member in ordered:
            marker = tier_markers.get(member.get('t_level', 2), "🔹")
            country = member.get('country')
            suffix = f" [{country}]" if country else ""
            rows.append(f"{marker} {member['username']}{suffix} ({member['current_elo']})")
        embed.add_field(name=heading, value="\n".join(rows), inline=True)
    embed.set_footer(text=f"Players: {len(players_data)} | Teams: {len(roster)}")
    return embed
async def update_game_notification(game_data: Dict, players_data: List[Dict]):
    """Refresh the lobby embed in the notification channel.

    If the game already has a notification message, that message is edited.
    If there is none yet, or fetching/editing it fails, a new message is
    posted and its ID is stored on the game row. Does nothing when the
    channel cannot be resolved. Every failure is logged as a warning and
    never raised.
    """
    try:
        channel = bot.get_channel(GAME_NOTIFICATION_CHANNEL_ID)
        if not channel:
            return
        embed = await create_game_notification_embed(game_data, players_data)
        edited_in_place = False
        if game_data.get('notification_message_id'):
            # Try to edit existing message
            try:
                existing = await channel.fetch_message(game_data['notification_message_id'])
                await existing.edit(embed=embed)
                edited_in_place = True
            except (discord.NotFound, discord.HTTPException):
                # Message gone or not editable: fall through and post a new one
                pass
        if edited_in_place:
            return
        posted = await channel.send(embed=embed)
        # Remember the new message so later updates can edit it
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "UPDATE games SET notification_message_id = %s WHERE id = %s",
                    (posted.id, game_data['id'])
                )
    except Exception as e:
        logging.warning(f"Failed to update game notification: {e}")
# HOI4 ELO Commands
@bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game')
async def hoi4create(ctx, game_type: str, game_name: str):
    """Open a new game lobby in the 'setup' phase.

    Rejects game types other than 'standard'/'competitive' and names already
    used by a lobby in setup. Otherwise the lobby is stored with an empty
    player list, confirmed in the invoking channel and announced in the
    notification channel. Errors are reported back to the channel.
    """
    kind = game_type.lower()
    if kind not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # A lobby with this name must not already be in setup
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                if await cursor.fetchone():
                    await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!")
                    return
                # Store the new lobby
                await cursor.execute(
                    "INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)",
                    (game_name, kind, '[]')
                )
                # Read the row back for the notification embed
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND game_type = %s ORDER BY id DESC LIMIT 1",
                    (game_name, kind)
                )
                game_data = await cursor.fetchone()
        type_label = game_type.title()
        embed = discord.Embed(
            title="🎮 Game Created",
            description=f"HOI4 {type_label} game '{game_name}' has been created!",
            color=discord.Color.green()
        )
        for field_name, field_value in (
            ("Game Name", game_name),
            ("Type", type_label),
            ("Status", "Setup Phase"),
        ):
            embed.add_field(name=field_name, value=field_value, inline=True)
        embed.set_footer(text="Use /hoi4setup to add players to this game")
        await ctx.send(embed=embed)
        # Create notification in notification channel
        if game_data:
            await update_game_notification(dict(game_data), [])
    except Exception as e:
        await ctx.send(f"❌ Error creating game: {str(e)}")
@bot.hybrid_command(name='hoi4delete', description='Delete a game lobby')
async def hoi4delete(ctx, game_name: str):
    """Delete the lobby with the given name, provided it is still in the setup phase.

    Reports back when no such lobby exists; errors are sent to the channel.
    """
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Only lobbies in setup may be deleted
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                lobby = await cursor.fetchone()
                if not lobby:
                    await ctx.send(f"❌ No active game found with name '{game_name}' in setup phase!")
                    return
                await cursor.execute(
                    "DELETE FROM games WHERE id = %s",
                    (lobby['id'],)
                )
        confirmation = discord.Embed(
            title="🗑️ Game Deleted",
            description=f"Game '{game_name}' has been deleted!",
            color=discord.Color.red()
        )
        confirmation.add_field(name="Game Name", value=game_name, inline=True)
        confirmation.add_field(name="Type", value=lobby['game_type'].title(), inline=True)
        confirmation.set_footer(text=f"Deleted by {ctx.author}")
        await ctx.send(embed=confirmation)
    except Exception as e:
        await ctx.send(f"❌ Error deleting game: {str(e)}")
@bot.hybrid_command(name='hoi4remove', description='Remove a player from an existing game')
async def hoi4remove(ctx, game_name: str, user: discord.Member):
    """Take a player out of a lobby that is still in the setup phase.

    Reports back when the lobby does not exist or the user is not part of
    it. On success the stored player list is rewritten, a confirmation is
    sent and the notification embed is refreshed. Errors are sent to the
    channel.
    """
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Look up the lobby
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                lobby = await cursor.fetchone()
                if not lobby:
                    await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
                    return
                # Stored players are a JSON list of dicts
                current = json.loads(lobby['players']) if lobby['players'] else []
                remaining = [entry for entry in current if entry['discord_id'] != user.id]
                if len(remaining) == len(current):
                    # Nothing was filtered out, so the user was never in the lobby
                    await ctx.send(f"{user.display_name} is not in this game!")
                    return
                # Persist the shortened player list
                await cursor.execute(
                    "UPDATE games SET players = %s WHERE id = %s",
                    (json.dumps(remaining), lobby['id'])
                )
        confirmation = discord.Embed(
            title="✅ Player Removed",
            description=f"{user.display_name} has been removed from '{game_name}'!",
            color=discord.Color.orange()
        )
        confirmation.add_field(name="Player", value=user.display_name, inline=True)
        confirmation.add_field(name="Game", value=game_name, inline=True)
        confirmation.add_field(name="Players Left", value=len(remaining), inline=True)
        confirmation.set_footer(text=f"Removed by {ctx.author}")
        await ctx.send(embed=confirmation)
        # Update notification in notification channel
        await update_game_notification(dict(lobby), remaining)
    except Exception as e:
        await ctx.send(f"❌ Error removing player: {str(e)}")
@bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game')
async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int, country: Optional[str] = None, modifier: Optional[str] = None):
    """Add a player to a lobby in the setup phase. Use modifier='--force' to bypass MP ban.

    Validates the T level (1-3), refuses users carrying the MP-ban role
    unless forced, and refuses users already in the lobby. The player's ELO
    for the lobby's game type is snapshotted into the stored entry. On
    success a confirmation is sent and the notification embed is refreshed.
    Errors are sent to the channel.
    """
    if t_level not in (1, 2, 3):
        await ctx.send("❌ T-Level must be 1, 2, or 3")
        return
    # Check for MP ban role (unless --force is used)
    MP_BAN_ROLE_ID = 1432368177052127353
    ban_role = None if modifier == "--force" else discord.utils.get(user.roles, id=MP_BAN_ROLE_ID)
    if ban_role:
        banned = discord.Embed(
            title="🚫 Player Banned",
            description=f"{user.display_name} is currently banned from multiplayer games!",
            color=discord.Color.red()
        )
        banned.add_field(name="Banned Player", value=user.mention, inline=True)
        banned.add_field(name="Ban Role", value=ban_role.name, inline=True)
        banned.set_footer(text="Contact an administrator if this is an error")
        await ctx.send(embed=banned)
        return
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Look up the lobby
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                lobby = await cursor.fetchone()
                if not lobby:
                    await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
                    return
                # Get or create player (runs before the duplicate check below)
                profile = await get_or_create_player(user.id, user.display_name)
                # Stored players are a JSON list of dicts
                roster = json.loads(lobby['players']) if lobby['players'] else []
                if any(entry['discord_id'] == user.id for entry in roster):
                    await ctx.send(f"{user.display_name} is already in this game!")
                    return
                # Rating column depends on the lobby's game type
                current_elo = profile[f"{lobby['game_type']}_elo"]
                roster.append({
                    'discord_id': user.id,
                    'username': user.display_name,
                    'team_name': team_name,
                    't_level': t_level,
                    'current_elo': current_elo,
                    'country': country.strip() if country else None
                })
                # Persist the extended player list
                await cursor.execute(
                    "UPDATE games SET players = %s WHERE id = %s",
                    (json.dumps(roster), lobby['id'])
                )
        confirmation = discord.Embed(
            title="✅ Player Added",
            description=f"{user.display_name} has been added to '{game_name}'!",
            color=discord.Color.green()
        )
        confirmation.add_field(name="Player", value=user.display_name, inline=True)
        confirmation.add_field(name="Team", value=team_name, inline=True)
        confirmation.add_field(name="T-Level", value=f"T{t_level}", inline=True)
        confirmation.add_field(name="Current ELO", value=current_elo, inline=True)
        if country:
            country_emoji = get_country_emoji(ctx, country)
            country_label = get_country_label(country)
            confirmation.add_field(name="Country", value=f"{country_emoji} {country_label}".strip(), inline=True)
        confirmation.add_field(name="Players in Game", value=len(roster), inline=True)
        await ctx.send(embed=confirmation)
        # Update notification in notification channel
        await update_game_notification(dict(lobby), roster)
    except Exception as e:
        await ctx.send(f"❌ Error adding player: {str(e)}")
@bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes')
async def hoi4end(ctx, game_name: str, winner_team: str):
    """End a game and calculate ELO changes. Use 'draw' for ties.

    Looks up the game named ``game_name`` that is still in the ``setup``
    status, computes an ELO change for every registered player against the
    average rating of all players on the other teams, stores the new ratings
    and one ``game_results`` row per player, marks the game as finished and
    posts result embeds to the invoking channel and the notification channel.

    Ratings are read from the ``current_elo`` value stored with each player
    entry in the game's ``players`` JSON, not re-read from the players table.

    Args:
        ctx: Invocation context of the command.
        game_name: Name of the game to finish.
        winner_team: Name of the winning team (must match a team of the game
            exactly) or ``draw`` in any letter case for a tie.
    """
    max_field_length = 1024  # Discord rejects embed field values longer than this

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get the game
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                game = await cursor.fetchone()
                if not game:
                    await ctx.send(f"❌ No active game found with name '{game_name}'!")
                    return

                players = json.loads(game['players']) if game['players'] else []
                if len(players) < 2:
                    await ctx.send("❌ Game needs at least 2 players to end!")
                    return

                # Check if winner team exists or if it's a draw
                teams = {p['team_name'] for p in players}
                is_draw = winner_team.lower() == 'draw'
                if not is_draw and winner_team not in teams:
                    # Sorted so the hint is stable between invocations
                    available_teams = ', '.join(sorted(teams))
                    await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw")
                    return

                # Collect the ratings of every team
                team_elos = {}
                for player in players:
                    team_elos.setdefault(player['team_name'], []).append(player['current_elo'])

                # Calculate ELO changes for each player
                elo_changes = []
                for player in players:
                    team = player['team_name']
                    if is_draw:
                        result = 'draw'
                    elif team == winner_team:
                        result = 'win'
                    else:
                        result = 'loss'

                    # Opponent strength is the average over all players of all other teams;
                    # with no opponents the player's own rating is used.
                    opponent_elos = [
                        elo
                        for other_team, elos in team_elos.items()
                        if other_team != team
                        for elo in elos
                    ]
                    if opponent_elos:
                        opponent_avg = sum(opponent_elos) / len(opponent_elos)
                    else:
                        opponent_avg = player['current_elo']

                    elo_change = calculate_elo_change(
                        player['current_elo'],
                        opponent_avg,
                        result,
                        player['t_level']
                    )
                    new_elo = max(0, player['current_elo'] + elo_change)  # Prevent negative ELO
                    elo_changes.append({
                        'discord_id': player['discord_id'],
                        'username': player['username'],
                        'team_name': team,
                        't_level': player['t_level'],
                        'old_elo': player['current_elo'],
                        'new_elo': new_elo,
                        'elo_change': elo_change,
                        'result': result
                    })

                # Update player ELOs and save game results
                elo_field = f"{game['game_type']}_elo"
                for change in elo_changes:
                    await cursor.execute(
                        f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                        (change['new_elo'], change['discord_id'])
                    )
                    won = change['result'] == 'win'
                    await cursor.execute(
                        """INSERT INTO game_results
                        (game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                        (game['id'], change['discord_id'], change['team_name'],
                         change['t_level'], change['old_elo'], change['new_elo'],
                         change['elo_change'], won, change['result'])
                    )

                # Mark game as finished
                final_result = "Draw" if is_draw else winner_team
                await cursor.execute(
                    "UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s",
                    (final_result, game['id'])
                )

        # The connection is released at this point; everything below only talks to Discord.

        # After DB updates, try to sync Discord roles for affected players (only for this game's category)
        try:
            guild = ctx.guild
            if guild:
                for change in elo_changes:
                    member = guild.get_member(change['discord_id'])
                    if member is None:
                        try:
                            member = await guild.fetch_member(change['discord_id'])
                        except Exception:
                            # Best effort: the member may have left the server
                            member = None
                    if member:
                        await update_member_elo_role(
                            member,
                            change['new_elo'],
                            game['game_type'],
                            reason=f"HOI4 {game['game_type']} ELO updated in '{game_name}'"
                        )
        except Exception as e:
            logging.warning(f"Role sync after game end failed: {e}")

        def team_icon(team_name):
            """Return the result icon of a team."""
            if is_draw:
                return "🤝"
            return "🏆" if team_name == winner_team else "💔"

        # Create result embed
        if is_draw:
            embed = discord.Embed(
                title="🤝 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Result: Draw**",
                color=discord.Color.orange()
            )
        else:
            embed = discord.Embed(
                title="🏆 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**",
                color=discord.Color.gold()
            )

        # Group results by team
        teams_results = {}
        for change in elo_changes:
            teams_results.setdefault(change['team_name'], []).append(change)

        for team, team_changes in teams_results.items():
            lines = []
            for change in team_changes:
                if change['elo_change'] > 0:
                    trend = "📈"
                elif change['elo_change'] < 0:
                    trend = "📉"
                else:
                    trend = "➡️"
                lines.append(
                    f"{change['username']}: {change['old_elo']} → {change['new_elo']} "
                    f"({change['elo_change']:+d}) {trend}"
                )
            team_text = "\n".join(lines)
            # Trim oversized values so a large team cannot make the whole embed fail
            if len(team_text) > max_field_length:
                team_text = team_text[:max_field_length - 3] + "..."
            embed.add_field(
                name=f"{team_icon(team)} Team {team}",
                value=team_text,
                inline=False
            )

        embed.set_footer(text=f"Game Type: {game['game_type'].title()}")
        await ctx.send(embed=embed)

        # Post final game result to notification channel
        try:
            channel = bot.get_channel(GAME_NOTIFICATION_CHANNEL_ID)
            if channel:
                result_text = 'Draw' if is_draw else f'{winner_team} Victory'
                final_embed = discord.Embed(
                    title=f"🏁 Game Finished: {game_name}",
                    description=f"**Result:** {result_text}",
                    color=discord.Color.orange() if is_draw else discord.Color.gold()
                )
                final_embed.add_field(name="Game Type", value=game['game_type'].title(), inline=True)
                final_embed.add_field(name="Players", value=len(players), inline=True)
                final_embed.add_field(name="Teams", value=len(teams), inline=True)
                # Add team results
                for team, team_changes in teams_results.items():
                    avg_change = sum(c['elo_change'] for c in team_changes) / len(team_changes)
                    final_embed.add_field(
                        name=f"{team_icon(team)} {team}",
                        value=f"{len(team_changes)} players\nAvg ELO change: {avg_change:+.1f}",
                        inline=True
                    )
                await channel.send(embed=final_embed)
        except Exception as e:
            logging.warning(f"Failed to post final game notification: {e}")
    except Exception as e:
        await ctx.send(f"❌ Error ending game: {str(e)}")
@bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics')
async def hoi4stats(ctx, user: Optional[discord.Member] = None):
    """Show HOI4 ELO statistics for a user.

    Reports both ELO ratings with their rank among all players, the date the
    player record was created and the win/draw/loss record. When ``user`` is
    omitted the invoking user is shown. Afterwards the member's ELO roles are
    synchronised for both categories on a best-effort basis.
    """
    target_user = user if user else ctx.author
    medals = {1: "🥇", 2: "🥈", 3: "🥉"}

    def get_rank_display(rank, total):
        # The top three positions get a medal in front of the position text
        position = f"#{rank} of {total}"
        medal = medals.get(rank)
        return f"{medal} {position}" if medal else position

    try:
        player = await get_or_create_player(target_user.id, target_user.display_name)

        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Rank = number of players with a strictly higher rating, plus one
                await cursor.execute(
                    "SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s",
                    (player['standard_elo'],)
                )
                standard_rank = (await cursor.fetchone())['player_rank']

                await cursor.execute(
                    "SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s",
                    (player['competitive_elo'],)
                )
                competitive_rank = (await cursor.fetchone())['player_rank']

                await cursor.execute("SELECT COUNT(*) as total FROM players")
                total_players = (await cursor.fetchone())['total']

                # Win / draw / loss counters over all recorded results of the user
                await cursor.execute(
                    """SELECT
                    COUNT(*) as total_games,
                    SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                    SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn,
                    SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost
                    FROM game_results WHERE discord_id = %s""",
                    (target_user.id,)
                )
                game_stats = await cursor.fetchone()

        # The SUM() aggregates are NULL when the user has no results yet
        total_games = game_stats['total_games'] or 0
        games_won = game_stats['games_won'] or 0
        games_drawn = game_stats['games_drawn'] or 0
        games_lost = game_stats['games_lost'] or 0
        if total_games > 0:
            win_rate = games_won / total_games * 100
        else:
            win_rate = 0

        embed = discord.Embed(
            title=f"📊 HOI4 ELO Stats - {target_user.display_name}",
            color=discord.Color.blue()
        )

        # Ratings, ranks and registration date
        standard_text = f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}"
        competitive_text = f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}"
        embed.add_field(name="🎯 Standard ELO", value=standard_text, inline=True)
        embed.add_field(name="🏆 Competitive ELO", value=competitive_text, inline=True)
        embed.add_field(
            name="📅 Player Since",
            value=player['created_at'].strftime("%d/%m/%Y"),
            inline=True
        )

        # Game record
        if total_games > 0:
            embed.add_field(
                name="🎮 Games Played",
                value=f"**{total_games}** total games",
                inline=True
            )
            # Draws are only mentioned when the player actually has some
            if games_drawn > 0:
                record_name = "📊 W/D/L Record"
                record_value = f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**"
            else:
                record_name = "📈 Win/Loss"
                record_value = f"**{games_won}W** / **{games_lost}L**"
            embed.add_field(name=record_name, value=record_value, inline=True)
            embed.add_field(
                name="📊 Win Rate",
                value=f"**{win_rate:.1f}%**",
                inline=True
            )
        else:
            embed.add_field(
                name="🎮 Games Played",
                value="No games played yet",
                inline=False
            )

        if target_user.avatar:
            embed.set_thumbnail(url=target_user.avatar.url)

        # Footer: share of the player base at or above the user's position
        if total_players > 0:
            standard_percentile = (total_players - standard_rank) / total_players * 100
            competitive_percentile = (total_players - competitive_rank) / total_players * 100
        else:
            standard_percentile = 0
            competitive_percentile = 0
        embed.set_footer(
            text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%"
        )
        await ctx.send(embed=embed)

        # Keep the member's roles in sync for both categories whenever stats are viewed
        try:
            if ctx.guild and isinstance(target_user, discord.Member):
                for category in ('standard', 'competitive'):
                    await update_member_elo_role(
                        target_user,
                        player[f'{category}_elo'],
                        category,
                        reason='HOI4 stats viewed: role sync'
                    )
        except Exception as e:
            logging.warning(f"Role sync on stats failed: {e}")
    except Exception as e:
        await ctx.send(f"❌ Error getting stats: {str(e)}")
@bot.hybrid_command(name='hoi4games', description='Show active games as team showcases')
async def hoi4games(ctx):
    """Show all active games with teams presented side-by-side like a showcase.

    Every game in the ``setup`` status gets a header field followed by its
    team line-ups. Games are added newest first until Discord's embed limits
    (25 fields, 6000 characters in total) would be exceeded; if games had to
    be left out, the footer says how many are shown.
    """
    max_fields = 25         # Discord: at most 25 fields per embed
    max_embed_chars = 6000  # Discord: at most 6000 characters per embed

    def build_team_field(team_name: str, members: List[Dict]) -> Dict[str, str]:
        """Format one team as an embed field (name with average ELO, one line per player)."""
        t_emoji = get_team_emoji(ctx, team_name)
        avg = sum(m['current_elo'] for m in members) / len(members) if members else 0
        name = f"{t_emoji} {team_name} (avg {avg:.0f})"
        # Each player line: T-level emoji, optional flag and country label, name, elo
        lines = []
        for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())):
            parts = [get_t_emoji(ctx, int(m.get('t_level', 2)))]
            ctry = m.get('country')
            if ctry:
                flag = get_country_emoji(ctx, ctry)
                label = get_country_label(ctry)
                if flag:
                    parts.append(flag)
                if label:
                    parts.append(label)
            parts.append(m['username'])
            lines.append(f"{' '.join(parts)} ({m['current_elo']})")
        value = "\n".join(lines) if lines else "No players yet"
        # Discord field value max ~1024 chars; trim if necessary
        if len(value) > 1000:
            value = value[:997] + "..."
        return {"name": name, "value": value}

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(
                    "SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC"
                )
                games = await cursor.fetchall()

        if not games:
            await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!")
            return

        embed = discord.Embed(
            title="🎮 Active HOI4 Games",
            description="Team lineups are shown side-by-side. Use /hoi4setup to add players.",
            color=discord.Color.green()
        )

        shown_games = 0
        for game in games:
            players = json.loads(game['players']) if game['players'] else []

            # Build team structures, sorted by name to keep a stable order
            teams: Dict[str, List[Dict]] = {}
            for p in players:
                teams.setdefault(p['team_name'], []).append(p)
            ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower())

            # Collect this game's fields as (name, value, inline) before touching the
            # embed, so that a game is either shown completely or not at all.
            game_title = f"{game['game_name']} ({game['game_type'].title()})"
            if not ordered_teams:
                game_fields = [(game_title, "No players yet", False)]
            else:
                game_fields = [(
                    f"🎯 {game_title}",
                    f"Players: {len(players)} | Teams: {len(ordered_teams)}",
                    False
                )]
                team_fields = [build_team_field(tn, members) for tn, members in ordered_teams]
                if len(team_fields) == 1:
                    game_fields.append((team_fields[0]["name"], team_fields[0]["value"], False))
                else:
                    # Two teams face each other around a centre "VS" field; further teams go below
                    f1, f2 = team_fields[0], team_fields[1]
                    game_fields.append((f1["name"], f1["value"], True))
                    game_fields.append(("⚔️ VS ⚔️", "\u200b", True))
                    game_fields.append((f2["name"], f2["value"], True))
                    for extra in team_fields[2:]:
                        game_fields.append((extra["name"], extra["value"], False))

            needed_chars = sum(len(name) + len(value) for name, value, _ in game_fields)
            fits = (
                len(embed.fields) + len(game_fields) <= max_fields
                and len(embed) + needed_chars <= max_embed_chars
            )
            if not fits:
                if shown_games:
                    break
                # A single game larger than the field limit: show as many of its fields as allowed
                game_fields = game_fields[:max_fields]

            for name, value, inline in game_fields:
                embed.add_field(name=name, value=value, inline=inline)
            shown_games += 1

        if shown_games < len(games):
            embed.set_footer(text=f"Showing {shown_games} of {len(games)} active games")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting games: {str(e)}")
@bot.hybrid_command(name='hoi4history', description='Show past games with optional filters')
async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None):
    """Show past games with optional filters.

    Args:
        ctx: Invocation context of the command.
        limit: Maximum number of games to list. Clamped to 1-25 because every
            game takes one embed field and Discord allows 25 fields per embed.
        player: Only list games whose player list contains this member.
        game_name: Only list games whose name contains this text.
        game_type: ``standard`` or ``competitive`` (any letter case); any
            other value is ignored.
    """
    max_games = 25
    limit = 10 if limit is None else max(1, min(limit, max_games))

    # Only the two known game types are accepted as a filter
    type_filter = game_type.lower() if game_type else None
    if type_filter not in ('standard', 'competitive'):
        type_filter = None

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Build dynamic query based on filters
                query = "SELECT * FROM games WHERE status = 'finished'"
                params = []
                if game_name:
                    query += " AND game_name LIKE %s"
                    params.append(f"%{game_name}%")
                if type_filter:
                    query += " AND game_type = %s"
                    params.append(type_filter)
                if player:
                    query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))"
                    params.append(player.id)
                query += " ORDER BY finished_at DESC LIMIT %s"
                params.append(limit)
                await cursor.execute(query, params)
                games = await cursor.fetchall()

        if not games:
            await ctx.send("📝 No finished games found with the specified filters!")
            return

        embed = discord.Embed(
            title="📚 HOI4 Game History",
            color=discord.Color.blue()
        )

        # List every filter that was actually applied, one per line
        active_filters = []
        if player:
            active_filters.append(f"Filtered by player: {player.display_name}")
        if game_name:
            active_filters.append(f"Filtered by game name: {game_name}")
        if type_filter:
            active_filters.append(f"Filtered by type: {type_filter.title()}")
        if active_filters:
            embed.description = "\n".join(active_filters)

        for game in games:
            players = json.loads(game['players']) if game['players'] else []
            team_names = {p['team_name'] for p in players}

            if game['finished_at']:
                finished_date = game['finished_at'].strftime("%d/%m/%Y %H:%M")
            else:
                finished_date = "Unknown"
            winner = game['winner_team'] if game['winner_team'] else "Unknown"

            game_info = "\n".join([
                f"**Winner:** {winner}",
                f"**Type:** {game['game_type'].title()}",
                f"**Players:** {len(players)} | **Teams:** {len(team_names)}",
                f"**Finished:** {finished_date}",
            ])
            embed.add_field(
                name=f"🏆 {game['game_name']}",
                value=game_info,
                inline=False
            )

        embed.set_footer(text=f"Showing {len(games)} of last {limit} games")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting game history: {str(e)}")
@bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard')
async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10):
    """Show ELO leaderboard for standard or competitive.

    Args:
        ctx: Invocation context of the command.
        game_type: ``standard`` or ``competitive`` (any letter case);
            ``None`` falls back to ``standard``.
        limit: Number of players to list, clamped to 1-25.
    """
    max_field_length = 1024  # Discord rejects embed field values longer than this

    game_type = (game_type or "standard").lower()
    if game_type not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return
    limit = 10 if limit is None else max(1, min(limit, 25))

    # Safe to interpolate into SQL: game_type was checked against the whitelist above
    elo_field = f"{game_type}_elo"

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get top players by ELO
                await cursor.execute(
                    f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s",
                    (limit,)
                )
                players = await cursor.fetchall()
                if not players:
                    await ctx.send("📝 No players found in the database!")
                    return

                # Also get some statistics
                await cursor.execute(
                    f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players"
                )
                stats = await cursor.fetchone()

                # One grouped query for the game records of all listed players
                # instead of one query (and one pool connection) per player.
                player_ids = [p['discord_id'] for p in players]
                placeholders = ", ".join(["%s"] * len(player_ids))
                await cursor.execute(
                    f"""SELECT
                    discord_id,
                    COUNT(*) as games_played,
                    SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                    SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn
                    FROM game_results WHERE discord_id IN ({placeholders})
                    GROUP BY discord_id""",
                    player_ids
                )
                records = {row['discord_id']: row for row in await cursor.fetchall()}

        # Create leaderboard embed
        embed = discord.Embed(
            title=f"🏆 HOI4 {game_type.title()} Leaderboard",
            color=discord.Color.gold()
        )
        embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}"

        medals = ["🥇", "🥈", "🥉"]
        entries = []
        for i, player in enumerate(players, 1):
            # Medal for the top three, rank number otherwise
            rank_indicator = medals[i - 1] if i <= 3 else f"**{i}.**"

            # Players without any recorded result have no row in the grouped query
            record = records.get(player['discord_id'], {})
            games_played = record.get('games_played') or 0
            games_won = record.get('games_won') or 0
            games_drawn = record.get('games_drawn') or 0
            win_rate = (games_won / games_played * 100) if games_played > 0 else 0

            entry = f"{rank_indicator} **{player['username']}** - {player[elo_field]} ELO\n"
            if games_drawn > 0:
                entry += f" 📊 {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate\n\n"
            else:
                entry += f" 📊 {games_played} games | {win_rate:.1f}% win rate\n\n"
            entries.append(entry)

        # Spread the entries over several fields so no field exceeds Discord's limit
        chunks = []
        current = ""
        for entry in entries:
            if current and len(current) + len(entry) > max_field_length:
                chunks.append(current)
                current = ""
            current += entry
        if current:
            chunks.append(current)

        for index, chunk in enumerate(chunks):
            embed.add_field(
                name="Rankings" if index == 0 else "Rankings (cont.)",
                value=chunk,
                inline=False
            )

        embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting leaderboard: {str(e)}")
@bot.event
async def on_command_error(ctx, error):
    """Handles command errors.

    Known error categories are answered with a short message (unknown
    commands are ignored silently). Anything else is printed together with
    its traceback; the owner additionally receives the error text in chat,
    everybody else a generic message.
    """
    if isinstance(error, commands.CheckFailure):
        await ctx.send("❌ You don't have permission to use this command!")
    elif isinstance(error, commands.CommandNotFound):
        # Silently ignore command not found errors
        pass
    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`")
    elif isinstance(error, commands.BadArgument):
        await ctx.send("❌ Invalid argument!")
    else:
        import traceback

        # Log detailed error information
        print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}")
        # This handler does not run inside an ``except`` block, so print_exc() would
        # have no active exception to report; print the traceback attached to the
        # error object itself (chained causes are included).
        traceback.print_exception(type(error), error, error.__traceback__)

        # Send detailed error to user if owner
        if ctx.author.id == OWNER_ID:
            await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```")
        else:
            await ctx.send("❌ An unknown error occurred!")
async def main():
    """Main function to start the bot.

    Reads the Discord token from the ``DISCORD_TOKEN`` environment variable
    and refuses to start when the token or the database URL is missing.
    When the bot stops, the database pool (if one exists) is closed.
    """
    # Load Discord token from environment variables
    token = os.getenv('DISCORD_TOKEN')
    if not token:
        print("❌ DISCORD_TOKEN environment variable not found!")
        print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file")
        return

    if not DATABASE_URL:
        print("❌ DATABASE_URL environment variable not found!")
        print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify")
        print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}")
        return

    try:
        print("🚀 Starting bot...")
        await bot.start(token)
    except discord.LoginFailure:
        print("❌ Invalid Discord token!")
    except Exception as e:
        print(f"❌ Error starting bot: {e}")
    finally:
        if db_pool:
            # aiomysql's Pool.close() is a plain method that only marks the pool as
            # closing; awaiting its None result raises TypeError. The coroutine that
            # waits for the connections to be released is wait_closed().
            db_pool.close()
            await db_pool.wait_closed()


# Script entry point: run the bot until it stops
if __name__ == "__main__":
    asyncio.run(main())