Files
hoi4botdc/app.py
SimolZimol 82fd5449ae modified: app.py
new file:   emotes.markdown
	new file:   tags.txt
2025-10-27 16:23:40 +01:00

1270 lines
50 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import discord
from discord.ext import commands
import os
import asyncio
import logging
from dotenv import load_dotenv
import aiomysql
import json
from datetime import datetime
from typing import Optional, List, Dict
# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)

# Database configuration
DATABASE_URL = os.getenv('DATABASE_URL')
DB_HOST = os.getenv('DB_HOST')
# The connection URL built below uses the mysql:// scheme, so default to
# MySQL's standard port (3306) -- the same fallback parse_database_url() uses --
# rather than the PostgreSQL port.
DB_PORT = os.getenv('DB_PORT', '3306')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')

# Build DATABASE_URL from individual components if not provided.
# Only done when host, database name, user and password are all set.
if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
    DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    print("📝 Built DATABASE_URL from individual environment variables")
# Parse MySQL connection details from DATABASE_URL
def parse_database_url(url):
    """Parse MySQL connection URL into components.

    Expected shape: ``mysql://user:password@host:port/database``. The
    ``mysql://`` prefix, the password and the port are optional.

    Returns a dict with the keys 'host', 'port', 'user', 'password' and 'db',
    or None when ``url`` is empty or lacks the '@' or '/' delimiter. A missing
    or non-numeric port falls back to 3306.
    """
    if not url:
        return None
    # Remove mysql:// prefix
    if url.startswith('mysql://'):
        url = url[len('mysql://'):]
    # Split user:pass@host:port/db at the LAST '@'. The password is inserted
    # unescaped when the URL is assembled from the DB_* variables, so it may
    # itself contain '@'; splitting at the first '@' would cut it in half.
    auth, at_sign, host_db = url.rpartition('@')
    if not at_sign:
        return None
    # Only the first ':' separates user from password (passwords may hold ':')
    user, _, password = auth.partition(':')
    host_port, slash, database = host_db.partition('/')
    if not slash:
        return None
    host, colon, port_text = host_port.partition(':')
    port = 3306
    if colon:
        try:
            port = int(port_text)
        except ValueError:
            port = 3306
    return {
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'db': database
    }
# Shared aiomysql connection pool; stays None until init_database() assigns it
db_pool = None

# Bot configuration: default intents plus the ones enabled explicitly below
intents = discord.Intents.default()
intents.members = True
intents.guilds = True
intents.message_content = True
bot = commands.Bot(intents=intents, command_prefix='!')
@bot.event
async def on_ready():
    """Event triggered when the bot is ready.

    Prints identity/version information, initializes the database, sets the
    bot's presence and syncs the command tree.
    """
    print(f'{bot.user} is online and ready!')
    print(f'Bot ID: {bot.user.id}')
    print(f'Discord.py Version: {discord.__version__}')
    print('------')
    # Initialize database.
    # discord.py does not guarantee that on_ready fires only once (it can be
    # dispatched again after a reconnect), so only initialize while no pool
    # exists; otherwise every dispatch would open another connection pool.
    if db_pool is None:
        await init_database()
    # Set bot status
    await bot.change_presence(
        activity=discord.Game(name="Hearts of Iron IV ELO"),
        status=discord.Status.online
    )
    # Sync hybrid commands on startup; a failure is reported but not fatal
    try:
        synced = await bot.tree.sync()
        print(f'Synced {len(synced)} hybrid commands')
    except Exception as e:
        print(f'Failed to sync commands: {e}')
@bot.event
async def on_guild_join(guild):
    """Log the name and ID of a server the bot has just joined."""
    join_notice = f'Bot joined server "{guild.name}" (ID: {guild.id})'
    print(join_notice)
@bot.event
async def on_guild_remove(guild):
    """Log the name and ID of a server the bot has just left."""
    leave_notice = f'Bot left server "{guild.name}" (ID: {guild.id})'
    print(leave_notice)
# Owner Configuration: Discord user ID compared against the command author
# by the is_owner() check
OWNER_ID = 253922739709018114

# ELO Configuration
STARTING_ELO = 800
K_FACTOR = 32
# Scaling applied to every ELO change, keyed by T level:
#   T1 countries get less points, T2 normal points, T3 more points
T_LEVEL_MULTIPLIERS = {1: 0.8, 2: 1.0, 3: 1.2}
# Emoji and formatting helpers
def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]:
    """Collect the current guild's emojis followed by all emojis known to the bot.

    A source whose lookup raises contributes an empty list instead of failing.
    """
    def _collect(source):
        # Any failure while reading a source is treated as "no emojis"
        try:
            return list(source())
        except Exception:
            return []

    from_guild = _collect(lambda: ctx.guild.emojis if ctx.guild else [])
    from_bot = _collect(lambda: bot.emojis)
    return from_guild + from_bot
def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]:
    """Look up a custom emoji whose name contains one of the given keywords.

    Keywords are tried in order and compared case-insensitively. Live emojis
    (current guild, then everything the bot can see) are searched first; the
    markdown-defined EMOTE_MAP is used as a fallback (exact name match before
    substring match). Returns the emoji's string form, or None if nothing matches.
    """
    available = _all_accessible_emojis(ctx)
    for variant in keyword_variants:
        needle = variant.lower()
        for candidate in available:
            try:
                if needle in (candidate.name or '').lower():
                    return str(candidate)
            except Exception:
                # Skip emojis whose name cannot be inspected
                continue

    # Fallback to markdown-defined emojis if available
    try:
        fallback_map = EMOTE_MAP
    except NameError:
        # EMOTE_MAP not defined yet
        return None
    if not fallback_map:
        return None
    for variant in keyword_variants:
        needle = variant.lower()
        # Exact name match
        if needle in fallback_map:
            return fallback_map[needle]
        # Substring match
        for known_name, mention in fallback_map.items():
            if needle in known_name:
                return mention
    return None
def get_t_emoji(ctx: commands.Context, t_level: int) -> str:
    """Pick the emoji shown for a T level.

    A matching custom emoji wins; otherwise a unicode marker is returned
    (unknown levels use the T1 marker).
    """
    custom_keywords = {
        1: ["hoi4_t1", "t1", "tier1"],
        2: ["hoi4_t2", "t2", "tier2"],
        3: ["hoi4_t3", "t3", "tier3"],
    }
    unicode_markers = {1: "🔹", 2: "🔸", 3: "🔺"}

    found = find_custom_emoji(ctx, custom_keywords.get(t_level, []))
    if not found:
        # Fallback unicode
        return unicode_markers.get(t_level, "🔹")
    return found
def get_team_emoji(ctx: commands.Context, team_name: str) -> str:
    """Return an emoji for common HOI4 team names like Axis/Allies/Comintern or a generic HOI4 emoji if available.

    The team name is matched case-insensitively by substring. For each known
    faction a custom emoji is preferred and a unicode symbol is the fallback,
    so the result is never an empty string.
    """
    name = (team_name or "").lower()
    # (substrings identifying the faction, custom-emoji keywords, unicode fallback)
    factions = (
        # Axis previously fell back to "" unlike every other branch, which
        # left team headers starting with a stray space.
        (("axis", "achse"), ["axis", "hoi4_axis"], "⚫"),
        (("allies", "alliierten", "ally"), ["allies", "hoi4_allies"], "🔵"),
        (("comintern", "ussr", "soviet"), ["comintern", "hoi4_comintern"], "🔴"),
    )
    for name_keys, emoji_keys, fallback in factions:
        if any(key in name for key in name_keys):
            return find_custom_emoji(ctx, emoji_keys) or fallback
    # Generic HOI4 emoji or fallback
    custom = find_custom_emoji(ctx, ["hoi4", "hearts_of_iron", "iron"])
    if not custom:
        custom = find_custom_emoji(ctx, ["eagle_hoi", "peace_hoi", "navy_hoi", "secretweapon_hoi"])
    return custom or "🎖️"
def _flag_from_iso2(code: str) -> Optional[str]:
"""Return unicode flag from 2-letter ISO code (e.g., 'DE' -> 🇩🇪)."""
if not code or len(code) != 2:
return None
code = code.upper()
base = 0x1F1E6
try:
return chr(base + ord(code[0]) - ord('A')) + chr(base + ord(code[1]) - ord('A'))
except Exception:
return None
# Emotes markdown loader and map
def load_emote_markdown(path: Optional[str] = None) -> Dict[str, str]:
    """Read emotes.markdown into a {lowercased emoji name: mention string} dict.

    Only lines of the form <:Name:123456789012345678> are used (a missing
    closing '>' is tolerated); every other line is skipped. When ``path`` is
    omitted, emotes.markdown next to this file is read. A missing file yields
    an empty dict silently; other read errors are printed and whatever was
    parsed so far is returned.
    """
    if path is None:
        here = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(here, 'emotes.markdown')

    emotes: Dict[str, str] = {}
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if not entry.startswith('<:'):
                    continue
                payload = entry[2:]
                if ':' not in payload:
                    continue
                # Drop the closing bracket, if there is one: NAME:ID> -> NAME:ID
                if payload.endswith('>'):
                    payload = payload[:-1]
                emoji_name, _, emoji_id = payload.partition(':')
                emoji_name = emoji_name.strip()
                emotes[emoji_name.lower()] = f"<:{emoji_name}:{emoji_id.strip('>')}>"
    except FileNotFoundError:
        # Silent if not present
        pass
    except Exception as e:
        print(f"⚠️ Failed to load emotes.markdown: {e}")
    return emotes
# Load emotes mapping at import.
# Maps lowercased emoji names to "<:Name:ID>" mention strings; empty when
# emotes.markdown is missing. find_custom_emoji() uses it as a fallback.
EMOTE_MAP: Dict[str, str] = load_emote_markdown()
if EMOTE_MAP:
    print(f"😀 Loaded {len(EMOTE_MAP)} custom emojis from emotes.markdown")
def _split_tag_line(line: str):
    """Split one stripped, non-empty tags.txt line into a (tag, name) pair."""
    # Use the separator that occurs FIRST in the line, so punctuation inside
    # the country name (e.g. "KOR:Korea, Republic of") is left untouched.
    hits = [pos for pos in (line.find(sep) for sep in '=;,:') if pos != -1]
    if hits:
        cut = min(hits)
        left = line[:cut].strip()
        # A tag is a single token. If several words precede the separator,
        # the separator belongs to the name of a whitespace-delimited entry
        # (e.g. "PRK Korea, North"), which is handled below.
        if len(left.split()) <= 1:
            return left.upper(), line[cut + 1:].strip()
    parts = line.split(None, 1)
    if len(parts) == 2:
        return parts[0].strip().upper(), parts[1].strip()
    # Bare tag without a name: the tag doubles as its own label
    return line.upper(), line


def load_country_tags(path: Optional[str] = None) -> Dict[str, str]:
    """Load HOI4 country tags mapping from tags.txt.

    Supported formats per line:
        TAG=Country Name | TAG;Country Name | TAG,Country Name |
        TAG:Country Name | TAG Country Name
    Blank lines and lines starting with # are ignored; entries with an empty
    tag or name are skipped. When ``path`` is omitted, tags.txt next to this
    file is read. A missing or unreadable file is reported via print and
    whatever was parsed so far is returned.

    Returns dict like { 'GER': 'Germany', ... } (tags are upper-cased).
    """
    if path is None:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, 'tags.txt')
    mapping: Dict[str, str] = {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith('#'):
                    continue
                tag, name = _split_tag_line(line)
                if tag and name:
                    mapping[tag] = name
    except FileNotFoundError:
        print(" tags.txt not found; proceeding without country tag labels")
    except Exception as e:
        print(f"⚠️ Failed to load tags.txt: {e}")
    return mapping
# Loaded at import.
# Maps upper-cased HOI4 tags to country names; empty when tags.txt is missing.
# get_country_label() reads it to build display labels.
COUNTRY_TAGS: Dict[str, str] = load_country_tags()
if COUNTRY_TAGS:
    print(f"🗺️ Loaded {len(COUNTRY_TAGS)} HOI4 country tags")
def get_country_label(country_tag: Optional[str]) -> Optional[str]:
    """Build the display label for a country tag.

    Gives "[GER] Germany" when the tag is listed in COUNTRY_TAGS, "[GER]" when
    it is not, and None for an empty or missing tag. The tag is stripped and
    upper-cased before the lookup.
    """
    if not country_tag:
        return None
    normalized = country_tag.strip().upper()
    known_name = COUNTRY_TAGS.get(normalized)
    if known_name:
        return f"[{normalized}] {known_name}"
    return f"[{normalized}]"
def get_country_emoji(ctx: commands.Context, country: Optional[str]) -> str:
    """Pick an emoji for a country value.

    Order of preference: a custom emoji matching the tag (e.g. ger, hoi4_ger,
    country_ger), then a unicode flag when the value is two characters long,
    else an empty string.
    """
    if not country:
        return ""
    tag = country.strip()
    lowered = tag.lower()

    # Try custom emoji lookups using tag variants
    custom = find_custom_emoji(ctx, [tag, lowered, f"hoi4_{lowered}", f"country_{lowered}"])
    if custom:
        return custom

    # If user passed ISO2, render unicode flag; otherwise no emoji fallback
    # to avoid noisy globes
    flag = _flag_from_iso2(tag) if len(tag) == 2 else None
    return flag if flag else ""
# Database Functions
async def init_database():
    """Initialize database connection and create tables.

    Parses DATABASE_URL, creates the aiomysql pool (stored in the module-level
    ``db_pool``) and creates the players, games and game_results tables if
    they do not exist. Any failure is printed with a traceback and swallowed,
    so callers are not interrupted.
    """
    global db_pool
    try:
        # Parse DATABASE_URL for MySQL connection
        db_config = parse_database_url(DATABASE_URL)
        if not db_config:
            raise ValueError("Invalid DATABASE_URL format")
        print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}")
        # Create MySQL connection pool
        db_pool = await aiomysql.create_pool(
            host=db_config['host'],
            port=db_config['port'],
            user=db_config['user'],
            password=db_config['password'],
            db=db_config['db'],
            charset='utf8mb4',
            autocommit=True,
            maxsize=10
        )
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # Create players table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS players (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        discord_id BIGINT UNIQUE NOT NULL,
                        username VARCHAR(255) NOT NULL,
                        standard_elo INT DEFAULT 800,
                        competitive_elo INT DEFAULT 800,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                    )
                ''')
                # Create games table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS games (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_name VARCHAR(255) NOT NULL,
                        game_type VARCHAR(50) NOT NULL,
                        status VARCHAR(50) DEFAULT 'setup',
                        players JSON NOT NULL,
                        winner_team VARCHAR(255),
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        finished_at TIMESTAMP NULL
                    )
                ''')
                # Create game_results table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS game_results (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_id INT,
                        discord_id BIGINT NOT NULL,
                        team_name VARCHAR(255) NOT NULL,
                        t_level INT NOT NULL,
                        old_elo INT NOT NULL,
                        new_elo INT NOT NULL,
                        elo_change INT NOT NULL,
                        won BOOLEAN NOT NULL,
                        result_type VARCHAR(10) DEFAULT 'loss',
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (game_id) REFERENCES games(id)
                    )
                ''')
                # Add result_type column if it doesn't exist (for existing databases)
                try:
                    await cursor.execute('''
                        ALTER TABLE game_results
                        ADD COLUMN result_type VARCHAR(10) DEFAULT 'loss'
                    ''')
                except aiomysql.Error as alter_error:
                    # MySQL error 1060 (duplicate column name) means the column
                    # already exists, which is the expected case. Anything else
                    # is reported instead of being silently dropped, but stays
                    # non-fatal as before.
                    if not (alter_error.args and alter_error.args[0] == 1060):
                        print(f"⚠️ Could not add result_type column: {alter_error}")
        print("✅ Database initialized successfully")
    except Exception as e:
        print(f"❌ Database initialization failed: {e}")
        import traceback
        traceback.print_exc()
async def get_or_create_player(discord_id: int, username: str) -> Dict:
    """Fetch the players row for a Discord user, inserting it first if missing.

    For an existing player the stored username and updated_at are refreshed.
    The returned dict is the row as it was read, i.e. before that refresh.
    """
    select_sql = "SELECT * FROM players WHERE discord_id = %s"
    async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(select_sql, (discord_id,))
        row = await cursor.fetchone()
        if row:
            # Known player: update username if changed
            await cursor.execute(
                "UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                (username, discord_id)
            )
            return dict(row)
        # Unknown player: create the row, then read it back
        await cursor.execute(
            "INSERT INTO players (discord_id, username) VALUES (%s, %s)",
            (discord_id, username)
        )
        await cursor.execute(select_sql, (discord_id,))
        row = await cursor.fetchone()
        return dict(row)
def calculate_elo_change(player_elo: int, opponent_avg_elo: float, result: str, t_level: int) -> int:
    """Calculate ELO change using standard ELO formula with T-level multiplier.

    ``result`` can be 'win', 'loss', or 'draw'; any other value is scored as a
    loss. ``opponent_avg_elo`` may be a float (callers pass an average).
    Unknown ``t_level`` values use a multiplier of 1.0.

    In draws:
    - If you're expected to win (higher ELO), you lose points for drawing
    - If you're expected to lose (lower ELO), you gain points for drawing
    - The bigger the ELO difference, the bigger the swing

    Returns the change rounded to the nearest integer.
    """
    expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400))
    if result == 'win':
        actual_score = 1.0
    elif result == 'draw':
        actual_score = 0.5  # Draw = 50% score
    else:  # loss
        actual_score = 0.0
    # Calculate the base ELO change
    base_change = K_FACTOR * (actual_score - expected_score)
    # Apply T-level multiplier
    t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0)
    final_change = base_change * t_multiplier
    # For debugging/logging purposes, let's see what happens in draws
    if result == 'draw':
        elo_diff = player_elo - opponent_avg_elo
        expected_percentage = expected_score * 100
        print(f"📊 Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}")
        # The difference is a float whenever the opponent average is one, and
        # the integer-only 'd' format code raises ValueError on floats.
        print(f" ELO Difference: {elo_diff:+.0f} | Expected win chance: {expected_percentage:.1f}%")
        print(f" ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})")
    return round(final_change)
# Owner only decorator
def is_owner():
    """Create a command check that passes only when the author's ID equals OWNER_ID."""
    return commands.check(lambda ctx: ctx.author.id == OWNER_ID)
# Owner Commands
@bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)')
@is_owner()
async def reload_bot(ctx):
    """Reloads the bot and syncs slash commands (Owner only)"""
    # Flow: announce the reload, sync the command tree, then edit the
    # announcement into a success summary. Any failure is logged with a
    # traceback and reported in a separate error embed.
    try:
        print(f"🔄 Reload command started by {ctx.author} (ID: {ctx.author.id})")

        # Send initial message
        progress = discord.Embed(
            color=discord.Color.yellow(),
            title="🔄 Bot Reload",
            description="Reloading bot and syncing commands..."
        )
        message = await ctx.send(embed=progress)
        print("📤 Initial reload message sent")

        # Sync slash commands
        print("🔄 Starting command sync...")
        synced = await bot.tree.sync()
        print(f"✅ Synced {len(synced)} commands successfully")

        # Update embed with success
        summary = discord.Embed(
            color=discord.Color.green(),
            title="✅ Bot Reloaded Successfully",
            description=f"Bot has been reloaded!\nSynced {len(synced)} slash commands."
        )
        summary.add_field(name="Servers", value=len(bot.guilds), inline=True)
        summary.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True)
        avatar = ctx.author.avatar
        summary.set_footer(text=f"Reloaded by {ctx.author}", icon_url=avatar.url if avatar else None)
        await message.edit(embed=summary)
        print("✅ Reload completed successfully")
    except Exception as e:
        print(f"❌ Reload failed with error: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        failure = discord.Embed(
            color=discord.Color.red(),
            title="❌ Reload Failed",
            description=f"**Error Type:** {type(e).__name__}\n**Error:** {str(e)[:1500]}"
        )
        await ctx.send(embed=failure)
# HOI4 ELO Commands
@bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game')
async def hoi4create(ctx, game_type: str, game_name: str):
    """Create a new HOI4 game"""
    # Game types are stored lower-cased; only these two are accepted
    normalized_type = game_type.lower()
    if normalized_type not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return
    try:
        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
            # Refuse a second game with the same name while one is in setup
            await cursor.execute(
                "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                (game_name,)
            )
            if await cursor.fetchone():
                await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!")
                return
            # Create new game with an empty JSON player list
            await cursor.execute(
                "INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)",
                (game_name, normalized_type, '[]')
            )

        # Confirmation shown to the channel
        embed = discord.Embed(
            color=discord.Color.green(),
            title="🎮 Game Created",
            description=f"HOI4 {game_type.title()} game '{game_name}' has been created!"
        )
        for field_name, field_value in (
            ("Game Name", game_name),
            ("Type", game_type.title()),
            ("Status", "Setup Phase"),
        ):
            embed.add_field(name=field_name, value=field_value, inline=True)
        embed.set_footer(text="Use /hoi4setup to add players to this game")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error creating game: {str(e)}")
@bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game')
async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int, country: Optional[str] = None):
    """Add a player to an existing game"""
    # Only the three known T levels are accepted
    if t_level not in (1, 2, 3):
        await ctx.send("❌ T-Level must be 1, 2, or 3")
        return
    try:
        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
            # Players can only be added while the game is in setup
            await cursor.execute(
                "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                (game_name,)
            )
            game = await cursor.fetchone()
            if not game:
                await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
                return

            # Get or create player
            player = await get_or_create_player(user.id, user.display_name)

            # The roster is stored as a JSON list on the game row
            roster = json.loads(game['players']) if game['players'] else []
            if any(entry['discord_id'] == user.id for entry in roster):
                await ctx.send(f"{user.display_name} is already in this game!")
                return

            # ELO column matching the game's type (standard_elo / competitive_elo)
            elo_key = f"{game['game_type']}_elo"
            roster.append({
                'discord_id': user.id,
                'username': user.display_name,
                'team_name': team_name,
                't_level': t_level,
                'current_elo': player[elo_key],
                'country': country.strip() if country else None
            })

            # Persist the extended roster
            await cursor.execute(
                "UPDATE games SET players = %s WHERE id = %s",
                (json.dumps(roster), game['id'])
            )

        embed = discord.Embed(
            color=discord.Color.green(),
            title="✅ Player Added",
            description=f"{user.display_name} has been added to '{game_name}'!"
        )
        embed.add_field(name="Player", value=user.display_name, inline=True)
        embed.add_field(name="Team", value=team_name, inline=True)
        embed.add_field(name="T-Level", value=f"T{t_level}", inline=True)
        embed.add_field(name="Current ELO", value=player[elo_key], inline=True)
        if country:
            flag = get_country_emoji(ctx, country)
            label = get_country_label(country)
            embed.add_field(name="Country", value=f"{flag} {label}".strip(), inline=True)
        embed.add_field(name="Players in Game", value=len(roster), inline=True)
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error adding player: {str(e)}")
def _hoi4end_compute_changes(players, winner_team, is_draw):
    """Compute one ELO-change record per player of a finished game.

    Each player is rated against the average ELO of all players on other
    teams (or against their own ELO when there are no opponents).
    """
    # ELOs grouped by team, used to build each player's opponent pool
    team_elos = {}
    for player in players:
        team_elos.setdefault(player['team_name'], []).append(player['current_elo'])

    changes = []
    for player in players:
        team = player['team_name']
        # Determine result for this player
        if is_draw:
            result = 'draw'
        elif team == winner_team:
            result = 'win'
        else:
            result = 'loss'
        # Calculate opponent average (average of all other teams)
        opponent_elos = [
            elo
            for other_team, elos in team_elos.items()
            if other_team != team
            for elo in elos
        ]
        opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo']
        elo_change = calculate_elo_change(
            player['current_elo'],
            opponent_avg,
            result,
            player['t_level']
        )
        changes.append({
            'discord_id': player['discord_id'],
            'username': player['username'],
            'team_name': team,
            't_level': player['t_level'],
            'old_elo': player['current_elo'],
            'new_elo': max(0, player['current_elo'] + elo_change),  # Prevent negative ELO
            'elo_change': elo_change,
            'result': result
        })
    return changes


def _hoi4end_team_text(team_changes):
    """Format the per-player result lines shown in one team's embed field."""
    lines = []
    for change in team_changes:
        if change['elo_change'] > 0:
            trend = "📈"
        elif change['elo_change'] < 0:
            trend = "📉"
        else:
            trend = "➡️"
        # Old and new ELO need a visible separator; printed back to back they
        # read as a single number (e.g. "800816").
        lines.append(
            f"{change['username']}: {change['old_elo']} → {change['new_elo']} "
            f"({change['elo_change']:+d}) {trend}\n"
        )
    return "".join(lines)


@bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes')
async def hoi4end(ctx, game_name: str, winner_team: str):
    """End a game and calculate ELO changes. Use 'draw' for ties."""
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get the game (only games still in 'setup' can be ended)
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                game = await cursor.fetchone()
                if not game:
                    await ctx.send(f"❌ No active game found with name '{game_name}'!")
                    return
                players = json.loads(game['players']) if game['players'] else []
                if len(players) < 2:
                    await ctx.send("❌ Game needs at least 2 players to end!")
                    return
                # Check if winner team exists or if it's a draw
                teams = {p['team_name'] for p in players}
                is_draw = winner_team.lower() == 'draw'
                if not is_draw and winner_team not in teams:
                    available_teams = ', '.join(teams)
                    await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw")
                    return

                # NOTE(review): ratings are computed from the 'current_elo'
                # snapshot stored when the player was added, not from the
                # players table; a player in several open games may have an
                # earlier result overwritten -- confirm whether that is intended.
                elo_changes = _hoi4end_compute_changes(players, winner_team, is_draw)

                # Update player ELOs and save game results.
                # game_type comes from the games row and is validated at
                # creation, so interpolating the column name is safe here.
                elo_field = f"{game['game_type']}_elo"
                for change in elo_changes:
                    await cursor.execute(
                        f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                        (change['new_elo'], change['discord_id'])
                    )
                    won = change['result'] == 'win'
                    await cursor.execute(
                        """INSERT INTO game_results
                        (game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                        (game['id'], change['discord_id'], change['team_name'],
                         change['t_level'], change['old_elo'], change['new_elo'],
                         change['elo_change'], won, change['result'])
                    )
                # Mark game as finished
                final_result = "Draw" if is_draw else winner_team
                await cursor.execute(
                    "UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s",
                    (final_result, game['id'])
                )

        # Create result embed
        if is_draw:
            embed = discord.Embed(
                title="🤝 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Result: Draw**",
                color=discord.Color.orange()
            )
        else:
            embed = discord.Embed(
                title="🏆 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**",
                color=discord.Color.gold()
            )
        # Group results by team (insertion order follows the roster)
        teams_results = {}
        for change in elo_changes:
            teams_results.setdefault(change['team_name'], []).append(change)
        for team, team_changes in teams_results.items():
            # Team header with appropriate icon
            if is_draw:
                team_header = f"🤝 Team {team}"
            elif team == winner_team:
                team_header = f"🏆 Team {team}"
            else:
                team_header = f"💔 Team {team}"
            embed.add_field(
                name=team_header,
                value=_hoi4end_team_text(team_changes),
                inline=False
            )
        embed.set_footer(text=f"Game Type: {game['game_type'].title()}")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error ending game: {str(e)}")
@bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics')
async def hoi4stats(ctx, user: Optional[discord.Member] = None):
    """Show HOI4 ELO statistics for a user"""
    # Stats are shown for the given member, or for the invoker when omitted
    target_user = user or ctx.author
    try:
        player = await get_or_create_player(target_user.id, target_user.display_name)

        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:

            async def fetch_row(sql, params=None):
                # Run one query and hand back its first row
                await cursor.execute(sql, params)
                return await cursor.fetchone()

            # Rank = number of players with a strictly higher ELO, plus one
            standard_row = await fetch_row(
                "SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s",
                (player['standard_elo'],)
            )
            standard_rank = standard_row['player_rank']
            competitive_row = await fetch_row(
                "SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s",
                (player['competitive_elo'],)
            )
            competitive_rank = competitive_row['player_rank']

            # Get total player count
            total_row = await fetch_row("SELECT COUNT(*) as total FROM players")
            total_players = total_row['total']

            # Get game statistics with proper draw detection
            game_stats = await fetch_row(
                """SELECT
                COUNT(*) as total_games,
                SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn,
                SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost
                FROM game_results WHERE discord_id = %s""",
                (target_user.id,)
            )

        # Missing aggregates (no rows) are treated as zero
        total_games = game_stats['total_games'] or 0
        games_won = game_stats['games_won'] or 0
        games_drawn = game_stats['games_drawn'] or 0
        games_lost = game_stats['games_lost'] or 0
        win_rate = (games_won / total_games * 100) if total_games > 0 else 0

        # Create rank indicators with medals
        def get_rank_display(rank, total):
            if rank == 1:
                return f"🥇 #{rank} of {total}"
            if rank == 2:
                return f"🥈 #{rank} of {total}"
            if rank == 3:
                return f"🥉 #{rank} of {total}"
            return f"#{rank} of {total}"

        embed = discord.Embed(
            title=f"📊 HOI4 ELO Stats - {target_user.display_name}",
            color=discord.Color.blue()
        )

        # Fields are collected as (name, value, inline) and added in order
        fields = [
            # ELO and Rankings
            ("🎯 Standard ELO",
             f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}",
             True),
            ("🏆 Competitive ELO",
             f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}",
             True),
            ("📅 Player Since", player['created_at'].strftime("%m/%d/%Y"), True),
        ]
        # Game Statistics
        if total_games > 0:
            fields.append(("🎮 Games Played", f"**{total_games}** total games", True))
            if games_drawn > 0:
                fields.append((
                    "📊 W/D/L Record",
                    f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**",
                    True,
                ))
            else:
                fields.append(("📈 Win/Loss", f"**{games_won}W** / **{games_lost}L**", True))
            fields.append(("📊 Win Rate", f"**{win_rate:.1f}%**", True))
        else:
            fields.append(("🎮 Games Played", "No games played yet", False))
        for field_name, field_value, field_inline in fields:
            embed.add_field(name=field_name, value=field_value, inline=field_inline)

        if target_user.avatar:
            embed.set_thumbnail(url=target_user.avatar.url)

        # Add percentile information
        standard_percentile = ((total_players - standard_rank) / total_players * 100) if total_players > 0 else 0
        competitive_percentile = ((total_players - competitive_rank) / total_players * 100) if total_players > 0 else 0
        embed.set_footer(
            text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%"
        )
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting stats: {str(e)}")
# Discord rejects embeds that carry more than 25 fields
_HOI4GAMES_MAX_FIELDS = 25


def _hoi4games_team_field(ctx, team_name: str, members: List[Dict]) -> Dict[str, str]:
    """Build the embed field (name and value) that showcases one team."""
    avg = (sum(m['current_elo'] for m in members) / len(members)) if members else 0
    t_emoji = get_team_emoji(ctx, team_name)
    name = f"{t_emoji} {team_name} (avg {avg:.0f})"
    # Each player line: T-level emoji, optional flag/label, name, elo.
    # Highest T level first, then alphabetically by username.
    lines = []
    for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())):
        te = get_t_emoji(ctx, int(m.get('t_level', 2)))
        ctry = m.get('country')
        flag = get_country_emoji(ctx, ctry) if ctry else ""
        label = get_country_label(ctry) if ctry else None
        parts = [te]
        if flag:
            parts.append(flag)
        if label:
            parts.append(label)
        parts.append(m['username'])
        lines.append(f"{' '.join(parts)} ({m['current_elo']})")
    value = "\n".join(lines) if lines else "No players yet"
    # Discord field value max ~1024 chars; trim if necessary
    if len(value) > 1000:
        value = value[:997] + "..."
    return {"name": name, "value": value}


@bot.hybrid_command(name='hoi4games', description='Show active games as team showcases')
async def hoi4games(ctx):
    """Show all active games with teams presented side-by-side like a showcase."""
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(
                    "SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC"
                )
                games = await cursor.fetchall()
        if not games:
            await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!")
            return
        embed = discord.Embed(
            title="🎮 Active HOI4 Games",
            description="Team lineups are shown side-by-side. Use /hoi4setup to add players.",
            color=discord.Color.green()
        )
        shown = 0
        for game in games:
            players = json.loads(game['players']) if game['players'] else []
            # Build team structures
            teams: Dict[str, List[Dict]] = {}
            for p in players:
                teams.setdefault(p['team_name'], []).append(p)
            # Sort teams by name to keep stable order
            ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower())
            team_fields = [_hoi4games_team_field(ctx, tn, members) for tn, members in ordered_teams]
            # Leave room for the header and the VS spacer so that even a
            # single game with very many teams fits into one embed.
            team_fields = team_fields[:_HOI4GAMES_MAX_FIELDS - 2]

            # Fields this game needs: header + one per team (+ the VS spacer
            # for two or more teams). A fixed games cap cannot guarantee the
            # 25-field limit because a two-team game already uses 4 fields,
            # so stop as soon as the next game would not fit.
            needed = 1 + len(team_fields) + (1 if len(team_fields) >= 2 else 0)
            if len(embed.fields) + needed > _HOI4GAMES_MAX_FIELDS:
                break
            shown += 1

            # If no teams yet, show empty placeholder for this game
            if not team_fields:
                embed.add_field(
                    name=f"{game['game_name']} ({game['game_type'].title()})",
                    value="No players yet",
                    inline=False
                )
                continue
            # Add a header field per game
            embed.add_field(
                name=f"🎯 {game['game_name']} ({game['game_type'].title()})",
                value=f"Players: {len(players)} | Teams: {len(ordered_teams)}",
                inline=False
            )
            # For two teams, show A vs B with a center VS field; otherwise, list each team
            if len(team_fields) == 1:
                f1 = team_fields[0]
                embed.add_field(name=f1["name"], value=f1["value"], inline=False)
            else:
                f1, f2 = team_fields[0], team_fields[1]
                embed.add_field(name=f1["name"], value=f1["value"], inline=True)
                embed.add_field(name="⚔️ VS ⚔️", value="\u200b", inline=True)
                embed.add_field(name=f2["name"], value=f2["value"], inline=True)
                # If more teams exist, add them below
                for extra in team_fields[2:]:
                    embed.add_field(name=extra["name"], value=extra["value"], inline=False)
        if shown < len(games):
            # Tell the user that the list was cut to fit into one embed
            embed.set_footer(text=f"Showing {shown} of {len(games)} active games")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting games: {str(e)}")
@bot.hybrid_command(name='hoi4history', description='Show past games with optional filters')
async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None):
    """Show past games with optional filters"""
    # Every game becomes one embed field and Discord allows at most 25 fields
    # per embed, so clamp the limit to 1..25. This also keeps zero or negative
    # values out of the SQL LIMIT clause.
    if limit is None:
        limit = 10
    limit = max(1, min(limit, 25))
    # Only the two known game types are usable as a filter; anything else is
    # ignored (and therefore not reported as an active filter below).
    type_filter = game_type.lower() if game_type and game_type.lower() in ('standard', 'competitive') else None
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Build dynamic query based on filters
                query = "SELECT * FROM games WHERE status = 'finished'"
                params = []
                if game_name:
                    query += " AND game_name LIKE %s"
                    params.append(f"%{game_name}%")
                if type_filter:
                    query += " AND game_type = %s"
                    params.append(type_filter)
                if player:
                    # players is a JSON array of objects; match any entry
                    # carrying this discord_id
                    query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))"
                    params.append(player.id)
                query += " ORDER BY finished_at DESC LIMIT %s"
                params.append(limit)
                await cursor.execute(query, params)
                games = await cursor.fetchall()
        if not games:
            await ctx.send("📝 No finished games found with the specified filters!")
            return
        embed = discord.Embed(
            title="📚 HOI4 Game History",
            color=discord.Color.blue()
        )
        # List every active filter (previously later filters overwrote
        # earlier ones, so only the last was shown)
        active_filters = []
        if player:
            active_filters.append(f"Filtered by player: {player.display_name}")
        if game_name:
            active_filters.append(f"Filtered by game name: {game_name}")
        if type_filter:
            active_filters.append(f"Filtered by type: {game_type.title()}")
        if active_filters:
            embed.description = "\n".join(active_filters)
        for game in games:
            players = json.loads(game['players']) if game['players'] else []
            # Count distinct teams
            team_names = {p['team_name'] for p in players}
            # Format date
            finished_date = game['finished_at'].strftime("%m/%d/%Y %H:%M") if game['finished_at'] else "Unknown"
            # Winner indicator
            winner = game['winner_team'] if game['winner_team'] else "Unknown"
            game_info = f"**Winner:** {winner}\n"
            game_info += f"**Type:** {game['game_type'].title()}\n"
            game_info += f"**Players:** {len(players)} | **Teams:** {len(team_names)}\n"
            game_info += f"**Finished:** {finished_date}"
            embed.add_field(
                name=f"🏆 {game['game_name']}",
                value=game_info,
                inline=False
            )
        embed.set_footer(text=f"Showing {len(games)} of last {limit} games")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting game history: {str(e)}")
@bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard')
async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10):
    """Show ELO leaderboard for standard or competitive"""
    # The docstring above is deliberately left as the original one-liner;
    # the parameter details are documented here in comments instead.
    #
    # game_type: 'standard' or 'competitive' (case-insensitive); None falls
    #            back to 'standard'. Anything else is rejected with a message.
    # limit:     number of ranked players to show; None falls back to 10 and
    #            the value is clamped to 1..25.
    game_type = (game_type or "standard").lower()
    if game_type not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return

    if limit is None:
        limit = 10
    limit = max(1, min(limit, 25))  # Prevent too many (or non-positive) results

    # Discord rejects embed field values longer than 1024 characters.
    FIELD_VALUE_MAX = 1024

    try:
        # Safe to interpolate into SQL: game_type was checked against the
        # whitelist above, so elo_field is one of two fixed column names.
        elo_field = f"{game_type}_elo"

        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get top players by ELO
                await cursor.execute(
                    f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s",
                    (limit,)
                )
                players = await cursor.fetchall()

                if not players:
                    await ctx.send("📝 No players found in the database!")
                    return

                # Also get some statistics
                await cursor.execute(
                    f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players"
                )
                stats = await cursor.fetchone()

                # Fetch win/draw counts for all ranked players in one grouped
                # query instead of acquiring a connection per player.
                player_ids = [p['discord_id'] for p in players]
                placeholders = ", ".join(["%s"] * len(player_ids))
                await cursor.execute(
                    f"""SELECT
                        discord_id,
                        COUNT(*) as games_played,
                        SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                        SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn
                    FROM game_results
                    WHERE discord_id IN ({placeholders})
                    GROUP BY discord_id""",
                    player_ids
                )
                result_rows = await cursor.fetchall()

        # Key by str() so the lookup still matches if the two tables store the
        # id with different column types (assumption — schema not visible here).
        results_by_player = {str(row['discord_id']): row for row in result_rows}

        # Create leaderboard embed
        embed = discord.Embed(
            title=f"🏆 HOI4 {game_type.title()} Leaderboard",
            color=discord.Color.gold()
        )
        # Add statistics
        embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}"

        medals = ["🥇", "🥈", "🥉"]
        entries = []
        for rank, player in enumerate(players, 1):
            # Medal for the podium, bold rank number for everyone else.
            rank_indicator = medals[rank - 1] if rank <= len(medals) else f"**{rank}.**"
            elo_value = player[elo_field]
            username = player['username']

            # Players without any recorded result have no row; treat as zeros.
            player_stats = results_by_player.get(str(player['discord_id']), {})
            games_played = player_stats.get('games_played') or 0
            games_won = player_stats.get('games_won') or 0
            games_drawn = player_stats.get('games_drawn') or 0
            win_rate = (games_won / games_played * 100) if games_played > 0 else 0

            entry = f"{rank_indicator} **{username}** - {elo_value} ELO\n"
            if games_drawn > 0:
                entry += f" 📊 {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate\n\n"
            else:
                entry += f" 📊 {games_played} games | {win_rate:.1f}% win rate\n\n"
            entries.append(entry)

        # Pack whole entries into as many fields as needed so no single field
        # value exceeds Discord's limit.
        chunks = []
        current = ""
        for entry in entries:
            if current and len(current) + len(entry) > FIELD_VALUE_MAX:
                chunks.append(current)
                current = ""
            current += entry
        if current:
            chunks.append(current)

        for index, chunk in enumerate(chunks):
            embed.add_field(
                name="Rankings" if index == 0 else "Rankings (cont.)",
                value=chunk[:FIELD_VALUE_MAX],
                inline=False
            )

        embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}")
        await ctx.send(embed=embed)
    except Exception as e:
        await ctx.send(f"❌ Error getting leaderboard: {str(e)}")
@bot.event
async def on_command_error(ctx, error):
    """Handles command errors.

    Sends a short user-facing message for the known error categories
    (failed checks, missing or invalid arguments), silently ignores unknown
    commands, and for everything else logs the full traceback and replies
    with either detailed information (bot owner) or a generic message.
    """
    if isinstance(error, commands.CheckFailure):
        await ctx.send("❌ You don't have permission to use this command!")
    elif isinstance(error, commands.CommandNotFound):
        # Silently ignore command not found errors
        pass
    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`")
    elif isinstance(error, commands.BadArgument):
        await ctx.send("❌ Invalid argument!")
    else:
        # Log detailed error information
        print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}")
        import traceback
        # This handler is not running inside an `except` block, so
        # traceback.print_exc() would have no active exception to print.
        # Print the traceback attached to the error object instead; chained
        # causes (the exception raised inside the command) are included.
        traceback.print_exception(type(error), error, error.__traceback__)

        # Send detailed error to user if owner
        if ctx.author.id == OWNER_ID:
            await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```")
        else:
            await ctx.send("❌ An unknown error occurred!")
async def main():
    """Main function to start the bot.

    Reads the Discord token from the ``DISCORD_TOKEN`` environment variable,
    checks that a database URL is configured, then runs the bot until it
    stops. Configuration problems are reported on stdout and the function
    returns without starting the bot. The database pool, if one exists, is
    closed on the way out.
    """
    # Load Discord token from environment variables
    token = os.getenv('DISCORD_TOKEN')
    if not token:
        print("❌ DISCORD_TOKEN environment variable not found!")
        print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file")
        return

    if not DATABASE_URL:
        print("❌ DATABASE_URL environment variable not found!")
        print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify")
        # The password itself is never echoed, only whether it is set.
        print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}")
        return

    try:
        print("🚀 Starting bot...")
        await bot.start(token)
    except discord.LoginFailure:
        print("❌ Invalid Discord token!")
    except Exception as e:
        print(f"❌ Error starting bot: {e}")
    finally:
        if db_pool:
            # NOTE(review): assumes db_pool is an aiomysql pool (it is created
            # outside this block). aiomysql's Pool.close() is a plain method
            # returning None, so awaiting it raises TypeError; the documented
            # shutdown sequence is close() followed by awaiting wait_closed().
            db_pool.close()
            await db_pool.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())