import discord from discord.ext import commands import os import asyncio import logging from dotenv import load_dotenv import aiomysql import json from datetime import datetime from typing import Optional, List, Dict # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) # Database configuration DATABASE_URL = os.getenv('DATABASE_URL') DB_HOST = os.getenv('DB_HOST') DB_PORT = os.getenv('DB_PORT', '5432') # Default to PostgreSQL port DB_NAME = os.getenv('DB_NAME') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') # Build DATABASE_URL from individual components if not provided if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]): DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" print(f"📝 Built DATABASE_URL from individual environment variables") # Parse MySQL connection details from DATABASE_URL def parse_database_url(url): """Parse MySQL connection URL into components""" if not url: return None # Remove mysql:// prefix if url.startswith('mysql://'): url = url[8:] # Split user:pass@host:port/db if '@' in url: auth, host_db = url.split('@', 1) if ':' in auth: user, password = auth.split(':', 1) else: user, password = auth, '' else: return None if '/' in host_db: host_port, database = host_db.split('/', 1) else: return None if ':' in host_port: host, port = host_port.split(':', 1) try: port = int(port) except ValueError: port = 3306 else: host, port = host_port, 3306 return { 'host': host, 'port': port, 'user': user, 'password': password, 'db': database } # Global database connection pool db_pool = None # Bot configuration intents = discord.Intents.default() intents.message_content = True intents.guilds = True intents.members = True bot = commands.Bot(command_prefix='!', intents=intents) @bot.event async def on_ready(): """Event triggered when the bot is ready""" print(f'{bot.user} is online and ready!') print(f'Bot ID: {bot.user.id}') print(f'Discord.py Version: {discord.__version__}') print('------') # Initialize database await init_database() # Set bot status await bot.change_presence( activity=discord.Game(name="Hearts of Iron IV ELO"), status=discord.Status.online ) # Sync hybrid commands on startup try: synced = await bot.tree.sync() print(f'Synced {len(synced)} hybrid commands') except Exception as e: print(f'Failed to sync commands: {e}') @bot.event async def on_guild_join(guild): """Event triggered when the bot joins a server""" print(f'Bot joined server "{guild.name}" (ID: {guild.id})') @bot.event async def on_guild_remove(guild): """Event triggered when the bot leaves a server""" print(f'Bot left server "{guild.name}" (ID: {guild.id})') # Owner Configuration OWNER_ID = 253922739709018114 # ELO Configuration STARTING_ELO = 800 K_FACTOR = 32 T_LEVEL_MULTIPLIERS = { 1: 0.8, # T1 countries get less points 2: 1.0, # T2 countries get normal points 3: 1.2 # T3 countries get more points } # Preferred team emoji overrides (will be used if no guild/file match is found) TEAM_EMOTE_OVERRIDES: Dict[str, str] = { "axis": "<:fascism:1432391685127536762>", "allies": "<:democracy:1432391686612586528>", "comintern": "<:communism:1432391682267025479>", "cuf": "<:ChineseUnitedFront:1432422469985112156>", "default": "<:neutrality:1432391681059197102>", } # Emoji and formatting helpers def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]: """Return emojis from current guild plus all guilds the bot is in.""" try: guild_emojis = list(ctx.guild.emojis) if ctx.guild else [] except Exception: guild_emojis = [] try: bot_emojis = list(bot.emojis) except Exception: bot_emojis = [] return guild_emojis + bot_emojis def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]: """Try to find a custom emoji by a list of keyword variants (case-insensitive). Returns str(emoji) or None.""" emojis = _all_accessible_emojis(ctx) for kw in keyword_variants: kw = kw.lower() for e in emojis: try: if kw in (e.name or '').lower(): return str(e) except Exception: continue # Fallback to markdown-defined emojis if available try: if EMOTE_MAP: for kw in keyword_variants: key = kw.lower() # Exact name match if key in EMOTE_MAP: return EMOTE_MAP[key] # Substring match for name_lower, mention in EMOTE_MAP.items(): if key in name_lower: return mention except NameError: # EMOTE_MAP not defined yet pass return None def get_t_emoji(ctx: commands.Context, t_level: int) -> str: """Return a suitable emoji for a T level, preferring custom emojis if present.""" mapping = { 1: ["hoi4_t1", "t1", "tier1"], 2: ["hoi4_t2", "t2", "tier2"], 3: ["hoi4_t3", "t3", "tier3"], } custom = find_custom_emoji(ctx, mapping.get(t_level, [])) if custom: return custom # Fallback unicode return {1: "🔹", 2: "🔸", 3: "đŸ”ē"}.get(t_level, "🔹") def get_team_emoji(ctx: commands.Context, team_name: str) -> str: """Return a team emoji using user's preferred overrides, with guild/file matches first.""" name = (team_name or "").lower() def pick(keywords: List[str], override_key: str) -> str: # Try specific keywords (ideology-first), then generic ones custom = find_custom_emoji(ctx, keywords) if custom: return custom # As an extra attempt, try a couple generic HOI4 icons custom = find_custom_emoji(ctx, ["eagle_hoi", "peace_hoi", "navy_hoi", "secretweapon_hoi"]) if custom: return custom # Fall back to explicit override mapping return TEAM_EMOTE_OVERRIDES.get(override_key, TEAM_EMOTE_OVERRIDES.get("default", "đŸŽ–ī¸")) if any(k in name for k in ["axis", "achse"]): return pick(["fascism", "axis", "hoi4_axis"], "axis") if any(k in name for k in ["allies", "alliierten", "ally"]): return pick(["democracy", "allies", "hoi4_allies"], "allies") if any(k in name for k in ["comintern", "ussr", "soviet"]): return pick(["communism", "comintern", "hoi4_comintern"], "comintern") # Chinese United Front (optional special case) if ("chinese united front" in name) or ("chinese" in name and "front" in name) or ("cuf" in name): return pick(["ChineseUnitedFront", "chineseunitedfront", "cuf"], "cuf") # Default/other custom = find_custom_emoji(ctx, ["neutrality", "hoi4", "hearts_of_iron", "iron"]) if custom: return custom return TEAM_EMOTE_OVERRIDES.get("default", "đŸŽ–ī¸") def _flag_from_iso2(code: str) -> Optional[str]: """Return unicode flag from 2-letter ISO code (e.g., 'DE' -> 🇩đŸ‡Ē).""" if not code or len(code) != 2: return None code = code.upper() base = 0x1F1E6 try: return chr(base + ord(code[0]) - ord('A')) + chr(base + ord(code[1]) - ord('A')) except Exception: return None # Emotes markdown loader and map def load_emote_markdown(path: Optional[str] = None) -> Dict[str, str]: """Parse emotes.markdown and return a mapping of lowercased emoji names to their mention strings. Expected line format: <:Name:123456789012345678> Lines that don't match are ignored.""" if path is None: base_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base_dir, 'emotes.markdown') mapping: Dict[str, str] = {} try: with open(path, 'r', encoding='utf-8') as f: for raw in f: line = raw.strip() if not line or not line.startswith('<:') or ':' not in line[2:]: continue # Format is <:NAME:ID> try: inner = line[2:-1] if line.endswith('>') else line[2:] name, emoji_id = inner.split(':', 1) name = name.strip() mention = f"<:{name}:{emoji_id.strip('>')}>" mapping[name.lower()] = mention except Exception: continue except FileNotFoundError: # Silent if not present pass except Exception as e: print(f"âš ī¸ Failed to load emotes.markdown: {e}") return mapping # Load emotes mapping at import EMOTE_MAP: Dict[str, str] = load_emote_markdown() if EMOTE_MAP: print(f"😀 Loaded {len(EMOTE_MAP)} custom emojis from emotes.markdown") def load_country_tags(path: Optional[str] = None) -> Dict[str, str]: """Load HOI4 country tags mapping from tags.txt. Supported formats per line: TAG=Country Name | TAG:Country Name | TAG,Country Name | TAG Country Name Lines starting with # are ignored. Returns dict like { 'GER': 'Germany', ... }""" if path is None: base_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base_dir, 'tags.txt') mapping: Dict[str, str] = {} try: with open(path, 'r', encoding='utf-8') as f: for raw in f: line = raw.strip() if not line or line.startswith('#'): continue tag = None name = None for sep in ['=', ';', ',', ':']: if sep in line: left, right = line.split(sep, 1) tag = left.strip().upper() name = right.strip() break if tag is None: parts = line.split(None, 1) if len(parts) == 2: tag = parts[0].strip().upper() name = parts[1].strip() else: tag = line.strip().upper() name = line.strip() if tag and name: mapping[tag] = name except FileNotFoundError: print("â„šī¸ tags.txt not found; proceeding without country tag labels") except Exception as e: print(f"âš ī¸ Failed to load tags.txt: {e}") return mapping # Loaded at import COUNTRY_TAGS: Dict[str, str] = load_country_tags() if COUNTRY_TAGS: print(f"đŸ—ēī¸ Loaded {len(COUNTRY_TAGS)} HOI4 country tags") def get_country_label(country_tag: Optional[str]) -> Optional[str]: """Return a display label like "[GER] Germany" if known, or "[GER]" if unknown.""" if not country_tag: return None tag = country_tag.strip().upper() name = COUNTRY_TAGS.get(tag) return f"[{tag}] {name}" if name else f"[{tag}]" def get_country_emoji(ctx: commands.Context, country: Optional[str]) -> str: """Prefer custom emoji matching the HOI4 tag (e.g., ger, hoi4_ger). If parameter is ISO2, show unicode flag. Else empty.""" if not country: return "" c = country.strip() # Try custom emoji lookups using tag variants variants = [c, c.lower(), f"hoi4_{c.lower()}", f"country_{c.lower()}"] custom = find_custom_emoji(ctx, variants) if custom: return custom # If user passed ISO2, render unicode flag if len(c) == 2: flag = _flag_from_iso2(c) if flag: return flag # Otherwise, no emoji fallback to avoid noisy globes return "" # Database Functions # Database Functions async def init_database(): """Initialize database connection and create tables""" global db_pool try: # Parse DATABASE_URL for MySQL connection db_config = parse_database_url(DATABASE_URL) if not db_config: raise ValueError("Invalid DATABASE_URL format") print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}") # Create MySQL connection pool db_pool = await aiomysql.create_pool( host=db_config['host'], port=db_config['port'], user=db_config['user'], password=db_config['password'], db=db_config['db'], charset='utf8mb4', autocommit=True, maxsize=10 ) async with db_pool.acquire() as conn: async with conn.cursor() as cursor: # Create players table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS players ( id INT AUTO_INCREMENT PRIMARY KEY, discord_id BIGINT UNIQUE NOT NULL, username VARCHAR(255) NOT NULL, standard_elo INT DEFAULT 800, competitive_elo INT DEFAULT 800, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ''') # Create games table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS games ( id INT AUTO_INCREMENT PRIMARY KEY, game_name VARCHAR(255) NOT NULL, game_type VARCHAR(50) NOT NULL, status VARCHAR(50) DEFAULT 'setup', players JSON NOT NULL, winner_team VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, finished_at TIMESTAMP NULL ) ''') # Create game_results table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS game_results ( id INT AUTO_INCREMENT PRIMARY KEY, game_id INT, discord_id BIGINT NOT NULL, team_name VARCHAR(255) NOT NULL, t_level INT NOT NULL, old_elo INT NOT NULL, new_elo INT NOT NULL, elo_change INT NOT NULL, won BOOLEAN NOT NULL, result_type VARCHAR(10) DEFAULT 'loss', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (game_id) REFERENCES games(id) ) ''') # Add result_type column if it doesn't exist (for existing databases) try: await cursor.execute(''' ALTER TABLE game_results ADD COLUMN result_type VARCHAR(10) DEFAULT 'loss' ''') except: # Column already exists, ignore error pass print("✅ Database initialized successfully") except Exception as e: print(f"❌ Database initialization failed: {e}") import traceback traceback.print_exc() async def get_or_create_player(discord_id: int, username: str) -> Dict: """Get or create a player in the database""" async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Try to get existing player await cursor.execute( "SELECT * FROM players WHERE discord_id = %s", (discord_id,) ) player = await cursor.fetchone() if not player: # Create new player await cursor.execute( "INSERT INTO players (discord_id, username) VALUES (%s, %s)", (discord_id, username) ) await cursor.execute( "SELECT * FROM players WHERE discord_id = %s", (discord_id,) ) player = await cursor.fetchone() else: # Update username if changed await cursor.execute( "UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s", (username, discord_id) ) return dict(player) def calculate_elo_change(player_elo: int, opponent_avg_elo: int, result: str, t_level: int) -> int: """Calculate ELO change using standard ELO formula with T-level multiplier result can be 'win', 'loss', or 'draw' In draws: - If you're expected to win (higher ELO), you lose points for drawing - If you're expected to lose (lower ELO), you gain points for drawing - The bigger the ELO difference, the bigger the swing """ expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400)) if result == 'win': actual_score = 1.0 elif result == 'draw': actual_score = 0.5 # Draw = 50% score else: # loss actual_score = 0.0 # Calculate the base ELO change base_change = K_FACTOR * (actual_score - expected_score) # Apply T-level multiplier t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0) final_change = base_change * t_multiplier # For debugging/logging purposes, let's see what happens in draws if result == 'draw': elo_diff = player_elo - opponent_avg_elo expected_percentage = expected_score * 100 print(f"📊 Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}") print(f" ELO Difference: {elo_diff:+d} | Expected win chance: {expected_percentage:.1f}%") print(f" ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})") return round(final_change) # Owner only decorator def is_owner(): def predicate(ctx): return ctx.author.id == OWNER_ID return commands.check(predicate) # Owner Commands @bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)') @is_owner() async def reload_bot(ctx): """Reloads the bot and syncs slash commands (Owner only)""" try: print(f"🔄 Reload command started by {ctx.author} (ID: {ctx.author.id})") # Send initial message embed = discord.Embed( title="🔄 Bot Reload", description="Reloading bot and syncing commands...", color=discord.Color.yellow() ) message = await ctx.send(embed=embed) print("📤 Initial reload message sent") # Sync slash commands print("🔄 Starting command sync...") synced = await bot.tree.sync() print(f"✅ Synced {len(synced)} commands successfully") # Update embed with success embed = discord.Embed( title="✅ Bot Reloaded Successfully", description=f"Bot has been reloaded!\nSynced {len(synced)} slash commands.", color=discord.Color.green() ) embed.add_field(name="Servers", value=len(bot.guilds), inline=True) embed.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True) embed.set_footer(text=f"Reloaded by {ctx.author}", icon_url=ctx.author.avatar.url if ctx.author.avatar else None) await message.edit(embed=embed) print("✅ Reload completed successfully") except Exception as e: print(f"❌ Reload failed with error: {type(e).__name__}: {e}") import traceback traceback.print_exc() embed = discord.Embed( title="❌ Reload Failed", description=f"**Error Type:** {type(e).__name__}\n**Error:** {str(e)[:1500]}", color=discord.Color.red() ) await ctx.send(embed=embed) # HOI4 ELO Commands @bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game') async def hoi4create(ctx, game_type: str, game_name: str): """Create a new HOI4 game""" if game_type.lower() not in ['standard', 'competitive']: await ctx.send("❌ Game type must be either 'standard' or 'competitive'") return try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Check if game name already exists and is active await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) existing_game = await cursor.fetchone() if existing_game: await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!") return # Create new game await cursor.execute( "INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)", (game_name, game_type.lower(), '[]') ) embed = discord.Embed( title="🎮 Game Created", description=f"HOI4 {game_type.title()} game '{game_name}' has been created!", color=discord.Color.green() ) embed.add_field(name="Game Name", value=game_name, inline=True) embed.add_field(name="Type", value=game_type.title(), inline=True) embed.add_field(name="Status", value="Setup Phase", inline=True) embed.set_footer(text="Use /hoi4setup to add players to this game") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error creating game: {str(e)}") @bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game') async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int, country: Optional[str] = None): """Add a player to an existing game""" if t_level not in [1, 2, 3]: await ctx.send("❌ T-Level must be 1, 2, or 3") return try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get the game await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) game = await cursor.fetchone() if not game: await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!") return # Get or create player player = await get_or_create_player(user.id, user.display_name) # Parse existing players players = json.loads(game['players']) if game['players'] else [] # Check if player already in game for p in players: if p['discord_id'] == user.id: await ctx.send(f"❌ {user.display_name} is already in this game!") return # Add player to game player_data = { 'discord_id': user.id, 'username': user.display_name, 'team_name': team_name, 't_level': t_level, 'current_elo': player[f"{game['game_type']}_elo"], 'country': country.strip() if country else None } players.append(player_data) # Update game await cursor.execute( "UPDATE games SET players = %s WHERE id = %s", (json.dumps(players), game['id']) ) embed = discord.Embed( title="✅ Player Added", description=f"{user.display_name} has been added to '{game_name}'!", color=discord.Color.green() ) embed.add_field(name="Player", value=user.display_name, inline=True) embed.add_field(name="Team", value=team_name, inline=True) embed.add_field(name="T-Level", value=f"T{t_level}", inline=True) embed.add_field(name="Current ELO", value=player[f"{game['game_type']}_elo"], inline=True) if country: flag = get_country_emoji(ctx, country) label = get_country_label(country) value = f"{flag} {label}".strip() embed.add_field(name="Country", value=value, inline=True) embed.add_field(name="Players in Game", value=len(players), inline=True) await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error adding player: {str(e)}") @bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes') async def hoi4end(ctx, game_name: str, winner_team: str): """End a game and calculate ELO changes. Use 'draw' for ties.""" try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get the game await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) game = await cursor.fetchone() if not game: await ctx.send(f"❌ No active game found with name '{game_name}'!") return players = json.loads(game['players']) if game['players'] else [] if len(players) < 2: await ctx.send("❌ Game needs at least 2 players to end!") return # Check if winner team exists or if it's a draw teams = {p['team_name'] for p in players} is_draw = winner_team.lower() == 'draw' if not is_draw and winner_team not in teams: available_teams = ', '.join(teams) await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw") return # Calculate team averages team_elos = {} team_players = {} for player in players: team = player['team_name'] if team not in team_elos: team_elos[team] = [] team_players[team] = [] team_elos[team].append(player['current_elo']) team_players[team].append(player) # Calculate average ELOs for each team team_averages = {team: sum(elos) / len(elos) for team, elos in team_elos.items()} elo_changes = [] # Calculate ELO changes for each player for player in players: team = player['team_name'] # Determine result for this player if is_draw: result = 'draw' elif team == winner_team: result = 'win' else: result = 'loss' # Calculate opponent average (average of all other teams) opponent_elos = [] for other_team, elos in team_elos.items(): if other_team != team: opponent_elos.extend(elos) opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo'] elo_change = calculate_elo_change( player['current_elo'], opponent_avg, result, player['t_level'] ) new_elo = max(0, player['current_elo'] + elo_change) # Prevent negative ELO elo_changes.append({ 'discord_id': player['discord_id'], 'username': player['username'], 'team_name': team, 't_level': player['t_level'], 'old_elo': player['current_elo'], 'new_elo': new_elo, 'elo_change': elo_change, 'result': result }) # Update player ELOs and save game results for change in elo_changes: # Update player ELO elo_field = f"{game['game_type']}_elo" await cursor.execute( f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s", (change['new_elo'], change['discord_id']) ) # Save game result won = change['result'] == 'win' await cursor.execute( """INSERT INTO game_results (game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", (game['id'], change['discord_id'], change['team_name'], change['t_level'], change['old_elo'], change['new_elo'], change['elo_change'], won, change['result']) ) # Mark game as finished final_result = "Draw" if is_draw else winner_team await cursor.execute( "UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s", (final_result, game['id']) ) # Create result embed if is_draw: embed = discord.Embed( title="🤝 Game Finished!", description=f"Game '{game_name}' has ended!\n**Result: Draw**", color=discord.Color.orange() ) else: embed = discord.Embed( title="🏆 Game Finished!", description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**", color=discord.Color.gold() ) # Group results by team teams_results = {} for change in elo_changes: team = change['team_name'] if team not in teams_results: teams_results[team] = [] teams_results[team].append(change) for team, team_changes in teams_results.items(): team_text = "" for change in team_changes: emoji = "📈" if change['elo_change'] > 0 else "📉" if change['elo_change'] < 0 else "âžĄī¸" result_emoji = "" if change['result'] == 'win': result_emoji = "🏆" elif change['result'] == 'draw': result_emoji = "🤝" else: result_emoji = "💔" team_text += f"{change['username']}: {change['old_elo']} → {change['new_elo']} ({change['elo_change']:+d}) {emoji}\n" # Team header with appropriate icon if is_draw: team_header = f"🤝 Team {team}" elif team == winner_team: team_header = f"🏆 Team {team}" else: team_header = f"💔 Team {team}" embed.add_field( name=team_header, value=team_text, inline=False ) embed.set_footer(text=f"Game Type: {game['game_type'].title()}") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error ending game: {str(e)}") @bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics') async def hoi4stats(ctx, user: Optional[discord.Member] = None): """Show HOI4 ELO statistics for a user""" target_user = user or ctx.author try: player = await get_or_create_player(target_user.id, target_user.display_name) # Get player rankings async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get standard rank await cursor.execute( "SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s", (player['standard_elo'],) ) standard_rank_result = await cursor.fetchone() standard_rank = standard_rank_result['player_rank'] # Get competitive rank await cursor.execute( "SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s", (player['competitive_elo'],) ) competitive_rank_result = await cursor.fetchone() competitive_rank = competitive_rank_result['player_rank'] # Get total player count await cursor.execute("SELECT COUNT(*) as total FROM players") total_players_result = await cursor.fetchone() total_players = total_players_result['total'] # Get game statistics with proper draw detection await cursor.execute( """SELECT COUNT(*) as total_games, SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won, SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn, SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost FROM game_results WHERE discord_id = %s""", (target_user.id,) ) game_stats = await cursor.fetchone() total_games = game_stats['total_games'] or 0 games_won = game_stats['games_won'] or 0 games_drawn = game_stats['games_drawn'] or 0 games_lost = game_stats['games_lost'] or 0 win_rate = (games_won / total_games * 100) if total_games > 0 else 0 # Create rank indicators with medals def get_rank_display(rank, total): if rank == 1: return f"đŸĨ‡ #{rank} of {total}" elif rank == 2: return f"đŸĨˆ #{rank} of {total}" elif rank == 3: return f"đŸĨ‰ #{rank} of {total}" else: return f"#{rank} of {total}" embed = discord.Embed( title=f"📊 HOI4 ELO Stats - {target_user.display_name}", color=discord.Color.blue() ) # ELO and Rankings embed.add_field( name="đŸŽ¯ Standard ELO", value=f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}", inline=True ) embed.add_field( name="🏆 Competitive ELO", value=f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}", inline=True ) embed.add_field( name="📅 Player Since", value=player['created_at'].strftime("%m/%d/%Y"), inline=True ) # Game Statistics if total_games > 0: embed.add_field( name="🎮 Games Played", value=f"**{total_games}** total games", inline=True ) if games_drawn > 0: embed.add_field( name="📊 W/D/L Record", value=f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**", inline=True ) else: embed.add_field( name="📈 Win/Loss", value=f"**{games_won}W** / **{games_lost}L**", inline=True ) embed.add_field( name="📊 Win Rate", value=f"**{win_rate:.1f}%**", inline=True ) else: embed.add_field( name="🎮 Games Played", value="No games played yet", inline=False ) if target_user.avatar: embed.set_thumbnail(url=target_user.avatar.url) # Add percentile information standard_percentile = ((total_players - standard_rank) / total_players * 100) if total_players > 0 else 0 competitive_percentile = ((total_players - competitive_rank) / total_players * 100) if total_players > 0 else 0 embed.set_footer( text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%" ) await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error getting stats: {str(e)}") @bot.hybrid_command(name='hoi4games', description='Show active games as team showcases') async def hoi4games(ctx): """Show all active games with teams presented side-by-side like a showcase.""" try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute( "SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC" ) games = await cursor.fetchall() if not games: await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!") return embed = discord.Embed( title="🎮 Active HOI4 Games", description="Team lineups are shown side-by-side. Use /hoi4setup to add players.", color=discord.Color.green() ) # Limit total embed fields to avoid Discord limit (25). Each game uses up to 3 fields. max_games = max(1, 25 // 3) games = games[:max_games] for game in games: players = json.loads(game['players']) if game['players'] else [] # Build team structures teams: Dict[str, List[Dict]] = {} for p in players: teams.setdefault(p['team_name'], []).append(p) # Compute average ELO per team team_avgs = {t: (sum(m['current_elo'] for m in mlist) / len(mlist)) if mlist else 0 for t, mlist in teams.items()} # Sort teams by name to keep stable order ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower()) # Helper to format a team's field def build_team_field(team_name: str, members: List[Dict]) -> Dict[str, str]: t_emoji = get_team_emoji(ctx, team_name) avg = team_avgs.get(team_name, 0) name = f"{t_emoji} {team_name} (avg {avg:.0f})" # Each player line: T-level emoji, name, elo lines = [] for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())): te = get_t_emoji(ctx, int(m.get('t_level', 2))) ctry = m.get('country') flag = get_country_emoji(ctx, ctry) if ctry else "" label = get_country_label(ctry) if ctry else None parts = [te] if flag: parts.append(flag) if label: parts.append(label) parts.append(m['username']) lines.append(f"{' '.join(parts)} ({m['current_elo']})") value = "\n".join(lines) if lines else "No players yet" # Discord field value max ~1024 chars; trim if necessary if len(value) > 1000: value = value[:997] + "..." return {"name": name, "value": value} # If no teams yet, show empty placeholder for this game if not ordered_teams: embed.add_field( name=f"{game['game_name']} ({game['game_type'].title()})", value="No players yet", inline=False ) continue # For two teams, show A vs B with a center VS field; otherwise, list each team inline team_fields = [build_team_field(tn, members) for tn, members in ordered_teams] # Add a header field per game embed.add_field( name=f"đŸŽ¯ {game['game_name']} ({game['game_type'].title()})", value=f"Players: {len(players)} | Teams: {len(ordered_teams)}", inline=False ) if len(team_fields) == 1: f1 = team_fields[0] embed.add_field(name=f1["name"], value=f1["value"], inline=False) elif len(team_fields) >= 2: f1, f2 = team_fields[0], team_fields[1] embed.add_field(name=f1["name"], value=f1["value"], inline=True) embed.add_field(name="âš”ī¸ VS âš”ī¸", value="\u200b", inline=True) embed.add_field(name=f2["name"], value=f2["value"], inline=True) # If more teams exist, add them below for extra in team_fields[2:]: embed.add_field(name=extra["name"], value=extra["value"], inline=False) await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error getting games: {str(e)}") @bot.hybrid_command(name='hoi4history', description='Show past games with optional filters') async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None): """Show past games with optional filters""" if limit > 50: limit = 50 # Prevent too many results try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Build dynamic query based on filters query = "SELECT * FROM games WHERE status = 'finished'" params = [] if game_name: query += " AND game_name LIKE %s" params.append(f"%{game_name}%") if game_type and game_type.lower() in ['standard', 'competitive']: query += " AND game_type = %s" params.append(game_type.lower()) if player: query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))" params.append(player.id) query += " ORDER BY finished_at DESC LIMIT %s" params.append(limit) await cursor.execute(query, params) games = await cursor.fetchall() if not games: await ctx.send("📝 No finished games found with the specified filters!") return embed = discord.Embed( title="📚 HOI4 Game History", color=discord.Color.blue() ) if player: embed.description = f"Filtered by player: {player.display_name}" if game_name: embed.description = f"Filtered by game name: {game_name}" if game_type: embed.description = f"Filtered by type: {game_type.title()}" for game in games: players = json.loads(game['players']) if game['players'] else [] # Count teams and players teams = {} for p in players: team = p['team_name'] if team not in teams: teams[team] = 0 teams[team] += 1 # Format date finished_date = game['finished_at'].strftime("%m/%d/%Y %H:%M") if game['finished_at'] else "Unknown" # Winner indicator winner = game['winner_team'] if game['winner_team'] else "Unknown" game_info = f"**Winner:** {winner}\n" game_info += f"**Type:** {game['game_type'].title()}\n" game_info += f"**Players:** {len(players)} | **Teams:** {len(teams)}\n" game_info += f"**Finished:** {finished_date}" embed.add_field( name=f"🏆 {game['game_name']}", value=game_info, inline=False ) embed.set_footer(text=f"Showing {len(games)} of last {limit} games") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error getting game history: {str(e)}") @bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard') async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10): """Show ELO leaderboard for standard or competitive""" if game_type.lower() not in ['standard', 'competitive']: await ctx.send("❌ Game type must be either 'standard' or 'competitive'") return if limit > 25: limit = 25 # Prevent too many results try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get top players by ELO elo_field = f"{game_type.lower()}_elo" await cursor.execute( f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s", (limit,) ) players = await cursor.fetchall() # Also get some statistics await cursor.execute( f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players" ) stats = await cursor.fetchone() if not players: await ctx.send("📝 No players found in the database!") return # Create leaderboard embed embed = discord.Embed( title=f"🏆 HOI4 {game_type.title()} Leaderboard", color=discord.Color.gold() ) # Add statistics embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}" leaderboard_text = "" medals = ["đŸĨ‡", "đŸĨˆ", "đŸĨ‰"] for i, player in enumerate(players, 1): # Get medal or rank number if i <= 3: rank_indicator = medals[i-1] else: rank_indicator = f"**{i}.**" elo_value = player[elo_field] username = player['username'] # Get additional player stats async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Count games played with proper result detection await cursor.execute( """SELECT COUNT(*) as games_played, SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won, SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn FROM game_results WHERE discord_id = %s""", (player['discord_id'],) ) player_stats = await cursor.fetchone() games_played = player_stats['games_played'] or 0 games_won = player_stats['games_won'] or 0 games_drawn = player_stats['games_drawn'] or 0 win_rate = (games_won / games_played * 100) if games_played > 0 else 0 leaderboard_text += f"{rank_indicator} **{username}** - {elo_value} ELO\n" if games_drawn > 0: leaderboard_text += f" 📊 {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate\n\n" else: leaderboard_text += f" 📊 {games_played} games | {win_rate:.1f}% win rate\n\n" embed.add_field( name="Rankings", value=leaderboard_text, inline=False ) embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"❌ Error getting leaderboard: {str(e)}") @bot.event async def on_command_error(ctx, error): """Handles command errors""" if isinstance(error, commands.CheckFailure): await ctx.send("❌ You don't have permission to use this command!") elif isinstance(error, commands.CommandNotFound): # Silently ignore command not found errors pass elif isinstance(error, commands.MissingRequiredArgument): await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`") elif isinstance(error, commands.BadArgument): await ctx.send("❌ Invalid argument!") else: # Log detailed error information print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}") import traceback traceback.print_exc() # Send detailed error to user if owner if ctx.author.id == OWNER_ID: await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```") else: await ctx.send("❌ An unknown error occurred!") async def main(): """Main function to start the bot""" # Load Discord token from environment variables token = os.getenv('DISCORD_TOKEN') if not token: print("❌ DISCORD_TOKEN environment variable not found!") print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file") return if not DATABASE_URL: print("❌ DATABASE_URL environment variable not found!") print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify") print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}") return try: print("🚀 Starting bot...") await bot.start(token) except discord.LoginFailure: print("❌ Invalid Discord token!") except Exception as e: print(f"❌ Error starting bot: {e}") finally: if db_pool: await db_pool.close() if __name__ == "__main__": asyncio.run(main())