1645 lines
66 KiB
Python
1645 lines
66 KiB
Python
import discord
|
||
from discord.ext import commands
|
||
import os
|
||
import asyncio
|
||
import logging
|
||
from dotenv import load_dotenv
|
||
import aiomysql
|
||
import json
|
||
from datetime import datetime
|
||
from typing import Optional, List, Dict
|
||
|
||
# Load environment variables
|
||
load_dotenv()
|
||
|
||
# Configure logging
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
# Database configuration
|
||
DATABASE_URL = os.getenv('DATABASE_URL')
|
||
DB_HOST = os.getenv('DB_HOST')
|
||
DB_PORT = os.getenv('DB_PORT', '5432') # Default to PostgreSQL port
|
||
DB_NAME = os.getenv('DB_NAME')
|
||
DB_USER = os.getenv('DB_USER')
|
||
DB_PASSWORD = os.getenv('DB_PASSWORD')
|
||
|
||
# Build DATABASE_URL from individual components if not provided
|
||
if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
|
||
DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
||
print(f"📝 Built DATABASE_URL from individual environment variables")
|
||
|
||
# Parse MySQL connection details from DATABASE_URL
|
||
def parse_database_url(url):
|
||
"""Parse MySQL connection URL into components"""
|
||
if not url:
|
||
return None
|
||
|
||
# Remove mysql:// prefix
|
||
if url.startswith('mysql://'):
|
||
url = url[8:]
|
||
|
||
# Split user:pass@host:port/db
|
||
if '@' in url:
|
||
auth, host_db = url.split('@', 1)
|
||
if ':' in auth:
|
||
user, password = auth.split(':', 1)
|
||
else:
|
||
user, password = auth, ''
|
||
else:
|
||
return None
|
||
|
||
if '/' in host_db:
|
||
host_port, database = host_db.split('/', 1)
|
||
else:
|
||
return None
|
||
|
||
if ':' in host_port:
|
||
host, port = host_port.split(':', 1)
|
||
try:
|
||
port = int(port)
|
||
except ValueError:
|
||
port = 3306
|
||
else:
|
||
host, port = host_port, 3306
|
||
|
||
return {
|
||
'host': host,
|
||
'port': port,
|
||
'user': user,
|
||
'password': password,
|
||
'db': database
|
||
}
|
||
|
||
# Global database connection pool
|
||
db_pool = None
|
||
|
||
# Bot configuration
|
||
intents = discord.Intents.default()
|
||
intents.message_content = True
|
||
intents.guilds = True
|
||
intents.members = True
|
||
|
||
bot = commands.Bot(command_prefix='!', intents=intents)
|
||
|
||
@bot.event
|
||
async def on_ready():
|
||
"""Event triggered when the bot is ready"""
|
||
print(f'{bot.user} is online and ready!')
|
||
print(f'Bot ID: {bot.user.id}')
|
||
print(f'Discord.py Version: {discord.__version__}')
|
||
print('------')
|
||
|
||
# Initialize database
|
||
await init_database()
|
||
|
||
# Set bot status
|
||
await bot.change_presence(
|
||
activity=discord.Game(name="Hearts of Iron IV ELO"),
|
||
status=discord.Status.online
|
||
)
|
||
|
||
# Sync hybrid commands on startup
|
||
try:
|
||
synced = await bot.tree.sync()
|
||
print(f'Synced {len(synced)} hybrid commands')
|
||
except Exception as e:
|
||
print(f'Failed to sync commands: {e}')
|
||
|
||
@bot.event
|
||
async def on_guild_join(guild):
|
||
"""Event triggered when the bot joins a server"""
|
||
print(f'Bot joined server "{guild.name}" (ID: {guild.id})')
|
||
|
||
@bot.event
|
||
async def on_guild_remove(guild):
|
||
"""Event triggered when the bot leaves a server"""
|
||
print(f'Bot left server "{guild.name}" (ID: {guild.id})')
|
||
|
||
# Owner Configuration
|
||
OWNER_ID = 253922739709018114
|
||
|
||
# ELO Configuration
|
||
STARTING_ELO = 800
|
||
K_FACTOR = 32
|
||
T_LEVEL_MULTIPLIERS = {
|
||
1: 0.8, # T1 countries get less points
|
||
2: 1.0, # T2 countries get normal points
|
||
3: 1.2 # T3 countries get more points
|
||
}
|
||
|
||
# Preferred team emoji overrides (will be used if no guild/file match is found)
|
||
TEAM_EMOTE_OVERRIDES: Dict[str, str] = {
|
||
"axis": "<:fascism:1432391685127536762>",
|
||
"allies": "<:democracy:1432391686612586528>",
|
||
"comintern": "<:communism:1432391682267025479>",
|
||
"cuf": "<:ChineseUnitedFront:1432422469985112156>",
|
||
"default": "<:neutrality:1432391681059197102>",
|
||
}
|
||
|
||
# Role mapping for ELO tiers (IDs provided by user)
|
||
# Assumption: Top tier is >= 900 ELO rather than strictly >900,
|
||
# to avoid leaving 900 unassigned. Adjust if you want 900 handled differently.
|
||
STANDARD_ELO_ROLE_IDS = {
|
||
"gte_900": 1432368177014374497,
|
||
"851_899": 1432368177014374496,
|
||
"801_850": 1432368177014374495,
|
||
"eq_800": 1432368177014374494,
|
||
"751_799": 1432368177014374493,
|
||
"701_750": 1432368177014374492,
|
||
"lt_700": 1432368177014374491,
|
||
}
|
||
|
||
COMPETITIVE_ELO_ROLE_IDS = {
|
||
"gte_900": 1432368177030893672,
|
||
"851_899": 1432368177030893671,
|
||
"801_850": 1432368177030893670,
|
||
"eq_800": 1432368177030893669,
|
||
"751_799": 1432368177030893668,
|
||
"701_750": 1432368177014374499,
|
||
"lt_700": 1432368177014374498,
|
||
}
|
||
|
||
def _role_id_for_elo(elo: int, category: str) -> Optional[int]:
|
||
"""Return the role ID for the given ELO and category ('standard'|'competitive')."""
|
||
ids = STANDARD_ELO_ROLE_IDS if category == "standard" else COMPETITIVE_ELO_ROLE_IDS
|
||
if elo >= 900:
|
||
return ids["gte_900"]
|
||
if 851 <= elo <= 899:
|
||
return ids["851_899"]
|
||
if 801 <= elo <= 850:
|
||
return ids["801_850"]
|
||
if elo == 800:
|
||
return ids["eq_800"]
|
||
if 751 <= elo <= 799:
|
||
return ids["751_799"]
|
||
if 701 <= elo <= 750:
|
||
return ids["701_750"]
|
||
# < 700
|
||
return ids["lt_700"]
|
||
|
||
def _category_role_ids(category: str) -> List[int]:
|
||
ids = STANDARD_ELO_ROLE_IDS if category == "standard" else COMPETITIVE_ELO_ROLE_IDS
|
||
return list(ids.values())
|
||
|
||
async def update_member_elo_role(member: discord.Member, elo: int, category: str, reason: Optional[str] = None):
|
||
"""Ensure the member has exactly one rank role for the given category matching their ELO.
|
||
- category: 'standard' or 'competitive'
|
||
"""
|
||
if member is None or member.guild is None:
|
||
return
|
||
try:
|
||
guild = member.guild
|
||
target_role_id = _role_id_for_elo(elo, category)
|
||
if not target_role_id:
|
||
return
|
||
target_role = guild.get_role(int(target_role_id))
|
||
if not target_role:
|
||
return # Role not found in this guild
|
||
|
||
# Remove any other roles from the same category
|
||
category_ids = set(_category_role_ids(category))
|
||
roles_to_remove = [r for r in member.roles if r.id in category_ids and r.id != target_role.id]
|
||
if roles_to_remove:
|
||
await member.remove_roles(*roles_to_remove, reason=reason)
|
||
|
||
# Add the target role if missing
|
||
if target_role not in member.roles:
|
||
await member.add_roles(target_role, reason=reason)
|
||
except discord.Forbidden:
|
||
# Missing permissions or role hierarchy issue
|
||
logging.warning(f"Insufficient permissions to modify roles for {member} in category {category}")
|
||
except discord.HTTPException as e:
|
||
logging.warning(f"Failed to update roles for {member}: {e}")
|
||
|
||
# Emoji and formatting helpers
|
||
def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]:
|
||
"""Return emojis from current guild plus all guilds the bot is in."""
|
||
try:
|
||
guild_emojis = list(ctx.guild.emojis) if ctx.guild else []
|
||
except Exception:
|
||
guild_emojis = []
|
||
try:
|
||
bot_emojis = list(bot.emojis)
|
||
except Exception:
|
||
bot_emojis = []
|
||
return guild_emojis + bot_emojis
|
||
|
||
def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]:
|
||
"""Try to find a custom emoji by a list of keyword variants (case-insensitive). Returns str(emoji) or None."""
|
||
emojis = _all_accessible_emojis(ctx)
|
||
for kw in keyword_variants:
|
||
kw = kw.lower()
|
||
for e in emojis:
|
||
try:
|
||
if kw in (e.name or '').lower():
|
||
return str(e)
|
||
except Exception:
|
||
continue
|
||
# Fallback to markdown-defined emojis if available
|
||
try:
|
||
if EMOTE_MAP:
|
||
for kw in keyword_variants:
|
||
key = kw.lower()
|
||
# Exact name match
|
||
if key in EMOTE_MAP:
|
||
return EMOTE_MAP[key]
|
||
# Substring match
|
||
for name_lower, mention in EMOTE_MAP.items():
|
||
if key in name_lower:
|
||
return mention
|
||
except NameError:
|
||
# EMOTE_MAP not defined yet
|
||
pass
|
||
return None
|
||
|
||
def get_t_emoji(ctx: commands.Context, t_level: int) -> str:
|
||
"""Return a suitable emoji for a T level, preferring custom emojis if present."""
|
||
mapping = {
|
||
1: ["hoi4_t1", "t1", "tier1"],
|
||
2: ["hoi4_t2", "t2", "tier2"],
|
||
3: ["hoi4_t3", "t3", "tier3"],
|
||
}
|
||
custom = find_custom_emoji(ctx, mapping.get(t_level, []))
|
||
if custom:
|
||
return custom
|
||
# Fallback unicode
|
||
return {1: "🔹", 2: "🔸", 3: "🔺"}.get(t_level, "🔹")
|
||
|
||
def get_team_emoji(ctx: commands.Context, team_name: str) -> str:
|
||
"""Return a team emoji using user's preferred overrides, with guild/file matches first."""
|
||
name = (team_name or "").lower()
|
||
|
||
def pick(keywords: List[str], override_key: str) -> str:
|
||
# Try specific keywords (ideology-first), then generic ones
|
||
custom = find_custom_emoji(ctx, keywords)
|
||
if custom:
|
||
return custom
|
||
# As an extra attempt, try a couple generic HOI4 icons
|
||
custom = find_custom_emoji(ctx, ["eagle_hoi", "peace_hoi", "navy_hoi", "secretweapon_hoi"])
|
||
if custom:
|
||
return custom
|
||
# Fall back to explicit override mapping
|
||
return TEAM_EMOTE_OVERRIDES.get(override_key, TEAM_EMOTE_OVERRIDES.get("default", "🎖️"))
|
||
|
||
if any(k in name for k in ["axis", "achse"]):
|
||
return pick(["fascism", "axis", "hoi4_axis"], "axis")
|
||
if any(k in name for k in ["allies", "alliierten", "ally"]):
|
||
return pick(["democracy", "allies", "hoi4_allies"], "allies")
|
||
if any(k in name for k in ["comintern", "ussr", "soviet"]):
|
||
return pick(["communism", "comintern", "hoi4_comintern"], "comintern")
|
||
# Chinese United Front (optional special case)
|
||
if ("chinese united front" in name) or ("chinese" in name and "front" in name) or ("cuf" in name):
|
||
return pick(["ChineseUnitedFront", "chineseunitedfront", "cuf"], "cuf")
|
||
|
||
# Default/other
|
||
custom = find_custom_emoji(ctx, ["neutrality", "hoi4", "hearts_of_iron", "iron"])
|
||
if custom:
|
||
return custom
|
||
return TEAM_EMOTE_OVERRIDES.get("default", "🎖️")
|
||
|
||
def _flag_from_iso2(code: str) -> Optional[str]:
|
||
"""Return unicode flag from 2-letter ISO code (e.g., 'DE' -> 🇩🇪)."""
|
||
if not code or len(code) != 2:
|
||
return None
|
||
code = code.upper()
|
||
base = 0x1F1E6
|
||
try:
|
||
return chr(base + ord(code[0]) - ord('A')) + chr(base + ord(code[1]) - ord('A'))
|
||
except Exception:
|
||
return None
|
||
|
||
# Emotes markdown loader and map
|
||
def load_emote_markdown(path: Optional[str] = None) -> Dict[str, str]:
|
||
"""Parse emotes.markdown and return a mapping of lowercased emoji names to their mention strings.
|
||
Expected line format: <:Name:123456789012345678>
|
||
Lines that don't match are ignored."""
|
||
if path is None:
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
path = os.path.join(base_dir, 'emotes.markdown')
|
||
mapping: Dict[str, str] = {}
|
||
try:
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
for raw in f:
|
||
line = raw.strip()
|
||
if not line or not line.startswith('<:') or ':' not in line[2:]:
|
||
continue
|
||
# Format is <:NAME:ID>
|
||
try:
|
||
inner = line[2:-1] if line.endswith('>') else line[2:]
|
||
name, emoji_id = inner.split(':', 1)
|
||
name = name.strip()
|
||
mention = f"<:{name}:{emoji_id.strip('>')}>"
|
||
mapping[name.lower()] = mention
|
||
except Exception:
|
||
continue
|
||
except FileNotFoundError:
|
||
# Silent if not present
|
||
pass
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to load emotes.markdown: {e}")
|
||
return mapping
|
||
|
||
# Load emotes mapping at import
|
||
EMOTE_MAP: Dict[str, str] = load_emote_markdown()
|
||
if EMOTE_MAP:
|
||
print(f"😀 Loaded {len(EMOTE_MAP)} custom emojis from emotes.markdown")
|
||
|
||
def load_country_tags(path: Optional[str] = None) -> Dict[str, str]:
|
||
"""Load HOI4 country tags mapping from tags.txt.
|
||
Supported formats per line:
|
||
TAG=Country Name | TAG:Country Name | TAG,Country Name | TAG Country Name
|
||
Lines starting with # are ignored.
|
||
Returns dict like { 'GER': 'Germany', ... }"""
|
||
if path is None:
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
path = os.path.join(base_dir, 'tags.txt')
|
||
mapping: Dict[str, str] = {}
|
||
try:
|
||
with open(path, 'r', encoding='utf-8') as f:
|
||
for raw in f:
|
||
line = raw.strip()
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
tag = None
|
||
name = None
|
||
for sep in ['=', ';', ',', ':']:
|
||
if sep in line:
|
||
left, right = line.split(sep, 1)
|
||
tag = left.strip().upper()
|
||
name = right.strip()
|
||
break
|
||
if tag is None:
|
||
parts = line.split(None, 1)
|
||
if len(parts) == 2:
|
||
tag = parts[0].strip().upper()
|
||
name = parts[1].strip()
|
||
else:
|
||
tag = line.strip().upper()
|
||
name = line.strip()
|
||
if tag and name:
|
||
mapping[tag] = name
|
||
except FileNotFoundError:
|
||
print("ℹ️ tags.txt not found; proceeding without country tag labels")
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to load tags.txt: {e}")
|
||
return mapping
|
||
|
||
# Loaded at import
|
||
COUNTRY_TAGS: Dict[str, str] = load_country_tags()
|
||
if COUNTRY_TAGS:
|
||
print(f"🗺️ Loaded {len(COUNTRY_TAGS)} HOI4 country tags")
|
||
|
||
def get_country_label(country_tag: Optional[str]) -> Optional[str]:
|
||
"""Return a display label like "[GER] Germany" if known, or "[GER]" if unknown."""
|
||
if not country_tag:
|
||
return None
|
||
tag = country_tag.strip().upper()
|
||
name = COUNTRY_TAGS.get(tag)
|
||
return f"[{tag}] {name}" if name else f"[{tag}]"
|
||
|
||
def get_country_emoji(ctx: commands.Context, country: Optional[str]) -> str:
|
||
"""Prefer custom emoji matching the HOI4 tag (e.g., ger, hoi4_ger). If parameter is ISO2, show unicode flag. Else empty."""
|
||
if not country:
|
||
return ""
|
||
c = country.strip()
|
||
# Try custom emoji lookups using tag variants
|
||
variants = [c, c.lower(), f"hoi4_{c.lower()}", f"country_{c.lower()}"]
|
||
custom = find_custom_emoji(ctx, variants)
|
||
if custom:
|
||
return custom
|
||
# If user passed ISO2, render unicode flag
|
||
if len(c) == 2:
|
||
flag = _flag_from_iso2(c)
|
||
if flag:
|
||
return flag
|
||
# Otherwise, no emoji fallback to avoid noisy globes
|
||
return ""
|
||
|
||
# Database Functions
|
||
# Database Functions
|
||
async def init_database():
|
||
"""Initialize database connection and create tables"""
|
||
global db_pool
|
||
try:
|
||
# Parse DATABASE_URL for MySQL connection
|
||
db_config = parse_database_url(DATABASE_URL)
|
||
if not db_config:
|
||
raise ValueError("Invalid DATABASE_URL format")
|
||
|
||
print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}")
|
||
|
||
# Create MySQL connection pool
|
||
db_pool = await aiomysql.create_pool(
|
||
host=db_config['host'],
|
||
port=db_config['port'],
|
||
user=db_config['user'],
|
||
password=db_config['password'],
|
||
db=db_config['db'],
|
||
charset='utf8mb4',
|
||
autocommit=True,
|
||
maxsize=10
|
||
)
|
||
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor() as cursor:
|
||
# Create players table (MySQL syntax)
|
||
await cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS players (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
discord_id BIGINT UNIQUE NOT NULL,
|
||
username VARCHAR(255) NOT NULL,
|
||
standard_elo INT DEFAULT 800,
|
||
competitive_elo INT DEFAULT 800,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||
)
|
||
''')
|
||
|
||
# Create games table (MySQL syntax)
|
||
await cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS games (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
game_name VARCHAR(255) NOT NULL,
|
||
game_type VARCHAR(50) NOT NULL,
|
||
status VARCHAR(50) DEFAULT 'setup',
|
||
players JSON NOT NULL,
|
||
winner_team VARCHAR(255),
|
||
notification_message_id BIGINT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
finished_at TIMESTAMP NULL
|
||
)
|
||
''')
|
||
|
||
# Add notification_message_id column if it doesn't exist (for existing databases)
|
||
try:
|
||
await cursor.execute('''
|
||
ALTER TABLE games
|
||
ADD COLUMN notification_message_id BIGINT NULL
|
||
''')
|
||
except:
|
||
# Column already exists, ignore error
|
||
pass
|
||
|
||
# Create game_results table (MySQL syntax)
|
||
await cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS game_results (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
game_id INT,
|
||
discord_id BIGINT NOT NULL,
|
||
team_name VARCHAR(255) NOT NULL,
|
||
t_level INT NOT NULL,
|
||
old_elo INT NOT NULL,
|
||
new_elo INT NOT NULL,
|
||
elo_change INT NOT NULL,
|
||
won BOOLEAN NOT NULL,
|
||
result_type VARCHAR(10) DEFAULT 'loss',
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (game_id) REFERENCES games(id)
|
||
)
|
||
''')
|
||
|
||
# Add result_type column if it doesn't exist (for existing databases)
|
||
try:
|
||
await cursor.execute('''
|
||
ALTER TABLE game_results
|
||
ADD COLUMN result_type VARCHAR(10) DEFAULT 'loss'
|
||
''')
|
||
except:
|
||
# Column already exists, ignore error
|
||
pass
|
||
|
||
print("✅ Database initialized successfully")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Database initialization failed: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
async def get_or_create_player(discord_id: int, username: str) -> Dict:
|
||
"""Get or create a player in the database"""
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Try to get existing player
|
||
await cursor.execute(
|
||
"SELECT * FROM players WHERE discord_id = %s", (discord_id,)
|
||
)
|
||
player = await cursor.fetchone()
|
||
|
||
if not player:
|
||
# Create new player
|
||
await cursor.execute(
|
||
"INSERT INTO players (discord_id, username) VALUES (%s, %s)",
|
||
(discord_id, username)
|
||
)
|
||
await cursor.execute(
|
||
"SELECT * FROM players WHERE discord_id = %s", (discord_id,)
|
||
)
|
||
player = await cursor.fetchone()
|
||
else:
|
||
# Update username if changed
|
||
await cursor.execute(
|
||
"UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
|
||
(username, discord_id)
|
||
)
|
||
|
||
return dict(player)
|
||
|
||
def calculate_elo_change(player_elo: int, opponent_avg_elo: int, result: str, t_level: int) -> int:
|
||
"""Calculate ELO change using standard ELO formula with T-level multiplier
|
||
result can be 'win', 'loss', or 'draw'
|
||
|
||
In draws:
|
||
- If you're expected to win (higher ELO), you lose points for drawing
|
||
- If you're expected to lose (lower ELO), you gain points for drawing
|
||
- The bigger the ELO difference, the bigger the swing
|
||
"""
|
||
expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400))
|
||
|
||
if result == 'win':
|
||
actual_score = 1.0
|
||
elif result == 'draw':
|
||
actual_score = 0.5 # Draw = 50% score
|
||
else: # loss
|
||
actual_score = 0.0
|
||
|
||
# Calculate the base ELO change
|
||
base_change = K_FACTOR * (actual_score - expected_score)
|
||
|
||
# Apply T-level multiplier
|
||
t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0)
|
||
|
||
final_change = base_change * t_multiplier
|
||
|
||
# For debugging/logging purposes, let's see what happens in draws
|
||
if result == 'draw':
|
||
elo_diff = player_elo - opponent_avg_elo
|
||
expected_percentage = expected_score * 100
|
||
print(f"📊 Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}")
|
||
print(f" ELO Difference: {elo_diff:+d} | Expected win chance: {expected_percentage:.1f}%")
|
||
print(f" ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})")
|
||
|
||
return round(final_change)
|
||
|
||
# Owner only decorator
|
||
def is_owner():
|
||
def predicate(ctx):
|
||
return ctx.author.id == OWNER_ID
|
||
return commands.check(predicate)
|
||
|
||
# Owner Commands
|
||
@bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)')
|
||
@is_owner()
|
||
async def reload_bot(ctx):
|
||
"""Reloads the bot and syncs slash commands (Owner only)"""
|
||
try:
|
||
print(f"🔄 Reload command started by {ctx.author} (ID: {ctx.author.id})")
|
||
|
||
# Send initial message
|
||
embed = discord.Embed(
|
||
title="🔄 Bot Reload",
|
||
description="Reloading bot and syncing commands...",
|
||
color=discord.Color.yellow()
|
||
)
|
||
message = await ctx.send(embed=embed)
|
||
print("📤 Initial reload message sent")
|
||
|
||
# Sync slash commands
|
||
print("🔄 Starting command sync...")
|
||
synced = await bot.tree.sync()
|
||
print(f"✅ Synced {len(synced)} commands successfully")
|
||
|
||
# Update embed with success
|
||
embed = discord.Embed(
|
||
title="✅ Bot Reloaded Successfully",
|
||
description=f"Bot has been reloaded!\nSynced {len(synced)} slash commands.",
|
||
color=discord.Color.green()
|
||
)
|
||
embed.add_field(name="Servers", value=len(bot.guilds), inline=True)
|
||
embed.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True)
|
||
embed.set_footer(text=f"Reloaded by {ctx.author}", icon_url=ctx.author.avatar.url if ctx.author.avatar else None)
|
||
|
||
await message.edit(embed=embed)
|
||
print("✅ Reload completed successfully")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Reload failed with error: {type(e).__name__}: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
embed = discord.Embed(
|
||
title="❌ Reload Failed",
|
||
description=f"**Error Type:** {type(e).__name__}\n**Error:** {str(e)[:1500]}",
|
||
color=discord.Color.red()
|
||
)
|
||
await ctx.send(embed=embed)
|
||
|
||
# Game notification channel
|
||
GAME_NOTIFICATION_CHANNEL_ID = 1432368177685332030
|
||
|
||
async def create_game_notification_embed(game_data: Dict, players_data: List[Dict]) -> discord.Embed:
|
||
"""Create an embed for game notifications"""
|
||
embed = discord.Embed(
|
||
title=f"🎮 {game_data['game_name']}",
|
||
description=f"**Type:** {game_data['game_type'].title()}\n**Status:** Setup Phase",
|
||
color=discord.Color.blue()
|
||
)
|
||
|
||
if not players_data:
|
||
embed.add_field(name="Players", value="No players yet", inline=False)
|
||
return embed
|
||
|
||
# Group players by team
|
||
teams = {}
|
||
for p in players_data:
|
||
team = p['team_name']
|
||
if team not in teams:
|
||
teams[team] = []
|
||
teams[team].append(p)
|
||
|
||
# Add team fields
|
||
for team_name, members in teams.items():
|
||
team_emoji = "🎖️" # Simple fallback since we don't have ctx here
|
||
avg_elo = sum(m['current_elo'] for m in members) / len(members) if members else 0
|
||
|
||
field_name = f"{team_emoji} {team_name} (avg {avg_elo:.0f})"
|
||
|
||
lines = []
|
||
for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'])):
|
||
t_level = m.get('t_level', 2)
|
||
t_emoji = {1: "🔹", 2: "🔸", 3: "🔺"}.get(t_level, "🔹")
|
||
|
||
country = m.get('country')
|
||
country_text = f" [{country}]" if country else ""
|
||
|
||
lines.append(f"{t_emoji} {m['username']}{country_text} ({m['current_elo']})")
|
||
|
||
embed.add_field(name=field_name, value="\n".join(lines), inline=True)
|
||
|
||
embed.set_footer(text=f"Players: {len(players_data)} | Teams: {len(teams)}")
|
||
return embed
|
||
|
||
async def update_game_notification(game_data: Dict, players_data: List[Dict]):
|
||
"""Update or create game notification in the notification channel"""
|
||
try:
|
||
channel = bot.get_channel(GAME_NOTIFICATION_CHANNEL_ID)
|
||
if not channel:
|
||
return
|
||
|
||
embed = await create_game_notification_embed(game_data, players_data)
|
||
|
||
if game_data.get('notification_message_id'):
|
||
# Try to edit existing message
|
||
try:
|
||
message = await channel.fetch_message(game_data['notification_message_id'])
|
||
await message.edit(embed=embed)
|
||
except (discord.NotFound, discord.HTTPException):
|
||
# Message not found, create new one
|
||
message = await channel.send(embed=embed)
|
||
# Update DB with new message ID
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor() as cursor:
|
||
await cursor.execute(
|
||
"UPDATE games SET notification_message_id = %s WHERE id = %s",
|
||
(message.id, game_data['id'])
|
||
)
|
||
else:
|
||
# Create new message
|
||
message = await channel.send(embed=embed)
|
||
# Store message ID in database
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor() as cursor:
|
||
await cursor.execute(
|
||
"UPDATE games SET notification_message_id = %s WHERE id = %s",
|
||
(message.id, game_data['id'])
|
||
)
|
||
except Exception as e:
|
||
logging.warning(f"Failed to update game notification: {e}")
|
||
|
||
# HOI4 ELO Commands
|
||
@bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game')
|
||
async def hoi4create(ctx, game_type: str, game_name: str):
|
||
"""Create a new HOI4 game"""
|
||
if game_type.lower() not in ['standard', 'competitive']:
|
||
await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
|
||
return
|
||
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Check if game name already exists and is active
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
|
||
(game_name,)
|
||
)
|
||
existing_game = await cursor.fetchone()
|
||
|
||
if existing_game:
|
||
await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!")
|
||
return
|
||
|
||
# Create new game
|
||
await cursor.execute(
|
||
"INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)",
|
||
(game_name, game_type.lower(), '[]')
|
||
)
|
||
|
||
# Get the created game data for notification
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND game_type = %s ORDER BY id DESC LIMIT 1",
|
||
(game_name, game_type.lower())
|
||
)
|
||
game_data = await cursor.fetchone()
|
||
|
||
embed = discord.Embed(
|
||
title="🎮 Game Created",
|
||
description=f"HOI4 {game_type.title()} game '{game_name}' has been created!",
|
||
color=discord.Color.green()
|
||
)
|
||
embed.add_field(name="Game Name", value=game_name, inline=True)
|
||
embed.add_field(name="Type", value=game_type.title(), inline=True)
|
||
embed.add_field(name="Status", value="Setup Phase", inline=True)
|
||
embed.set_footer(text="Use /hoi4setup to add players to this game")
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
# Create notification in notification channel
|
||
if game_data:
|
||
await update_game_notification(dict(game_data), [])
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error creating game: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4delete', description='Delete a game lobby')
|
||
async def hoi4delete(ctx, game_name: str):
|
||
"""Delete a game lobby that is in setup phase"""
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get the game
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
|
||
(game_name,)
|
||
)
|
||
game = await cursor.fetchone()
|
||
|
||
if not game:
|
||
await ctx.send(f"❌ No active game found with name '{game_name}' in setup phase!")
|
||
return
|
||
|
||
# Delete the game
|
||
await cursor.execute(
|
||
"DELETE FROM games WHERE id = %s",
|
||
(game['id'],)
|
||
)
|
||
|
||
embed = discord.Embed(
|
||
title="🗑️ Game Deleted",
|
||
description=f"Game '{game_name}' has been deleted!",
|
||
color=discord.Color.red()
|
||
)
|
||
embed.add_field(name="Game Name", value=game_name, inline=True)
|
||
embed.add_field(name="Type", value=game['game_type'].title(), inline=True)
|
||
embed.set_footer(text=f"Deleted by {ctx.author}")
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error deleting game: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4remove', description='Remove a player from an existing game')
|
||
async def hoi4remove(ctx, game_name: str, user: discord.Member):
|
||
"""Remove a player from an existing game"""
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get the game
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
|
||
(game_name,)
|
||
)
|
||
game = await cursor.fetchone()
|
||
|
||
if not game:
|
||
await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
|
||
return
|
||
|
||
# Parse existing players
|
||
players = json.loads(game['players']) if game['players'] else []
|
||
|
||
# Find and remove the player
|
||
player_found = False
|
||
new_players = []
|
||
for p in players:
|
||
if p['discord_id'] == user.id:
|
||
player_found = True
|
||
# Skip this player (don't add to new_players)
|
||
else:
|
||
new_players.append(p)
|
||
|
||
if not player_found:
|
||
await ctx.send(f"❌ {user.display_name} is not in this game!")
|
||
return
|
||
|
||
# Update game with new player list
|
||
await cursor.execute(
|
||
"UPDATE games SET players = %s WHERE id = %s",
|
||
(json.dumps(new_players), game['id'])
|
||
)
|
||
|
||
embed = discord.Embed(
|
||
title="✅ Player Removed",
|
||
description=f"{user.display_name} has been removed from '{game_name}'!",
|
||
color=discord.Color.orange()
|
||
)
|
||
embed.add_field(name="Player", value=user.display_name, inline=True)
|
||
embed.add_field(name="Game", value=game_name, inline=True)
|
||
embed.add_field(name="Players Left", value=len(new_players), inline=True)
|
||
embed.set_footer(text=f"Removed by {ctx.author}")
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
# Update notification in notification channel
|
||
await update_game_notification(dict(game), new_players)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error removing player: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game')
|
||
async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int, country: Optional[str] = None, modifier: Optional[str] = None):
|
||
"""Add a player to an existing game. Use modifier='--force' to bypass MP ban."""
|
||
if t_level not in [1, 2, 3]:
|
||
await ctx.send("❌ T-Level must be 1, 2, or 3")
|
||
return
|
||
|
||
# Check for MP ban role (unless --force is used)
|
||
MP_BAN_ROLE_ID = 1432368177052127353
|
||
if modifier != "--force":
|
||
mp_ban_role = discord.utils.get(user.roles, id=MP_BAN_ROLE_ID)
|
||
if mp_ban_role:
|
||
embed = discord.Embed(
|
||
title="🚫 Player Banned",
|
||
description=f"{user.display_name} is currently banned from multiplayer games!",
|
||
color=discord.Color.red()
|
||
)
|
||
embed.add_field(name="Banned Player", value=user.mention, inline=True)
|
||
embed.add_field(name="Ban Role", value=mp_ban_role.name, inline=True)
|
||
embed.set_footer(text="Contact an administrator if this is an error")
|
||
await ctx.send(embed=embed)
|
||
return
|
||
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get the game
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
|
||
(game_name,)
|
||
)
|
||
game = await cursor.fetchone()
|
||
|
||
if not game:
|
||
await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
|
||
return
|
||
|
||
# Get or create player
|
||
player = await get_or_create_player(user.id, user.display_name)
|
||
|
||
# Parse existing players
|
||
players = json.loads(game['players']) if game['players'] else []
|
||
|
||
# Check if player already in game
|
||
for p in players:
|
||
if p['discord_id'] == user.id:
|
||
await ctx.send(f"❌ {user.display_name} is already in this game!")
|
||
return
|
||
|
||
# Add player to game
|
||
player_data = {
|
||
'discord_id': user.id,
|
||
'username': user.display_name,
|
||
'team_name': team_name,
|
||
't_level': t_level,
|
||
'current_elo': player[f"{game['game_type']}_elo"],
|
||
'country': country.strip() if country else None
|
||
}
|
||
players.append(player_data)
|
||
|
||
# Update game
|
||
await cursor.execute(
|
||
"UPDATE games SET players = %s WHERE id = %s",
|
||
(json.dumps(players), game['id'])
|
||
)
|
||
|
||
embed = discord.Embed(
|
||
title="✅ Player Added",
|
||
description=f"{user.display_name} has been added to '{game_name}'!",
|
||
color=discord.Color.green()
|
||
)
|
||
embed.add_field(name="Player", value=user.display_name, inline=True)
|
||
embed.add_field(name="Team", value=team_name, inline=True)
|
||
embed.add_field(name="T-Level", value=f"T{t_level}", inline=True)
|
||
embed.add_field(name="Current ELO", value=player[f"{game['game_type']}_elo"], inline=True)
|
||
if country:
|
||
flag = get_country_emoji(ctx, country)
|
||
label = get_country_label(country)
|
||
value = f"{flag} {label}".strip()
|
||
embed.add_field(name="Country", value=value, inline=True)
|
||
embed.add_field(name="Players in Game", value=len(players), inline=True)
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
# Update notification in notification channel
|
||
await update_game_notification(dict(game), players)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error adding player: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes')
|
||
async def hoi4end(ctx, game_name: str, winner_team: str):
|
||
"""End a game and calculate ELO changes. Use 'draw' for ties."""
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get the game
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
|
||
(game_name,)
|
||
)
|
||
game = await cursor.fetchone()
|
||
|
||
if not game:
|
||
await ctx.send(f"❌ No active game found with name '{game_name}'!")
|
||
return
|
||
|
||
players = json.loads(game['players']) if game['players'] else []
|
||
|
||
if len(players) < 2:
|
||
await ctx.send("❌ Game needs at least 2 players to end!")
|
||
return
|
||
|
||
# Check if winner team exists or if it's a draw
|
||
teams = {p['team_name'] for p in players}
|
||
is_draw = winner_team.lower() == 'draw'
|
||
|
||
if not is_draw and winner_team not in teams:
|
||
available_teams = ', '.join(teams)
|
||
await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw")
|
||
return
|
||
|
||
# Calculate team averages
|
||
team_elos = {}
|
||
team_players = {}
|
||
|
||
for player in players:
|
||
team = player['team_name']
|
||
if team not in team_elos:
|
||
team_elos[team] = []
|
||
team_players[team] = []
|
||
|
||
team_elos[team].append(player['current_elo'])
|
||
team_players[team].append(player)
|
||
|
||
# Calculate average ELOs for each team
|
||
team_averages = {team: sum(elos) / len(elos) for team, elos in team_elos.items()}
|
||
|
||
elo_changes = []
|
||
|
||
# Calculate ELO changes for each player
|
||
for player in players:
|
||
team = player['team_name']
|
||
|
||
# Determine result for this player
|
||
if is_draw:
|
||
result = 'draw'
|
||
elif team == winner_team:
|
||
result = 'win'
|
||
else:
|
||
result = 'loss'
|
||
|
||
# Calculate opponent average (average of all other teams)
|
||
opponent_elos = []
|
||
for other_team, elos in team_elos.items():
|
||
if other_team != team:
|
||
opponent_elos.extend(elos)
|
||
|
||
opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo']
|
||
|
||
elo_change = calculate_elo_change(
|
||
player['current_elo'],
|
||
opponent_avg,
|
||
result,
|
||
player['t_level']
|
||
)
|
||
|
||
new_elo = max(0, player['current_elo'] + elo_change) # Prevent negative ELO
|
||
|
||
elo_changes.append({
|
||
'discord_id': player['discord_id'],
|
||
'username': player['username'],
|
||
'team_name': team,
|
||
't_level': player['t_level'],
|
||
'old_elo': player['current_elo'],
|
||
'new_elo': new_elo,
|
||
'elo_change': elo_change,
|
||
'result': result
|
||
})
|
||
|
||
# Update player ELOs and save game results
|
||
for change in elo_changes:
|
||
# Update player ELO
|
||
elo_field = f"{game['game_type']}_elo"
|
||
await cursor.execute(
|
||
f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
|
||
(change['new_elo'], change['discord_id'])
|
||
)
|
||
|
||
# Save game result
|
||
won = change['result'] == 'win'
|
||
await cursor.execute(
|
||
"""INSERT INTO game_results
|
||
(game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
|
||
(game['id'], change['discord_id'], change['team_name'],
|
||
change['t_level'], change['old_elo'], change['new_elo'],
|
||
change['elo_change'], won, change['result'])
|
||
)
|
||
|
||
# Mark game as finished
|
||
final_result = "Draw" if is_draw else winner_team
|
||
await cursor.execute(
|
||
"UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s",
|
||
(final_result, game['id'])
|
||
)
|
||
|
||
# After DB updates, try to sync Discord roles for affected players (only for this game's category)
|
||
try:
|
||
guild = ctx.guild
|
||
if guild:
|
||
for change in elo_changes:
|
||
member = guild.get_member(change['discord_id'])
|
||
if member is None:
|
||
try:
|
||
member = await guild.fetch_member(change['discord_id'])
|
||
except Exception:
|
||
member = None
|
||
if member:
|
||
await update_member_elo_role(
|
||
member,
|
||
change['new_elo'],
|
||
game['game_type'],
|
||
reason=f"HOI4 {game['game_type']} ELO updated in '{game_name}'"
|
||
)
|
||
except Exception as e:
|
||
logging.warning(f"Role sync after game end failed: {e}")
|
||
|
||
# Create result embed
|
||
if is_draw:
|
||
embed = discord.Embed(
|
||
title="🤝 Game Finished!",
|
||
description=f"Game '{game_name}' has ended!\n**Result: Draw**",
|
||
color=discord.Color.orange()
|
||
)
|
||
else:
|
||
embed = discord.Embed(
|
||
title="🏆 Game Finished!",
|
||
description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**",
|
||
color=discord.Color.gold()
|
||
)
|
||
|
||
# Group results by team
|
||
teams_results = {}
|
||
for change in elo_changes:
|
||
team = change['team_name']
|
||
if team not in teams_results:
|
||
teams_results[team] = []
|
||
teams_results[team].append(change)
|
||
|
||
for team, team_changes in teams_results.items():
|
||
team_text = ""
|
||
for change in team_changes:
|
||
emoji = "📈" if change['elo_change'] > 0 else "📉" if change['elo_change'] < 0 else "➡️"
|
||
result_emoji = ""
|
||
if change['result'] == 'win':
|
||
result_emoji = "🏆"
|
||
elif change['result'] == 'draw':
|
||
result_emoji = "🤝"
|
||
else:
|
||
result_emoji = "💔"
|
||
|
||
team_text += f"{change['username']}: {change['old_elo']} → {change['new_elo']} ({change['elo_change']:+d}) {emoji}\n"
|
||
|
||
# Team header with appropriate icon
|
||
if is_draw:
|
||
team_header = f"🤝 Team {team}"
|
||
elif team == winner_team:
|
||
team_header = f"🏆 Team {team}"
|
||
else:
|
||
team_header = f"💔 Team {team}"
|
||
|
||
embed.add_field(
|
||
name=team_header,
|
||
value=team_text,
|
||
inline=False
|
||
)
|
||
|
||
embed.set_footer(text=f"Game Type: {game['game_type'].title()}")
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
# Post final game result to notification channel
|
||
try:
|
||
channel = bot.get_channel(GAME_NOTIFICATION_CHANNEL_ID)
|
||
if channel:
|
||
final_embed = discord.Embed(
|
||
title=f"🏁 Game Finished: {game_name}",
|
||
description=f"**Result:** {'Draw' if is_draw else f'{winner_team} Victory'}",
|
||
color=discord.Color.gold() if not is_draw else discord.Color.orange()
|
||
)
|
||
|
||
final_embed.add_field(name="Game Type", value=game['game_type'].title(), inline=True)
|
||
final_embed.add_field(name="Players", value=len(players), inline=True)
|
||
final_embed.add_field(name="Teams", value=len(teams), inline=True)
|
||
|
||
# Add team results
|
||
for team, team_changes in teams_results.items():
|
||
avg_change = sum(c['elo_change'] for c in team_changes) / len(team_changes)
|
||
emoji = "🏆" if team == winner_team and not is_draw else "🤝" if is_draw else "💔"
|
||
final_embed.add_field(
|
||
name=f"{emoji} {team}",
|
||
value=f"{len(team_changes)} players\nAvg ELO change: {avg_change:+.1f}",
|
||
inline=True
|
||
)
|
||
|
||
await channel.send(embed=final_embed)
|
||
except Exception as e:
|
||
logging.warning(f"Failed to post final game notification: {e}")
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error ending game: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics')
|
||
async def hoi4stats(ctx, user: Optional[discord.Member] = None):
|
||
"""Show HOI4 ELO statistics for a user"""
|
||
target_user = user or ctx.author
|
||
|
||
try:
|
||
player = await get_or_create_player(target_user.id, target_user.display_name)
|
||
|
||
# Get player rankings
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get standard rank
|
||
await cursor.execute(
|
||
"SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s",
|
||
(player['standard_elo'],)
|
||
)
|
||
standard_rank_result = await cursor.fetchone()
|
||
standard_rank = standard_rank_result['player_rank']
|
||
|
||
# Get competitive rank
|
||
await cursor.execute(
|
||
"SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s",
|
||
(player['competitive_elo'],)
|
||
)
|
||
competitive_rank_result = await cursor.fetchone()
|
||
competitive_rank = competitive_rank_result['player_rank']
|
||
|
||
# Get total player count
|
||
await cursor.execute("SELECT COUNT(*) as total FROM players")
|
||
total_players_result = await cursor.fetchone()
|
||
total_players = total_players_result['total']
|
||
|
||
# Get game statistics with proper draw detection
|
||
await cursor.execute(
|
||
"""SELECT
|
||
COUNT(*) as total_games,
|
||
SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
|
||
SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn,
|
||
SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost
|
||
FROM game_results WHERE discord_id = %s""",
|
||
(target_user.id,)
|
||
)
|
||
game_stats = await cursor.fetchone()
|
||
|
||
total_games = game_stats['total_games'] or 0
|
||
games_won = game_stats['games_won'] or 0
|
||
games_drawn = game_stats['games_drawn'] or 0
|
||
games_lost = game_stats['games_lost'] or 0
|
||
win_rate = (games_won / total_games * 100) if total_games > 0 else 0
|
||
|
||
# Create rank indicators with medals
|
||
def get_rank_display(rank, total):
|
||
if rank == 1:
|
||
return f"🥇 #{rank} of {total}"
|
||
elif rank == 2:
|
||
return f"🥈 #{rank} of {total}"
|
||
elif rank == 3:
|
||
return f"🥉 #{rank} of {total}"
|
||
else:
|
||
return f"#{rank} of {total}"
|
||
|
||
embed = discord.Embed(
|
||
title=f"📊 HOI4 ELO Stats - {target_user.display_name}",
|
||
color=discord.Color.blue()
|
||
)
|
||
|
||
# ELO and Rankings
|
||
embed.add_field(
|
||
name="🎯 Standard ELO",
|
||
value=f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}",
|
||
inline=True
|
||
)
|
||
embed.add_field(
|
||
name="🏆 Competitive ELO",
|
||
value=f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}",
|
||
inline=True
|
||
)
|
||
embed.add_field(
|
||
name="📅 Player Since",
|
||
value=player['created_at'].strftime("%d/%m/%Y"),
|
||
inline=True
|
||
)
|
||
|
||
# Game Statistics
|
||
if total_games > 0:
|
||
embed.add_field(
|
||
name="🎮 Games Played",
|
||
value=f"**{total_games}** total games",
|
||
inline=True
|
||
)
|
||
|
||
if games_drawn > 0:
|
||
embed.add_field(
|
||
name="📊 W/D/L Record",
|
||
value=f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**",
|
||
inline=True
|
||
)
|
||
else:
|
||
embed.add_field(
|
||
name="📈 Win/Loss",
|
||
value=f"**{games_won}W** / **{games_lost}L**",
|
||
inline=True
|
||
)
|
||
|
||
embed.add_field(
|
||
name="📊 Win Rate",
|
||
value=f"**{win_rate:.1f}%**",
|
||
inline=True
|
||
)
|
||
else:
|
||
embed.add_field(
|
||
name="🎮 Games Played",
|
||
value="No games played yet",
|
||
inline=False
|
||
)
|
||
|
||
if target_user.avatar:
|
||
embed.set_thumbnail(url=target_user.avatar.url)
|
||
|
||
# Add percentile information
|
||
standard_percentile = ((total_players - standard_rank) / total_players * 100) if total_players > 0 else 0
|
||
competitive_percentile = ((total_players - competitive_rank) / total_players * 100) if total_players > 0 else 0
|
||
|
||
embed.set_footer(
|
||
text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%"
|
||
)
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
# Try to keep user's roles in sync for both categories when stats are viewed
|
||
try:
|
||
if ctx.guild and isinstance(target_user, discord.Member):
|
||
await update_member_elo_role(
|
||
target_user, player['standard_elo'], 'standard', reason='HOI4 stats viewed: role sync')
|
||
await update_member_elo_role(
|
||
target_user, player['competitive_elo'], 'competitive', reason='HOI4 stats viewed: role sync')
|
||
except Exception as e:
|
||
logging.warning(f"Role sync on stats failed: {e}")
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error getting stats: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4games', description='Show active games as team showcases')
|
||
async def hoi4games(ctx):
|
||
"""Show all active games with teams presented side-by-side like a showcase."""
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
await cursor.execute(
|
||
"SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC"
|
||
)
|
||
games = await cursor.fetchall()
|
||
|
||
if not games:
|
||
await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!")
|
||
return
|
||
|
||
embed = discord.Embed(
|
||
title="🎮 Active HOI4 Games",
|
||
description="Team lineups are shown side-by-side. Use /hoi4setup to add players.",
|
||
color=discord.Color.green()
|
||
)
|
||
|
||
# Limit total embed fields to avoid Discord limit (25). Each game uses up to 3 fields.
|
||
max_games = max(1, 25 // 3)
|
||
games = games[:max_games]
|
||
|
||
for game in games:
|
||
players = json.loads(game['players']) if game['players'] else []
|
||
|
||
# Build team structures
|
||
teams: Dict[str, List[Dict]] = {}
|
||
for p in players:
|
||
teams.setdefault(p['team_name'], []).append(p)
|
||
|
||
# Compute average ELO per team
|
||
team_avgs = {t: (sum(m['current_elo'] for m in mlist) / len(mlist)) if mlist else 0 for t, mlist in teams.items()}
|
||
|
||
# Sort teams by name to keep stable order
|
||
ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower())
|
||
|
||
# Helper to format a team's field
|
||
def build_team_field(team_name: str, members: List[Dict]) -> Dict[str, str]:
|
||
t_emoji = get_team_emoji(ctx, team_name)
|
||
avg = team_avgs.get(team_name, 0)
|
||
name = f"{t_emoji} {team_name} (avg {avg:.0f})"
|
||
# Each player line: T-level emoji, name, elo
|
||
lines = []
|
||
for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())):
|
||
te = get_t_emoji(ctx, int(m.get('t_level', 2)))
|
||
ctry = m.get('country')
|
||
flag = get_country_emoji(ctx, ctry) if ctry else ""
|
||
label = get_country_label(ctry) if ctry else None
|
||
parts = [te]
|
||
if flag:
|
||
parts.append(flag)
|
||
if label:
|
||
parts.append(label)
|
||
parts.append(m['username'])
|
||
lines.append(f"{' '.join(parts)} ({m['current_elo']})")
|
||
value = "\n".join(lines) if lines else "No players yet"
|
||
# Discord field value max ~1024 chars; trim if necessary
|
||
if len(value) > 1000:
|
||
value = value[:997] + "..."
|
||
return {"name": name, "value": value}
|
||
|
||
# If no teams yet, show empty placeholder for this game
|
||
if not ordered_teams:
|
||
embed.add_field(
|
||
name=f"{game['game_name']} ({game['game_type'].title()})",
|
||
value="No players yet",
|
||
inline=False
|
||
)
|
||
continue
|
||
|
||
# For two teams, show A vs B with a center VS field; otherwise, list each team inline
|
||
team_fields = [build_team_field(tn, members) for tn, members in ordered_teams]
|
||
|
||
# Add a header field per game
|
||
embed.add_field(
|
||
name=f"🎯 {game['game_name']} ({game['game_type'].title()})",
|
||
value=f"Players: {len(players)} | Teams: {len(ordered_teams)}",
|
||
inline=False
|
||
)
|
||
|
||
if len(team_fields) == 1:
|
||
f1 = team_fields[0]
|
||
embed.add_field(name=f1["name"], value=f1["value"], inline=False)
|
||
elif len(team_fields) >= 2:
|
||
f1, f2 = team_fields[0], team_fields[1]
|
||
embed.add_field(name=f1["name"], value=f1["value"], inline=True)
|
||
embed.add_field(name="⚔️ VS ⚔️", value="\u200b", inline=True)
|
||
embed.add_field(name=f2["name"], value=f2["value"], inline=True)
|
||
# If more teams exist, add them below
|
||
for extra in team_fields[2:]:
|
||
embed.add_field(name=extra["name"], value=extra["value"], inline=False)
|
||
|
||
await ctx.send(embed=embed)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error getting games: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4history', description='Show past games with optional filters')
|
||
async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None):
|
||
"""Show past games with optional filters"""
|
||
if limit > 50:
|
||
limit = 50 # Prevent too many results
|
||
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Build dynamic query based on filters
|
||
query = "SELECT * FROM games WHERE status = 'finished'"
|
||
params = []
|
||
|
||
if game_name:
|
||
query += " AND game_name LIKE %s"
|
||
params.append(f"%{game_name}%")
|
||
|
||
if game_type and game_type.lower() in ['standard', 'competitive']:
|
||
query += " AND game_type = %s"
|
||
params.append(game_type.lower())
|
||
|
||
if player:
|
||
query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))"
|
||
params.append(player.id)
|
||
|
||
query += " ORDER BY finished_at DESC LIMIT %s"
|
||
params.append(limit)
|
||
|
||
await cursor.execute(query, params)
|
||
games = await cursor.fetchall()
|
||
|
||
if not games:
|
||
await ctx.send("📝 No finished games found with the specified filters!")
|
||
return
|
||
|
||
embed = discord.Embed(
|
||
title="📚 HOI4 Game History",
|
||
color=discord.Color.blue()
|
||
)
|
||
|
||
if player:
|
||
embed.description = f"Filtered by player: {player.display_name}"
|
||
if game_name:
|
||
embed.description = f"Filtered by game name: {game_name}"
|
||
if game_type:
|
||
embed.description = f"Filtered by type: {game_type.title()}"
|
||
|
||
for game in games:
|
||
players = json.loads(game['players']) if game['players'] else []
|
||
|
||
# Count teams and players
|
||
teams = {}
|
||
for p in players:
|
||
team = p['team_name']
|
||
if team not in teams:
|
||
teams[team] = 0
|
||
teams[team] += 1
|
||
|
||
# Format date
|
||
finished_date = game['finished_at'].strftime("%d/%m/%Y %H:%M") if game['finished_at'] else "Unknown"
|
||
|
||
# Winner indicator
|
||
winner = game['winner_team'] if game['winner_team'] else "Unknown"
|
||
|
||
game_info = f"**Winner:** {winner}\n"
|
||
game_info += f"**Type:** {game['game_type'].title()}\n"
|
||
game_info += f"**Players:** {len(players)} | **Teams:** {len(teams)}\n"
|
||
game_info += f"**Finished:** {finished_date}"
|
||
|
||
embed.add_field(
|
||
name=f"🏆 {game['game_name']}",
|
||
value=game_info,
|
||
inline=False
|
||
)
|
||
|
||
embed.set_footer(text=f"Showing {len(games)} of last {limit} games")
|
||
await ctx.send(embed=embed)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error getting game history: {str(e)}")
|
||
|
||
@bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard')
|
||
async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10):
|
||
"""Show ELO leaderboard for standard or competitive"""
|
||
if game_type.lower() not in ['standard', 'competitive']:
|
||
await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
|
||
return
|
||
|
||
if limit > 25:
|
||
limit = 25 # Prevent too many results
|
||
|
||
try:
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Get top players by ELO
|
||
elo_field = f"{game_type.lower()}_elo"
|
||
await cursor.execute(
|
||
f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s",
|
||
(limit,)
|
||
)
|
||
players = await cursor.fetchall()
|
||
|
||
# Also get some statistics
|
||
await cursor.execute(
|
||
f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players"
|
||
)
|
||
stats = await cursor.fetchone()
|
||
|
||
if not players:
|
||
await ctx.send("📝 No players found in the database!")
|
||
return
|
||
|
||
# Create leaderboard embed
|
||
embed = discord.Embed(
|
||
title=f"🏆 HOI4 {game_type.title()} Leaderboard",
|
||
color=discord.Color.gold()
|
||
)
|
||
|
||
# Add statistics
|
||
embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}"
|
||
|
||
leaderboard_text = ""
|
||
medals = ["🥇", "🥈", "🥉"]
|
||
|
||
for i, player in enumerate(players, 1):
|
||
# Get medal or rank number
|
||
if i <= 3:
|
||
rank_indicator = medals[i-1]
|
||
else:
|
||
rank_indicator = f"**{i}.**"
|
||
|
||
elo_value = player[elo_field]
|
||
username = player['username']
|
||
|
||
# Get additional player stats
|
||
async with db_pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# Count games played with proper result detection
|
||
await cursor.execute(
|
||
"""SELECT
|
||
COUNT(*) as games_played,
|
||
SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
|
||
SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn
|
||
FROM game_results WHERE discord_id = %s""",
|
||
(player['discord_id'],)
|
||
)
|
||
player_stats = await cursor.fetchone()
|
||
|
||
games_played = player_stats['games_played'] or 0
|
||
games_won = player_stats['games_won'] or 0
|
||
games_drawn = player_stats['games_drawn'] or 0
|
||
win_rate = (games_won / games_played * 100) if games_played > 0 else 0
|
||
|
||
leaderboard_text += f"{rank_indicator} **{username}** - {elo_value} ELO\n"
|
||
if games_drawn > 0:
|
||
leaderboard_text += f" 📊 {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate\n\n"
|
||
else:
|
||
leaderboard_text += f" 📊 {games_played} games | {win_rate:.1f}% win rate\n\n"
|
||
|
||
embed.add_field(
|
||
name="Rankings",
|
||
value=leaderboard_text,
|
||
inline=False
|
||
)
|
||
|
||
embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}")
|
||
await ctx.send(embed=embed)
|
||
|
||
except Exception as e:
|
||
await ctx.send(f"❌ Error getting leaderboard: {str(e)}")
|
||
|
||
@bot.event
|
||
async def on_command_error(ctx, error):
|
||
"""Handles command errors"""
|
||
if isinstance(error, commands.CheckFailure):
|
||
await ctx.send("❌ You don't have permission to use this command!")
|
||
elif isinstance(error, commands.CommandNotFound):
|
||
# Silently ignore command not found errors
|
||
pass
|
||
elif isinstance(error, commands.MissingRequiredArgument):
|
||
await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`")
|
||
elif isinstance(error, commands.BadArgument):
|
||
await ctx.send("❌ Invalid argument!")
|
||
else:
|
||
# Log detailed error information
|
||
print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# Send detailed error to user if owner
|
||
if ctx.author.id == OWNER_ID:
|
||
await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```")
|
||
else:
|
||
await ctx.send("❌ An unknown error occurred!")
|
||
|
||
async def main():
|
||
"""Main function to start the bot"""
|
||
# Load Discord token from environment variables
|
||
token = os.getenv('DISCORD_TOKEN')
|
||
|
||
if not token:
|
||
print("❌ DISCORD_TOKEN environment variable not found!")
|
||
print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file")
|
||
return
|
||
|
||
if not DATABASE_URL:
|
||
print("❌ DATABASE_URL environment variable not found!")
|
||
print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify")
|
||
print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}")
|
||
return
|
||
|
||
try:
|
||
print("🚀 Starting bot...")
|
||
await bot.start(token)
|
||
except discord.LoginFailure:
|
||
print("❌ Invalid Discord token!")
|
||
except Exception as e:
|
||
print(f"❌ Error starting bot: {e}")
|
||
finally:
|
||
if db_pool:
|
||
await db_pool.close()
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main()) |