Files
hoi4botdc/app.py
2025-10-26 01:35:24 +02:00

667 lines
25 KiB
Python

import discord
from discord.ext import commands
import os
import asyncio
import logging
from dotenv import load_dotenv
import aiomysql
import json
from datetime import datetime
from typing import Optional, List, Dict
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
# Database configuration
DATABASE_URL = os.getenv('DATABASE_URL')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT', '5432') # Default to PostgreSQL port
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Build DATABASE_URL from individual components if not provided
if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
print(f"📝 Built DATABASE_URL from individual environment variables")
# Parse MySQL connection details from DATABASE_URL
def parse_database_url(url):
"""Parse MySQL connection URL into components"""
if not url:
return None
# Remove mysql:// prefix
if url.startswith('mysql://'):
url = url[8:]
# Split user:pass@host:port/db
if '@' in url:
auth, host_db = url.split('@', 1)
if ':' in auth:
user, password = auth.split(':', 1)
else:
user, password = auth, ''
else:
return None
if '/' in host_db:
host_port, database = host_db.split('/', 1)
else:
return None
if ':' in host_port:
host, port = host_port.split(':', 1)
try:
port = int(port)
except ValueError:
port = 3306
else:
host, port = host_port, 3306
return {
'host': host,
'port': port,
'user': user,
'password': password,
'db': database
}
# Global database connection pool
db_pool = None
# Bot configuration
intents = discord.Intents.default()
intents.message_content = True
intents.guilds = True
intents.members = True
bot = commands.Bot(command_prefix='!', intents=intents)
@bot.event
async def on_ready():
"""Event triggered when the bot is ready"""
print(f'{bot.user} is online and ready!')
print(f'Bot ID: {bot.user.id}')
print(f'Discord.py Version: {discord.__version__}')
print('------')
# Initialize database
await init_database()
# Set bot status
await bot.change_presence(
activity=discord.Game(name="Hearts of Iron IV ELO"),
status=discord.Status.online
)
# Sync hybrid commands on startup
try:
synced = await bot.tree.sync()
print(f'Synced {len(synced)} hybrid commands')
except Exception as e:
print(f'Failed to sync commands: {e}')
@bot.event
async def on_guild_join(guild):
"""Event triggered when the bot joins a server"""
print(f'Bot joined server "{guild.name}" (ID: {guild.id})')
@bot.event
async def on_guild_remove(guild):
"""Event triggered when the bot leaves a server"""
print(f'Bot left server "{guild.name}" (ID: {guild.id})')
# Owner Configuration
OWNER_ID = 253922739709018114
# ELO Configuration
STARTING_ELO = 800
K_FACTOR = 32
T_LEVEL_MULTIPLIERS = {
1: 0.8, # T1 countries get less points
2: 1.0, # T2 countries get normal points
3: 1.2 # T3 countries get more points
}
# Database Functions
async def init_database():
"""Initialize database connection and create tables"""
global db_pool
try:
# Parse DATABASE_URL for MySQL connection
db_config = parse_database_url(DATABASE_URL)
if not db_config:
raise ValueError("Invalid DATABASE_URL format")
print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}")
# Create MySQL connection pool
db_pool = await aiomysql.create_pool(
host=db_config['host'],
port=db_config['port'],
user=db_config['user'],
password=db_config['password'],
db=db_config['db'],
charset='utf8mb4',
autocommit=True,
maxsize=10
)
async with db_pool.acquire() as conn:
async with conn.cursor() as cursor:
# Create players table (MySQL syntax)
await cursor.execute('''
CREATE TABLE IF NOT EXISTS players (
id INT AUTO_INCREMENT PRIMARY KEY,
discord_id BIGINT UNIQUE NOT NULL,
username VARCHAR(255) NOT NULL,
standard_elo INT DEFAULT 800,
competitive_elo INT DEFAULT 800,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
''')
# Create games table (MySQL syntax)
await cursor.execute('''
CREATE TABLE IF NOT EXISTS games (
id INT AUTO_INCREMENT PRIMARY KEY,
game_name VARCHAR(255) NOT NULL,
game_type VARCHAR(50) NOT NULL,
status VARCHAR(50) DEFAULT 'setup',
players JSON NOT NULL,
winner_team VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
finished_at TIMESTAMP NULL
)
''')
# Create game_results table (MySQL syntax)
await cursor.execute('''
CREATE TABLE IF NOT EXISTS game_results (
id INT AUTO_INCREMENT PRIMARY KEY,
game_id INT,
discord_id BIGINT NOT NULL,
team_name VARCHAR(255) NOT NULL,
t_level INT NOT NULL,
old_elo INT NOT NULL,
new_elo INT NOT NULL,
elo_change INT NOT NULL,
won BOOLEAN NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (game_id) REFERENCES games(id)
)
''')
print("✅ Database initialized successfully")
except Exception as e:
print(f"❌ Database initialization failed: {e}")
import traceback
traceback.print_exc()
async def get_or_create_player(discord_id: int, username: str) -> Dict:
"""Get or create a player in the database"""
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# Try to get existing player
await cursor.execute(
"SELECT * FROM players WHERE discord_id = %s", (discord_id,)
)
player = await cursor.fetchone()
if not player:
# Create new player
await cursor.execute(
"INSERT INTO players (discord_id, username) VALUES (%s, %s)",
(discord_id, username)
)
await cursor.execute(
"SELECT * FROM players WHERE discord_id = %s", (discord_id,)
)
player = await cursor.fetchone()
else:
# Update username if changed
await cursor.execute(
"UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
(username, discord_id)
)
return dict(player)
def calculate_elo_change(player_elo: int, opponent_avg_elo: int, won: bool, t_level: int) -> int:
"""Calculate ELO change using standard ELO formula with T-level multiplier"""
expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400))
actual_score = 1 if won else 0
base_change = K_FACTOR * (actual_score - expected_score)
t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0)
return round(base_change * t_multiplier)
# Owner only decorator
def is_owner():
def predicate(ctx):
return ctx.author.id == OWNER_ID
return commands.check(predicate)
# Owner Commands
@bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)')
@is_owner()
async def reload_bot(ctx):
"""Reloads the bot and syncs slash commands (Owner only)"""
try:
print(f"🔄 Reload command started by {ctx.author} (ID: {ctx.author.id})")
# Send initial message
embed = discord.Embed(
title="🔄 Bot Reload",
description="Reloading bot and syncing commands...",
color=discord.Color.yellow()
)
message = await ctx.send(embed=embed)
print("📤 Initial reload message sent")
# Sync slash commands
print("🔄 Starting command sync...")
synced = await bot.tree.sync()
print(f"✅ Synced {len(synced)} commands successfully")
# Update embed with success
embed = discord.Embed(
title="✅ Bot Reloaded Successfully",
description=f"Bot has been reloaded!\nSynced {len(synced)} slash commands.",
color=discord.Color.green()
)
embed.add_field(name="Servers", value=len(bot.guilds), inline=True)
embed.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True)
embed.set_footer(text=f"Reloaded by {ctx.author}", icon_url=ctx.author.avatar.url if ctx.author.avatar else None)
await message.edit(embed=embed)
print("✅ Reload completed successfully")
except Exception as e:
print(f"❌ Reload failed with error: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
embed = discord.Embed(
title="❌ Reload Failed",
description=f"**Error Type:** {type(e).__name__}\n**Error:** {str(e)[:1500]}",
color=discord.Color.red()
)
await ctx.send(embed=embed)
# HOI4 ELO Commands
@bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game')
async def hoi4create(ctx, game_type: str, game_name: str):
"""Create a new HOI4 game"""
if game_type.lower() not in ['standard', 'competitive']:
await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
return
try:
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# Check if game name already exists and is active
await cursor.execute(
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
(game_name,)
)
existing_game = await cursor.fetchone()
if existing_game:
await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!")
return
# Create new game
await cursor.execute(
"INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)",
(game_name, game_type.lower(), '[]')
)
embed = discord.Embed(
title="🎮 Game Created",
description=f"HOI4 {game_type.title()} game '{game_name}' has been created!",
color=discord.Color.green()
)
embed.add_field(name="Game Name", value=game_name, inline=True)
embed.add_field(name="Type", value=game_type.title(), inline=True)
embed.add_field(name="Status", value="Setup Phase", inline=True)
embed.set_footer(text="Use /hoi4setup to add players to this game")
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ Error creating game: {str(e)}")
@bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game')
async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int):
"""Add a player to an existing game"""
if t_level not in [1, 2, 3]:
await ctx.send("❌ T-Level must be 1, 2, or 3")
return
try:
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# Get the game
await cursor.execute(
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
(game_name,)
)
game = await cursor.fetchone()
if not game:
await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
return
# Get or create player
player = await get_or_create_player(user.id, user.display_name)
# Parse existing players
players = json.loads(game['players']) if game['players'] else []
# Check if player already in game
for p in players:
if p['discord_id'] == user.id:
await ctx.send(f"{user.display_name} is already in this game!")
return
# Add player to game
player_data = {
'discord_id': user.id,
'username': user.display_name,
'team_name': team_name,
't_level': t_level,
'current_elo': player[f"{game['game_type']}_elo"]
}
players.append(player_data)
# Update game
await cursor.execute(
"UPDATE games SET players = %s WHERE id = %s",
(json.dumps(players), game['id'])
)
embed = discord.Embed(
title="✅ Player Added",
description=f"{user.display_name} has been added to '{game_name}'!",
color=discord.Color.green()
)
embed.add_field(name="Player", value=user.display_name, inline=True)
embed.add_field(name="Team", value=team_name, inline=True)
embed.add_field(name="T-Level", value=f"T{t_level}", inline=True)
embed.add_field(name="Current ELO", value=player[f"{game['game_type']}_elo"], inline=True)
embed.add_field(name="Players in Game", value=len(players), inline=True)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ Error adding player: {str(e)}")
@bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes')
async def hoi4end(ctx, game_name: str, winner_team: str):
"""End a game and calculate ELO changes"""
try:
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# Get the game
await cursor.execute(
"SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
(game_name,)
)
game = await cursor.fetchone()
if not game:
await ctx.send(f"❌ No active game found with name '{game_name}'!")
return
players = json.loads(game['players']) if game['players'] else []
if len(players) < 2:
await ctx.send("❌ Game needs at least 2 players to end!")
return
# Check if winner team exists
teams = {p['team_name'] for p in players}
if winner_team not in teams:
await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {', '.join(teams)}")
return
# Calculate team averages
team_elos = {}
team_players = {}
for player in players:
team = player['team_name']
if team not in team_elos:
team_elos[team] = []
team_players[team] = []
team_elos[team].append(player['current_elo'])
team_players[team].append(player)
# Calculate average ELOs for each team
team_averages = {team: sum(elos) / len(elos) for team, elos in team_elos.items()}
elo_changes = []
# Calculate ELO changes for each player
for player in players:
team = player['team_name']
won = team == winner_team
# Calculate opponent average (average of all other teams)
opponent_elos = []
for other_team, elos in team_elos.items():
if other_team != team:
opponent_elos.extend(elos)
opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo']
elo_change = calculate_elo_change(
player['current_elo'],
opponent_avg,
won,
player['t_level']
)
new_elo = max(0, player['current_elo'] + elo_change) # Prevent negative ELO
elo_changes.append({
'discord_id': player['discord_id'],
'username': player['username'],
'team_name': team,
't_level': player['t_level'],
'old_elo': player['current_elo'],
'new_elo': new_elo,
'elo_change': elo_change,
'won': won
})
# Update player ELOs and save game results
for change in elo_changes:
# Update player ELO
elo_field = f"{game['game_type']}_elo"
await cursor.execute(
f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
(change['new_elo'], change['discord_id'])
)
# Save game result
await cursor.execute(
"""INSERT INTO game_results
(game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
(game['id'], change['discord_id'], change['team_name'],
change['t_level'], change['old_elo'], change['new_elo'],
change['elo_change'], change['won'])
)
# Mark game as finished
await cursor.execute(
"UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s",
(winner_team, game['id'])
)
# Create result embed
embed = discord.Embed(
title="🏆 Game Finished!",
description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**",
color=discord.Color.gold()
)
# Group results by team
teams_results = {}
for change in elo_changes:
team = change['team_name']
if team not in teams_results:
teams_results[team] = []
teams_results[team].append(change)
for team, team_changes in teams_results.items():
team_text = ""
for change in team_changes:
emoji = "📈" if change['elo_change'] > 0 else "📉" if change['elo_change'] < 0 else "➡️"
team_text += f"{change['username']}: {change['old_elo']}{change['new_elo']} ({change['elo_change']:+d}) {emoji}\n"
embed.add_field(
name=f"{'🏆 ' if team == winner_team else ''}Team {team}",
value=team_text,
inline=False
)
embed.set_footer(text=f"Game Type: {game['game_type'].title()}")
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ Error ending game: {str(e)}")
@bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics')
async def hoi4stats(ctx, user: Optional[discord.Member] = None):
"""Show HOI4 ELO statistics for a user"""
target_user = user or ctx.author
try:
player = await get_or_create_player(target_user.id, target_user.display_name)
embed = discord.Embed(
title=f"📊 HOI4 ELO Stats - {target_user.display_name}",
color=discord.Color.blue()
)
embed.add_field(name="Standard ELO", value=f"🎯 {player['standard_elo']}", inline=True)
embed.add_field(name="Competitive ELO", value=f"🏆 {player['competitive_elo']}", inline=True)
embed.add_field(name="Player Since", value=player['created_at'].strftime("%m/%d/%Y"), inline=True)
if target_user.avatar:
embed.set_thumbnail(url=target_user.avatar.url)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ Error getting stats: {str(e)}")
@bot.hybrid_command(name='hoi4games', description='Show active games')
async def hoi4games(ctx):
"""Show all active games"""
try:
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(
"SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC"
)
games = await cursor.fetchall()
if not games:
await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!")
return
embed = discord.Embed(
title="🎮 Active HOI4 Games",
color=discord.Color.green()
)
for game in games:
players = json.loads(game['players']) if game['players'] else []
player_count = len(players)
teams = {}
for player in players:
team = player['team_name']
if team not in teams:
teams[team] = 0
teams[team] += 1
team_info = ", ".join([f"{team} ({count})" for team, count in teams.items()]) if teams else "No players yet"
embed.add_field(
name=f"{game['game_name']} ({game['game_type'].title()})",
value=f"Players: {player_count}\nTeams: {team_info}",
inline=False
)
await ctx.send(embed=embed)
except Exception as e:
await ctx.send(f"❌ Error getting games: {str(e)}")
@bot.event
async def on_command_error(ctx, error):
"""Handles command errors"""
if isinstance(error, commands.CheckFailure):
await ctx.send("❌ You don't have permission to use this command!")
elif isinstance(error, commands.CommandNotFound):
# Silently ignore command not found errors
pass
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`")
elif isinstance(error, commands.BadArgument):
await ctx.send("❌ Invalid argument!")
else:
# Log detailed error information
print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}")
import traceback
traceback.print_exc()
# Send detailed error to user if owner
if ctx.author.id == OWNER_ID:
await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```")
else:
await ctx.send("❌ An unknown error occurred!")
async def main():
"""Main function to start the bot"""
# Load Discord token from environment variables
token = os.getenv('DISCORD_TOKEN')
if not token:
print("❌ DISCORD_TOKEN environment variable not found!")
print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file")
return
if not DATABASE_URL:
print("❌ DATABASE_URL environment variable not found!")
print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify")
print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}")
return
try:
print("🚀 Starting bot...")
await bot.start(token)
except discord.LoginFailure:
print("❌ Invalid Discord token!")
except Exception as e:
print(f"❌ Error starting bot: {e}")
finally:
if db_pool:
await db_pool.close()
if __name__ == "__main__":
asyncio.run(main())