Added admin panel.

This commit is contained in:
2026-01-11 18:50:26 -06:00
parent e50b2f31d3
commit a97912188e
109 changed files with 6651 additions and 249 deletions

View File

@ -5,6 +5,7 @@ from app.models.bet import Bet, BetProposal, BetCategory, BetStatus, BetVisibili
from app.models.sport_event import SportEvent, SportType, EventStatus
from app.models.spread_bet import SpreadBet, SpreadBetStatus, TeamSide
from app.models.admin_settings import AdminSettings
from app.models.admin_audit_log import AdminAuditLog
from app.models.match_comment import MatchComment
from app.models.event_comment import EventComment
from app.models.gamification import (
@ -40,6 +41,7 @@ __all__ = [
"SpreadBetStatus",
"TeamSide",
"AdminSettings",
"AdminAuditLog",
"MatchComment",
"EventComment",
# Gamification

View File

@ -0,0 +1,54 @@
from sqlalchemy import String, DateTime, Integer, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from app.database import Base
class AdminAuditLog(Base):
"""
Audit log for tracking all admin actions on the platform.
Every admin action should be logged for accountability and debugging.
"""
__tablename__ = "admin_audit_logs"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
# Who performed the action
admin_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False)
admin_username: Mapped[str] = mapped_column(String(50), nullable=False)
# What action was performed
action: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
# Action codes:
# - DATA_WIPE: Database wipe executed
# - DATA_SEED: Database seeded with test data
# - SIMULATION_START: Activity simulation started
# - SIMULATION_STOP: Activity simulation stopped
# - USER_STATUS_CHANGE: User enabled/disabled
# - USER_BALANCE_ADJUST: User balance adjusted
# - USER_ADMIN_GRANT: Admin privileges granted
# - USER_ADMIN_REVOKE: Admin privileges revoked
# - USER_UPDATE: User details updated
# - SETTINGS_UPDATE: Platform settings changed
# - EVENT_CREATE: Sport event created
# - EVENT_UPDATE: Sport event updated
# - EVENT_DELETE: Sport event deleted
# Target of the action (if applicable)
target_type: Mapped[str | None] = mapped_column(String(50), nullable=True) # e.g., "user", "event", "bet"
target_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
# Description of the action
description: Mapped[str] = mapped_column(String(500), nullable=False)
# Additional details as JSON string
details: Mapped[str | None] = mapped_column(Text, nullable=True)
# IP address of the admin (for security)
ip_address: Mapped[str | None] = mapped_column(String(45), nullable=True)
# Timestamp
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, index=True)
# Relationship to admin user
admin: Mapped["User"] = relationship("User", foreign_keys=[admin_id])

View File

@ -15,6 +15,8 @@ class TransactionType(enum.Enum):
BET_CANCELLED = "bet_cancelled"
ESCROW_LOCK = "escrow_lock"
ESCROW_RELEASE = "escrow_release"
ADMIN_CREDIT = "admin_credit"
ADMIN_DEBIT = "admin_debit"
class TransactionStatus(enum.Enum):

View File

@ -1,19 +1,54 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from typing import List
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from typing import Optional
from datetime import datetime
from decimal import Decimal
from app.database import get_db
from app.models import User, SportEvent, SpreadBet, AdminSettings, EventStatus
from app.models import (
User, UserStatus, SportEvent, SpreadBet, AdminSettings, EventStatus,
Wallet, Transaction, TransactionType, TransactionStatus, AdminAuditLog, Bet
)
from app.schemas.sport_event import SportEventCreate, SportEventUpdate, SportEvent as SportEventSchema
from app.schemas.admin import (
AuditLogResponse, AuditLogListResponse,
WipePreviewResponse, WipeRequest, WipeResponse,
SeedRequest, SeedResponse,
SimulationConfig, SimulationStatusResponse, SimulationStartRequest, SimulationStartResponse, SimulationStopResponse,
AdminUserListItem, AdminUserListResponse, AdminUserDetailResponse,
AdminUserUpdateRequest, AdminUserStatusRequest,
AdminBalanceAdjustRequest, AdminBalanceAdjustResponse,
AdminDashboardStats,
)
from app.routers.auth import get_current_user
from app.services.audit_service import (
AuditService, log_event_create, log_event_update, log_event_delete,
log_user_status_change, log_user_balance_adjust, log_user_admin_change,
log_user_update, log_settings_update, log_simulation_start, log_simulation_stop
)
from app.services.wiper_service import WiperService
from app.services.seeder_service import SeederService
from app.services.simulation_service import simulation_manager
router = APIRouter(prefix="/api/v1/admin", tags=["admin"])
# Dependency to check if user is admin
# ============================================================
# Helper to get client IP
# ============================================================
def get_client_ip(request: Request) -> Optional[str]:
"""Extract client IP from request."""
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else None
# ============================================================
# Admin Dependency
# ============================================================
async def get_admin_user(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_admin:
raise HTTPException(
@ -23,6 +58,506 @@ async def get_admin_user(current_user: User = Depends(get_current_user)) -> User
return current_user
# ============================================================
# Dashboard Stats
# ============================================================
@router.get("/dashboard", response_model=AdminDashboardStats)
async def get_dashboard_stats(
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
):
"""Get dashboard statistics for admin panel."""
# User counts
total_users = (await db.execute(select(func.count(User.id)))).scalar() or 0
active_users = (await db.execute(
select(func.count(User.id)).where(User.status == UserStatus.ACTIVE)
)).scalar() or 0
suspended_users = (await db.execute(
select(func.count(User.id)).where(User.status == UserStatus.SUSPENDED)
)).scalar() or 0
admin_users = (await db.execute(
select(func.count(User.id)).where(User.is_admin == True)
)).scalar() or 0
# Event counts
total_events = (await db.execute(select(func.count(SportEvent.id)))).scalar() or 0
upcoming_events = (await db.execute(
select(func.count(SportEvent.id)).where(SportEvent.status == EventStatus.UPCOMING)
)).scalar() or 0
live_events = (await db.execute(
select(func.count(SportEvent.id)).where(SportEvent.status == EventStatus.LIVE)
)).scalar() or 0
# Bet counts
total_bets = (await db.execute(select(func.count(SpreadBet.id)))).scalar() or 0
open_bets = (await db.execute(
select(func.count(SpreadBet.id)).where(SpreadBet.status == "open")
)).scalar() or 0
matched_bets = (await db.execute(
select(func.count(SpreadBet.id)).where(SpreadBet.status == "matched")
)).scalar() or 0
# Volume calculations
total_volume_result = await db.execute(
select(func.sum(SpreadBet.stake_amount))
)
total_volume = total_volume_result.scalar() or Decimal("0.00")
escrow_result = await db.execute(select(func.sum(Wallet.escrow)))
escrow_locked = escrow_result.scalar() or Decimal("0.00")
return AdminDashboardStats(
total_users=total_users,
active_users=active_users,
suspended_users=suspended_users,
admin_users=admin_users,
total_events=total_events,
upcoming_events=upcoming_events,
live_events=live_events,
total_bets=total_bets,
open_bets=open_bets,
matched_bets=matched_bets,
total_volume=total_volume,
escrow_locked=escrow_locked,
simulation_running=simulation_manager.is_running,
)
# ============================================================
# Audit Logs
# ============================================================
@router.get("/audit-logs", response_model=AuditLogListResponse)
async def get_audit_logs(
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
action: Optional[str] = None,
admin_id: Optional[int] = None,
target_type: Optional[str] = None,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
):
"""Get paginated audit logs with optional filters."""
logs, total = await AuditService.get_logs(
db=db,
page=page,
page_size=page_size,
action_filter=action,
admin_id_filter=admin_id,
target_type_filter=target_type,
)
return AuditLogListResponse(
logs=[AuditLogResponse.model_validate(log) for log in logs],
total=total,
page=page,
page_size=page_size,
)
# ============================================================
# Data Wiper
# ============================================================
@router.get("/data/wipe/preview", response_model=WipePreviewResponse)
async def preview_wipe(
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
):
"""Preview what will be deleted in a wipe operation."""
return await WiperService.get_preview(db)
@router.post("/data/wipe", response_model=WipeResponse)
async def execute_wipe(
request: WipeRequest,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Execute a database wipe. Requires confirmation phrase."""
try:
ip = get_client_ip(req) if req else None
return await WiperService.execute_wipe(db, admin, request, ip)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# ============================================================
# Data Seeder
# ============================================================
@router.post("/data/seed", response_model=SeedResponse)
async def seed_database(
request: SeedRequest,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Seed the database with test data."""
ip = get_client_ip(req) if req else None
return await SeederService.seed(db, admin, request, ip)
# ============================================================
# Simulation Control
# ============================================================
@router.get("/simulation/status", response_model=SimulationStatusResponse)
async def get_simulation_status(
admin: User = Depends(get_admin_user)
):
"""Get current simulation status."""
return simulation_manager.get_status()
@router.post("/simulation/start", response_model=SimulationStartResponse)
async def start_simulation(
request: SimulationStartRequest = None,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Start the activity simulation."""
config = request.config if request else None
if simulation_manager.is_running:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Simulation is already running"
)
success = await simulation_manager.start(admin.username, config)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start simulation"
)
# Log the action
ip = get_client_ip(req) if req else None
await log_simulation_start(
db=db,
admin=admin,
config=config.model_dump() if config else {},
ip_address=ip,
)
await db.commit()
return SimulationStartResponse(
success=True,
message="Simulation started successfully",
status=simulation_manager.get_status(),
)
@router.post("/simulation/stop", response_model=SimulationStopResponse)
async def stop_simulation(
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Stop the activity simulation."""
if not simulation_manager.is_running:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Simulation is not running"
)
iterations, duration = await simulation_manager.stop()
# Log the action
ip = get_client_ip(req) if req else None
await log_simulation_stop(
db=db,
admin=admin,
iterations=iterations,
duration_seconds=duration,
ip_address=ip,
)
await db.commit()
return SimulationStopResponse(
success=True,
message="Simulation stopped successfully",
total_iterations=iterations,
ran_for_seconds=duration,
)
# ============================================================
# User Management
# ============================================================
@router.get("/users", response_model=AdminUserListResponse)
async def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
search: Optional[str] = None,
status_filter: Optional[str] = None,
is_admin: Optional[bool] = None,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
):
"""Get paginated list of users."""
query = select(User).options(selectinload(User.wallet))
# Apply filters
if search:
search_term = f"%{search}%"
query = query.where(
(User.username.ilike(search_term)) |
(User.email.ilike(search_term)) |
(User.display_name.ilike(search_term))
)
if status_filter:
query = query.where(User.status == UserStatus(status_filter))
if is_admin is not None:
query = query.where(User.is_admin == is_admin)
# Get total count
count_query = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_query)).scalar() or 0
# Apply pagination
query = query.order_by(User.created_at.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
users = result.scalars().all()
user_items = []
for user in users:
wallet = user.wallet
user_items.append(AdminUserListItem(
id=user.id,
email=user.email,
username=user.username,
display_name=user.display_name,
is_admin=user.is_admin,
status=user.status.value,
balance=wallet.balance if wallet else Decimal("0.00"),
escrow=wallet.escrow if wallet else Decimal("0.00"),
total_bets=user.total_bets,
wins=user.wins,
losses=user.losses,
win_rate=user.win_rate,
created_at=user.created_at,
))
return AdminUserListResponse(
users=user_items,
total=total,
page=page,
page_size=page_size,
)
@router.get("/users/{user_id}", response_model=AdminUserDetailResponse)
async def get_user_detail(
user_id: int,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
):
"""Get detailed user information."""
result = await db.execute(
select(User).options(selectinload(User.wallet)).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
wallet = user.wallet
# Get additional counts
open_bets = (await db.execute(
select(func.count(SpreadBet.id)).where(
(SpreadBet.creator_id == user_id) & (SpreadBet.status == "open")
)
)).scalar() or 0
matched_bets = (await db.execute(
select(func.count(SpreadBet.id)).where(
((SpreadBet.creator_id == user_id) | (SpreadBet.taker_id == user_id)) &
(SpreadBet.status == "matched")
)
)).scalar() or 0
transaction_count = (await db.execute(
select(func.count(Transaction.id)).where(Transaction.user_id == user_id)
)).scalar() or 0
return AdminUserDetailResponse(
id=user.id,
email=user.email,
username=user.username,
display_name=user.display_name,
avatar_url=user.avatar_url,
bio=user.bio,
is_admin=user.is_admin,
status=user.status.value,
created_at=user.created_at,
updated_at=user.updated_at,
balance=wallet.balance if wallet else Decimal("0.00"),
escrow=wallet.escrow if wallet else Decimal("0.00"),
total_bets=user.total_bets,
wins=user.wins,
losses=user.losses,
win_rate=user.win_rate,
open_bets_count=open_bets,
matched_bets_count=matched_bets,
transaction_count=transaction_count,
)
@router.patch("/users/{user_id}")
async def update_user(
user_id: int,
request: AdminUserUpdateRequest,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Update user details."""
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
changes = {}
ip = get_client_ip(req) if req else None
if request.display_name is not None and request.display_name != user.display_name:
changes["display_name"] = {"old": user.display_name, "new": request.display_name}
user.display_name = request.display_name
if request.email is not None and request.email != user.email:
# Check if email already exists
existing = await db.execute(select(User).where(User.email == request.email))
if existing.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already in use")
changes["email"] = {"old": user.email, "new": request.email}
user.email = request.email
if request.is_admin is not None and request.is_admin != user.is_admin:
# Cannot remove own admin status
if user.id == admin.id and not request.is_admin:
raise HTTPException(status_code=400, detail="Cannot remove your own admin privileges")
await log_user_admin_change(db, admin, user, request.is_admin, ip)
changes["is_admin"] = {"old": user.is_admin, "new": request.is_admin}
user.is_admin = request.is_admin
if changes:
await log_user_update(db, admin, user, changes, ip)
await db.commit()
return {"message": "User updated successfully", "changes": changes}
@router.patch("/users/{user_id}/status")
async def change_user_status(
user_id: int,
request: AdminUserStatusRequest,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Enable or disable a user."""
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Cannot suspend yourself
if user.id == admin.id:
raise HTTPException(status_code=400, detail="Cannot change your own status")
old_status = user.status.value
new_status = UserStatus(request.status)
if user.status == new_status:
return {"message": "Status unchanged"}
user.status = new_status
ip = get_client_ip(req) if req else None
await log_user_status_change(db, admin, user, old_status, request.status, request.reason, ip)
await db.commit()
return {"message": f"User status changed to {request.status}"}
@router.post("/users/{user_id}/balance", response_model=AdminBalanceAdjustResponse)
async def adjust_user_balance(
user_id: int,
request: AdminBalanceAdjustRequest,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user),
req: Request = None,
):
"""Adjust user balance (add or subtract funds)."""
result = await db.execute(
select(User).options(selectinload(User.wallet)).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
wallet = user.wallet
if not wallet:
raise HTTPException(status_code=400, detail="User has no wallet")
previous_balance = wallet.balance
new_balance = previous_balance + request.amount
# Validate new balance
if new_balance < Decimal("0.00"):
raise HTTPException(
status_code=400,
detail=f"Cannot reduce balance below $0. Current: ${previous_balance}, Adjustment: ${request.amount}"
)
if new_balance < wallet.escrow:
raise HTTPException(
status_code=400,
detail=f"Cannot reduce balance below escrow amount (${wallet.escrow})"
)
wallet.balance = new_balance
# Create transaction record
tx_type = TransactionType.ADMIN_CREDIT if request.amount > 0 else TransactionType.ADMIN_DEBIT
transaction = Transaction(
user_id=user.id,
wallet_id=wallet.id,
type=tx_type,
amount=request.amount,
balance_after=new_balance,
description=f"Admin adjustment: {request.reason}",
status=TransactionStatus.COMPLETED,
)
db.add(transaction)
await db.flush()
ip = get_client_ip(req) if req else None
await log_user_balance_adjust(
db, admin, user,
float(previous_balance), float(new_balance), float(request.amount),
request.reason, transaction.id, ip
)
await db.commit()
return AdminBalanceAdjustResponse(
success=True,
user_id=user.id,
username=user.username,
previous_balance=previous_balance,
adjustment=request.amount,
new_balance=new_balance,
reason=request.reason,
transaction_id=transaction.id,
)
# ============================================================
# Settings Management (existing endpoints, enhanced with audit)
# ============================================================
@router.get("/settings")
async def get_admin_settings(
db: AsyncSession = Depends(get_db),
@ -42,7 +577,8 @@ async def get_admin_settings(
async def update_admin_settings(
updates: dict,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
admin: User = Depends(get_admin_user),
req: Request = None,
):
result = await db.execute(select(AdminSettings).limit(1))
settings = result.scalar_one_or_none()
@ -50,32 +586,49 @@ async def update_admin_settings(
settings = AdminSettings()
db.add(settings)
changes = {}
for key, value in updates.items():
if hasattr(settings, key):
setattr(settings, key, value)
old_value = getattr(settings, key)
if old_value != value:
changes[key] = {"old": str(old_value), "new": str(value)}
setattr(settings, key, value)
if changes:
ip = get_client_ip(req) if req else None
await log_settings_update(db, admin, changes, ip)
await db.commit()
await db.refresh(settings)
return settings
# ============================================================
# Event Management (existing endpoints, enhanced with audit)
# ============================================================
@router.post("/events", response_model=SportEventSchema)
async def create_event(
event_data: SportEventCreate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
admin: User = Depends(get_admin_user),
req: Request = None,
):
event = SportEvent(
**event_data.model_dump(),
created_by=admin.id
)
db.add(event)
await db.flush()
ip = get_client_ip(req) if req else None
await log_event_create(db, admin, event.id, f"{event.home_team} vs {event.away_team}", ip)
await db.commit()
await db.refresh(event)
return event
@router.get("/events", response_model=List[SportEventSchema])
@router.get("/events", response_model=list[SportEventSchema])
async def list_events(
skip: int = 0,
limit: int = 50,
@ -93,7 +646,8 @@ async def update_event(
event_id: int,
updates: SportEventUpdate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
admin: User = Depends(get_admin_user),
req: Request = None,
):
result = await db.execute(select(SportEvent).where(SportEvent.id == event_id))
event = result.scalar_one_or_none()
@ -101,8 +655,16 @@ async def update_event(
raise HTTPException(status_code=404, detail="Event not found")
update_data = updates.model_dump(exclude_unset=True)
changes = {}
for key, value in update_data.items():
setattr(event, key, value)
old_value = getattr(event, key)
if old_value != value:
changes[key] = {"old": str(old_value), "new": str(value)}
setattr(event, key, value)
if changes:
ip = get_client_ip(req) if req else None
await log_event_update(db, admin, event.id, f"{event.home_team} vs {event.away_team}", changes, ip)
await db.commit()
await db.refresh(event)
@ -113,7 +675,8 @@ async def update_event(
async def delete_event(
event_id: int,
db: AsyncSession = Depends(get_db),
admin: User = Depends(get_admin_user)
admin: User = Depends(get_admin_user),
req: Request = None,
):
result = await db.execute(select(SportEvent).where(SportEvent.id == event_id))
event = result.scalar_one_or_none()
@ -133,6 +696,11 @@ async def delete_event(
detail="Cannot delete event with matched bets"
)
event_title = f"{event.home_team} vs {event.away_team}"
ip = get_client_ip(req) if req else None
await log_event_delete(db, admin, event_id, event_title, ip)
await db.delete(event)
await db.commit()
return {"message": "Event deleted"}

View File

@ -0,0 +1,265 @@
from pydantic import BaseModel, Field
from datetime import datetime
from decimal import Decimal
from typing import Optional, Literal
from enum import Enum
# ============================================================
# Audit Log Schemas
# ============================================================
class AuditLogAction(str, Enum):
DATA_WIPE = "DATA_WIPE"
DATA_SEED = "DATA_SEED"
SIMULATION_START = "SIMULATION_START"
SIMULATION_STOP = "SIMULATION_STOP"
USER_STATUS_CHANGE = "USER_STATUS_CHANGE"
USER_BALANCE_ADJUST = "USER_BALANCE_ADJUST"
USER_ADMIN_GRANT = "USER_ADMIN_GRANT"
USER_ADMIN_REVOKE = "USER_ADMIN_REVOKE"
USER_UPDATE = "USER_UPDATE"
SETTINGS_UPDATE = "SETTINGS_UPDATE"
EVENT_CREATE = "EVENT_CREATE"
EVENT_UPDATE = "EVENT_UPDATE"
EVENT_DELETE = "EVENT_DELETE"
class AuditLogResponse(BaseModel):
id: int
admin_id: int
admin_username: str
action: str
target_type: Optional[str] = None
target_id: Optional[int] = None
description: str
details: Optional[str] = None
ip_address: Optional[str] = None
created_at: datetime
class Config:
from_attributes = True
class AuditLogListResponse(BaseModel):
logs: list[AuditLogResponse]
total: int
page: int
page_size: int
# ============================================================
# Data Wiper Schemas
# ============================================================
class WipePreviewResponse(BaseModel):
"""Preview of what will be deleted in a wipe operation."""
users_count: int
wallets_count: int
transactions_count: int
bets_count: int
spread_bets_count: int
events_count: int
event_comments_count: int
match_comments_count: int
admin_settings_preserved: bool = True
admin_users_preserved: bool = True
can_wipe: bool = True
cooldown_remaining_seconds: int = 0
last_wipe_at: Optional[datetime] = None
class WipeRequest(BaseModel):
"""Request to execute a data wipe."""
confirmation_phrase: str = Field(..., description="Must be exactly 'CONFIRM WIPE'")
preserve_admin_users: bool = Field(default=True, description="Keep admin users and their wallets")
preserve_events: bool = Field(default=False, description="Keep sport events but delete bets")
class WipeResponse(BaseModel):
"""Response after a successful wipe operation."""
success: bool
message: str
deleted_counts: dict[str, int]
preserved_counts: dict[str, int]
executed_at: datetime
executed_by: str
# ============================================================
# Data Seeder Schemas
# ============================================================
class SeedRequest(BaseModel):
"""Request to seed the database with test data."""
num_users: int = Field(default=10, ge=1, le=100, description="Number of users to create")
num_events: int = Field(default=5, ge=0, le=50, description="Number of sport events to create")
num_bets_per_event: int = Field(default=3, ge=0, le=20, description="Average bets per event")
starting_balance: Decimal = Field(default=Decimal("1000.00"), ge=Decimal("100"), le=Decimal("10000"))
create_admin: bool = Field(default=True, description="Create a test admin user")
class SeedResponse(BaseModel):
"""Response after seeding the database."""
success: bool
message: str
created_counts: dict[str, int]
test_admin: Optional[dict] = None # Contains username/password if admin created
# ============================================================
# Simulation Schemas
# ============================================================
class SimulationConfig(BaseModel):
"""Configuration for activity simulation."""
delay_seconds: float = Field(default=2.0, ge=0.5, le=30.0, description="Delay between actions")
actions_per_iteration: int = Field(default=3, ge=1, le=10, description="Actions per iteration")
create_users: bool = Field(default=True, description="Allow creating new users")
create_bets: bool = Field(default=True, description="Allow creating bets")
take_bets: bool = Field(default=True, description="Allow taking/matching bets")
add_comments: bool = Field(default=True, description="Allow adding comments")
cancel_bets: bool = Field(default=True, description="Allow cancelling bets")
class SimulationStatusResponse(BaseModel):
"""Response with current simulation status."""
is_running: bool
started_at: Optional[datetime] = None
started_by: Optional[str] = None
iterations_completed: int = 0
config: Optional[SimulationConfig] = None
last_activity: Optional[str] = None
class SimulationStartRequest(BaseModel):
"""Request to start simulation."""
config: Optional[SimulationConfig] = None
class SimulationStartResponse(BaseModel):
"""Response after starting simulation."""
success: bool
message: str
status: SimulationStatusResponse
class SimulationStopResponse(BaseModel):
"""Response after stopping simulation."""
success: bool
message: str
total_iterations: int
ran_for_seconds: float
# ============================================================
# User Management Schemas
# ============================================================
class AdminUserListItem(BaseModel):
"""User item in admin user list."""
id: int
email: str
username: str
display_name: Optional[str] = None
is_admin: bool
status: str
balance: Decimal
escrow: Decimal
total_bets: int
wins: int
losses: int
win_rate: float
created_at: datetime
class Config:
from_attributes = True
class AdminUserListResponse(BaseModel):
"""Paginated list of users."""
users: list[AdminUserListItem]
total: int
page: int
page_size: int
class AdminUserDetailResponse(BaseModel):
"""Detailed user info for admin."""
id: int
email: str
username: str
display_name: Optional[str] = None
avatar_url: Optional[str] = None
bio: Optional[str] = None
is_admin: bool
status: str
created_at: datetime
updated_at: datetime
# Wallet info
balance: Decimal
escrow: Decimal
# Stats
total_bets: int
wins: int
losses: int
win_rate: float
# Counts
open_bets_count: int
matched_bets_count: int
transaction_count: int
class Config:
from_attributes = True
class AdminUserUpdateRequest(BaseModel):
"""Request to update user details."""
display_name: Optional[str] = None
email: Optional[str] = None
is_admin: Optional[bool] = None
class AdminUserStatusRequest(BaseModel):
"""Request to change user status."""
status: Literal["active", "suspended"]
reason: Optional[str] = None
class AdminBalanceAdjustRequest(BaseModel):
"""Request to adjust user balance."""
amount: Decimal = Field(..., description="Positive to add, negative to subtract")
reason: str = Field(..., min_length=5, max_length=500, description="Reason for adjustment")
class AdminBalanceAdjustResponse(BaseModel):
"""Response after balance adjustment."""
success: bool
user_id: int
username: str
previous_balance: Decimal
adjustment: Decimal
new_balance: Decimal
reason: str
transaction_id: int
# ============================================================
# Admin Dashboard Stats
# ============================================================
class AdminDashboardStats(BaseModel):
"""Dashboard statistics for admin panel."""
total_users: int
active_users: int
suspended_users: int
admin_users: int
total_events: int
upcoming_events: int
live_events: int
total_bets: int
open_bets: int
matched_bets: int
total_volume: Decimal
escrow_locked: Decimal
simulation_running: bool

View File

@ -42,6 +42,7 @@ class UserResponse(BaseModel):
losses: int
win_rate: float
status: UserStatus
is_admin: bool = False
created_at: datetime
model_config = ConfigDict(from_attributes=True)

View File

@ -0,0 +1,370 @@
"""
Audit Service for logging admin actions.
All admin operations should call this service to create audit trails.
"""
import json
from typing import Optional, Any
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.models.admin_audit_log import AdminAuditLog
from app.models import User
class AuditService:
"""Service for creating and querying audit logs."""
@staticmethod
async def log(
db: AsyncSession,
admin: User,
action: str,
description: str,
target_type: Optional[str] = None,
target_id: Optional[int] = None,
details: Optional[dict[str, Any]] = None,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""
Create an audit log entry.
Args:
db: Database session
admin: The admin user performing the action
action: Action code (e.g., DATA_WIPE, USER_UPDATE)
description: Human-readable description
target_type: Type of target (user, event, bet, etc.)
target_id: ID of the target entity
details: Additional details as a dictionary
ip_address: IP address of the admin
Returns:
The created audit log entry
"""
log_entry = AdminAuditLog(
admin_id=admin.id,
admin_username=admin.username,
action=action,
target_type=target_type,
target_id=target_id,
description=description,
details=json.dumps(details) if details else None,
ip_address=ip_address,
)
db.add(log_entry)
await db.flush()
return log_entry
@staticmethod
async def get_logs(
db: AsyncSession,
page: int = 1,
page_size: int = 50,
action_filter: Optional[str] = None,
admin_id_filter: Optional[int] = None,
target_type_filter: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> tuple[list[AdminAuditLog], int]:
"""
Get paginated audit logs with optional filters.
Returns:
Tuple of (logs, total_count)
"""
query = select(AdminAuditLog)
# Apply filters
if action_filter:
query = query.where(AdminAuditLog.action == action_filter)
if admin_id_filter:
query = query.where(AdminAuditLog.admin_id == admin_id_filter)
if target_type_filter:
query = query.where(AdminAuditLog.target_type == target_type_filter)
if start_date:
query = query.where(AdminAuditLog.created_at >= start_date)
if end_date:
query = query.where(AdminAuditLog.created_at <= end_date)
# Get total count
count_query = select(func.count()).select_from(query.subquery())
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
# Apply pagination and ordering
query = query.order_by(AdminAuditLog.created_at.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
logs = list(result.scalars().all())
return logs, total
@staticmethod
async def get_latest_action(
db: AsyncSession,
action: str,
) -> Optional[AdminAuditLog]:
"""Get the most recent log entry for a specific action."""
query = (
select(AdminAuditLog)
.where(AdminAuditLog.action == action)
.order_by(AdminAuditLog.created_at.desc())
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none()
# Convenience functions for common audit actions
async def log_data_wipe(
db: AsyncSession,
admin: User,
deleted_counts: dict[str, int],
preserved_counts: dict[str, int],
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log a data wipe operation."""
return await AuditService.log(
db=db,
admin=admin,
action="DATA_WIPE",
description=f"Database wiped by {admin.username}",
details={
"deleted": deleted_counts,
"preserved": preserved_counts,
},
ip_address=ip_address,
)
async def log_data_seed(
db: AsyncSession,
admin: User,
created_counts: dict[str, int],
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log a data seed operation."""
return await AuditService.log(
db=db,
admin=admin,
action="DATA_SEED",
description=f"Database seeded by {admin.username}",
details={"created": created_counts},
ip_address=ip_address,
)
async def log_simulation_start(
db: AsyncSession,
admin: User,
config: dict,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log simulation start."""
return await AuditService.log(
db=db,
admin=admin,
action="SIMULATION_START",
description=f"Simulation started by {admin.username}",
details={"config": config},
ip_address=ip_address,
)
async def log_simulation_stop(
db: AsyncSession,
admin: User,
iterations: int,
duration_seconds: float,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log simulation stop."""
return await AuditService.log(
db=db,
admin=admin,
action="SIMULATION_STOP",
description=f"Simulation stopped by {admin.username} after {iterations} iterations",
details={
"iterations": iterations,
"duration_seconds": duration_seconds,
},
ip_address=ip_address,
)
async def log_user_status_change(
db: AsyncSession,
admin: User,
target_user: User,
old_status: str,
new_status: str,
reason: Optional[str] = None,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log user status change."""
return await AuditService.log(
db=db,
admin=admin,
action="USER_STATUS_CHANGE",
description=f"{admin.username} changed {target_user.username}'s status from {old_status} to {new_status}",
target_type="user",
target_id=target_user.id,
details={
"old_status": old_status,
"new_status": new_status,
"reason": reason,
},
ip_address=ip_address,
)
async def log_user_balance_adjust(
db: AsyncSession,
admin: User,
target_user: User,
old_balance: float,
new_balance: float,
amount: float,
reason: str,
transaction_id: int,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log user balance adjustment."""
return await AuditService.log(
db=db,
admin=admin,
action="USER_BALANCE_ADJUST",
description=f"{admin.username} adjusted {target_user.username}'s balance by ${amount:+.2f}",
target_type="user",
target_id=target_user.id,
details={
"old_balance": old_balance,
"new_balance": new_balance,
"amount": amount,
"reason": reason,
"transaction_id": transaction_id,
},
ip_address=ip_address,
)
async def log_user_admin_change(
db: AsyncSession,
admin: User,
target_user: User,
granted: bool,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log admin privilege grant/revoke."""
action = "USER_ADMIN_GRANT" if granted else "USER_ADMIN_REVOKE"
verb = "granted admin privileges to" if granted else "revoked admin privileges from"
return await AuditService.log(
db=db,
admin=admin,
action=action,
description=f"{admin.username} {verb} {target_user.username}",
target_type="user",
target_id=target_user.id,
details={"granted": granted},
ip_address=ip_address,
)
async def log_user_update(
db: AsyncSession,
admin: User,
target_user: User,
changes: dict[str, Any],
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log user details update."""
return await AuditService.log(
db=db,
admin=admin,
action="USER_UPDATE",
description=f"{admin.username} updated {target_user.username}'s profile",
target_type="user",
target_id=target_user.id,
details={"changes": changes},
ip_address=ip_address,
)
async def log_settings_update(
db: AsyncSession,
admin: User,
changes: dict[str, Any],
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log platform settings update."""
return await AuditService.log(
db=db,
admin=admin,
action="SETTINGS_UPDATE",
description=f"{admin.username} updated platform settings",
details={"changes": changes},
ip_address=ip_address,
)
async def log_event_create(
db: AsyncSession,
admin: User,
event_id: int,
event_title: str,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log sport event creation."""
return await AuditService.log(
db=db,
admin=admin,
action="EVENT_CREATE",
description=f"{admin.username} created event: {event_title}",
target_type="event",
target_id=event_id,
ip_address=ip_address,
)
async def log_event_update(
db: AsyncSession,
admin: User,
event_id: int,
event_title: str,
changes: dict[str, Any],
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log sport event update."""
return await AuditService.log(
db=db,
admin=admin,
action="EVENT_UPDATE",
description=f"{admin.username} updated event: {event_title}",
target_type="event",
target_id=event_id,
details={"changes": changes},
ip_address=ip_address,
)
async def log_event_delete(
db: AsyncSession,
admin: User,
event_id: int,
event_title: str,
ip_address: Optional[str] = None,
) -> AdminAuditLog:
"""Log sport event deletion."""
return await AuditService.log(
db=db,
admin=admin,
action="EVENT_DELETE",
description=f"{admin.username} deleted event: {event_title}",
target_type="event",
target_id=event_id,
ip_address=ip_address,
)

View File

@ -0,0 +1,262 @@
"""
Data Seeder Service for populating the database with test data.
Can be controlled via API for admin-initiated seeding.
"""
import random
from decimal import Decimal
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models import (
User, Wallet, SportEvent, SpreadBet, EventComment,
SportType, EventStatus, SpreadBetStatus, TeamSide
)
from app.schemas.admin import SeedRequest, SeedResponse
from app.services.audit_service import log_data_seed
from app.utils.security import get_password_hash
# Sample data for generating random content
FIRST_NAMES = [
"James", "Emma", "Liam", "Olivia", "Noah", "Ava", "Oliver", "Sophia",
"Elijah", "Isabella", "Lucas", "Mia", "Mason", "Charlotte", "Ethan",
"Amelia", "Alexander", "Harper", "Henry", "Evelyn", "Sebastian", "Luna",
"Jack", "Camila", "Aiden", "Gianna", "Owen", "Abigail", "Samuel", "Ella",
]
LAST_NAMES = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller",
"Davis", "Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez",
"Wilson", "Anderson", "Thomas", "Taylor", "Moore", "Jackson", "Martin",
]
NFL_TEAMS = [
("Kansas City Chiefs", "Arrowhead Stadium"),
("San Francisco 49ers", "Levi's Stadium"),
("Philadelphia Eagles", "Lincoln Financial Field"),
("Dallas Cowboys", "AT&T Stadium"),
("Buffalo Bills", "Highmark Stadium"),
("Miami Dolphins", "Hard Rock Stadium"),
("Detroit Lions", "Ford Field"),
("Green Bay Packers", "Lambeau Field"),
("Baltimore Ravens", "M&T Bank Stadium"),
("Cincinnati Bengals", "Paycor Stadium"),
]
NBA_TEAMS = [
("Boston Celtics", "TD Garden"),
("Denver Nuggets", "Ball Arena"),
("Milwaukee Bucks", "Fiserv Forum"),
("Los Angeles Lakers", "Crypto.com Arena"),
("Phoenix Suns", "Footprint Center"),
("Golden State Warriors", "Chase Center"),
("Miami Heat", "Kaseya Center"),
("Cleveland Cavaliers", "Rocket Mortgage FieldHouse"),
]
EVENT_COMMENTS = [
"This is going to be a great game!",
"Home team looking strong this season",
"I'm betting on the underdog here",
"What do you all think about the spread?",
"Last time these teams played it was close",
"The odds seem off to me",
"Sharp money coming in on the home side",
"Value play on the underdog here",
"Home field advantage is huge here",
"Rivalry game, throw out the records!",
]
class SeederService:
"""Service for seeding the database with test data."""
@staticmethod
async def seed(
db: AsyncSession,
admin: User,
request: SeedRequest,
ip_address: Optional[str] = None,
) -> SeedResponse:
"""
Seed the database with test data.
"""
created_counts = {
"users": 0,
"wallets": 0,
"events": 0,
"bets": 0,
"comments": 0,
}
test_admin_info = None
# Create test admin if requested and doesn't exist
if request.create_admin:
existing_admin = await db.execute(
select(User).where(User.username == "testadmin")
)
if not existing_admin.scalar_one_or_none():
test_admin = User(
email="testadmin@example.com",
username="testadmin",
password_hash=get_password_hash("admin123"),
display_name="Test Administrator",
is_admin=True,
)
db.add(test_admin)
await db.flush()
admin_wallet = Wallet(
user_id=test_admin.id,
balance=Decimal("10000.00"),
escrow=Decimal("0.00"),
)
db.add(admin_wallet)
created_counts["users"] += 1
created_counts["wallets"] += 1
test_admin_info = {
"username": "testadmin",
"email": "testadmin@example.com",
"password": "admin123",
}
# Create regular test users
users = []
for i in range(request.num_users):
first = random.choice(FIRST_NAMES)
last = random.choice(LAST_NAMES)
suffix = random.randint(100, 9999)
username = f"{first.lower()}{last.lower()}{suffix}"
email = f"{username}@example.com"
# Check if user already exists
existing = await db.execute(
select(User).where(User.username == username)
)
if existing.scalar_one_or_none():
continue
user = User(
email=email,
username=username,
password_hash=get_password_hash("password123"),
display_name=f"{first} {last}",
)
db.add(user)
await db.flush()
wallet = Wallet(
user_id=user.id,
balance=request.starting_balance,
escrow=Decimal("0.00"),
)
db.add(wallet)
users.append(user)
created_counts["users"] += 1
created_counts["wallets"] += 1
# Create sport events
events = []
all_teams = [(t, v, SportType.FOOTBALL, "NFL") for t, v in NFL_TEAMS] + \
[(t, v, SportType.BASKETBALL, "NBA") for t, v in NBA_TEAMS]
for i in range(request.num_events):
# Pick two different teams from the same sport
team_pool = random.choice([NFL_TEAMS, NBA_TEAMS])
sport = SportType.FOOTBALL if team_pool == NFL_TEAMS else SportType.BASKETBALL
league = "NFL" if sport == SportType.FOOTBALL else "NBA"
home_team, venue = random.choice(team_pool)
away_team, _ = random.choice([t for t in team_pool if t[0] != home_team])
# Random game time in the next 7 days
game_time = datetime.utcnow() + timedelta(
days=random.randint(1, 7),
hours=random.randint(0, 23),
)
# Random spread
spread = round(random.uniform(-10, 10) * 2) / 2 # Half-point spread
event = SportEvent(
sport=sport,
home_team=home_team,
away_team=away_team,
official_spread=spread,
game_time=game_time,
venue=venue,
league=league,
min_spread=-15.0,
max_spread=15.0,
min_bet_amount=Decimal("10.00"),
max_bet_amount=Decimal("1000.00"),
status=EventStatus.UPCOMING,
created_by=admin.id,
)
db.add(event)
await db.flush()
events.append(event)
created_counts["events"] += 1
# Create bets on events
if users and events:
for event in events:
num_bets = random.randint(1, request.num_bets_per_event * 2)
for _ in range(num_bets):
user = random.choice(users)
# Random spread near official
spread_offset = random.choice([-2, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, 2])
spread = event.official_spread + spread_offset
spread = max(event.min_spread, min(event.max_spread, spread))
# Ensure half-point
spread = round(spread * 2) / 2
if spread % 1 == 0:
spread += 0.5
stake = Decimal(str(random.randint(25, 200)))
team = random.choice([TeamSide.HOME, TeamSide.AWAY])
bet = SpreadBet(
event_id=event.id,
spread=spread,
team=team,
creator_id=user.id,
stake_amount=stake,
house_commission_percent=Decimal("10.00"),
status=SpreadBetStatus.OPEN,
)
db.add(bet)
created_counts["bets"] += 1
# Add some comments
for _ in range(random.randint(0, 3)):
user = random.choice(users)
comment = EventComment(
event_id=event.id,
user_id=user.id,
content=random.choice(EVENT_COMMENTS),
)
db.add(comment)
created_counts["comments"] += 1
# Log the seed operation
await log_data_seed(
db=db,
admin=admin,
created_counts=created_counts,
ip_address=ip_address,
)
await db.commit()
return SeedResponse(
success=True,
message="Database seeded successfully",
created_counts=created_counts,
test_admin=test_admin_info,
)

View File

@ -0,0 +1,382 @@
"""
Simulation Service for running automated activity in the background.
This service manages starting/stopping simulated user activity.
"""
import asyncio
import random
from decimal import Decimal
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.database import async_session
from app.models import (
User, Wallet, SportEvent, SpreadBet, EventComment, MatchComment,
EventStatus, SpreadBetStatus, TeamSide, Transaction, TransactionType, TransactionStatus
)
from app.schemas.admin import SimulationConfig, SimulationStatusResponse
from app.utils.security import get_password_hash
# Sample data for simulation
FIRST_NAMES = [
"James", "Emma", "Liam", "Olivia", "Noah", "Ava", "Oliver", "Sophia",
"Elijah", "Isabella", "Lucas", "Mia", "Mason", "Charlotte", "Ethan",
]
LAST_NAMES = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller",
"Davis", "Rodriguez", "Martinez",
]
EVENT_COMMENTS = [
"This is going to be a great game!",
"Home team looking strong this season",
"I'm betting on the underdog here",
"What do you all think about the spread?",
"The odds seem off to me",
"Sharp money coming in on the home side",
"Home field advantage is huge here",
]
MATCH_COMMENTS = [
"Good luck!",
"May the best bettor win",
"I'm feeling confident about this one",
"Nice bet, looking forward to the game",
"GL HF",
]
class SimulationManager:
"""
Singleton manager for controlling simulation state.
Runs simulation in a background asyncio task.
"""
_instance: Optional["SimulationManager"] = None
_task: Optional[asyncio.Task] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._running = False
self._started_at: Optional[datetime] = None
self._started_by: Optional[str] = None
self._iterations = 0
self._config: Optional[SimulationConfig] = None
self._last_activity: Optional[str] = None
self._stop_event = asyncio.Event()
@property
def is_running(self) -> bool:
return self._running
def get_status(self) -> SimulationStatusResponse:
return SimulationStatusResponse(
is_running=self._running,
started_at=self._started_at,
started_by=self._started_by,
iterations_completed=self._iterations,
config=self._config,
last_activity=self._last_activity,
)
async def start(self, admin_username: str, config: Optional[SimulationConfig] = None) -> bool:
"""Start the simulation in a background task."""
if self._running:
return False
self._config = config or SimulationConfig()
self._running = True
self._started_at = datetime.utcnow()
self._started_by = admin_username
self._iterations = 0
self._stop_event.clear()
# Start background task
SimulationManager._task = asyncio.create_task(self._run_simulation())
return True
async def stop(self) -> tuple[int, float]:
"""Stop the simulation and return stats."""
if not self._running:
return 0, 0.0
self._stop_event.set()
if SimulationManager._task:
try:
await asyncio.wait_for(SimulationManager._task, timeout=5.0)
except asyncio.TimeoutError:
SimulationManager._task.cancel()
iterations = self._iterations
duration = (datetime.utcnow() - self._started_at).total_seconds() if self._started_at else 0
self._running = False
self._started_at = None
self._started_by = None
self._iterations = 0
self._config = None
SimulationManager._task = None
return iterations, duration
async def _run_simulation(self):
"""Main simulation loop."""
while not self._stop_event.is_set():
try:
async with async_session() as db:
# Get existing users and events
users_result = await db.execute(select(User).where(User.is_admin == False))
users = list(users_result.scalars().all())
events_result = await db.execute(
select(SportEvent).where(SportEvent.status == EventStatus.UPCOMING)
)
events = list(events_result.scalars().all())
if not events:
self._last_activity = "No upcoming events - waiting..."
await asyncio.sleep(self._config.delay_seconds)
continue
# Perform random actions based on config
for _ in range(self._config.actions_per_iteration):
if self._stop_event.is_set():
break
action = self._pick_action()
try:
if action == "create_user" and self._config.create_users:
await self._create_user(db)
# Refresh users list
users_result = await db.execute(select(User).where(User.is_admin == False))
users = list(users_result.scalars().all())
elif action == "create_bet" and self._config.create_bets and users and events:
await self._create_bet(db, users, events)
elif action == "take_bet" and self._config.take_bets and users:
await self._take_bet(db, users)
elif action == "add_comment" and self._config.add_comments and users and events:
await self._add_comment(db, users, events)
elif action == "cancel_bet" and self._config.cancel_bets:
await self._cancel_bet(db)
except Exception as e:
self._last_activity = f"Error: {str(e)[:50]}"
self._iterations += 1
await asyncio.sleep(self._config.delay_seconds)
except Exception as e:
self._last_activity = f"Loop error: {str(e)[:50]}"
await asyncio.sleep(self._config.delay_seconds)
def _pick_action(self) -> str:
"""Pick a random action based on weights."""
actions = [
("create_user", 0.15),
("create_bet", 0.35),
("take_bet", 0.25),
("add_comment", 0.20),
("cancel_bet", 0.05),
]
rand = random.random()
cumulative = 0
for action, weight in actions:
cumulative += weight
if rand <= cumulative:
return action
return "create_bet"
async def _create_user(self, db):
"""Create a random user."""
first = random.choice(FIRST_NAMES)
last = random.choice(LAST_NAMES)
suffix = random.randint(100, 9999)
username = f"{first.lower()}{last.lower()}{suffix}"
email = f"{username}@example.com"
# Check if exists
existing = await db.execute(select(User).where(User.username == username))
if existing.scalar_one_or_none():
return
user = User(
email=email,
username=username,
password_hash=get_password_hash("password123"),
display_name=f"{first} {last}",
)
db.add(user)
await db.flush()
wallet = Wallet(
user_id=user.id,
balance=Decimal(str(random.randint(500, 5000))),
escrow=Decimal("0.00"),
)
db.add(wallet)
await db.commit()
self._last_activity = f"Created user: {username}"
async def _create_bet(self, db, users: list[User], events: list[SportEvent]):
"""Create a random bet."""
# Find users with balance
users_with_balance = []
for user in users:
wallet_result = await db.execute(select(Wallet).where(Wallet.user_id == user.id))
wallet = wallet_result.scalar_one_or_none()
if wallet and wallet.balance >= Decimal("10"):
users_with_balance.append((user, wallet))
if not users_with_balance:
return
user, wallet = random.choice(users_with_balance)
event = random.choice(events)
spread = round(random.uniform(event.min_spread, event.max_spread) * 2) / 2
if spread % 1 == 0:
spread += 0.5
max_stake = min(float(wallet.balance) * 0.5, 500)
stake = Decimal(str(round(random.uniform(10, max(10, max_stake)), 2)))
team = random.choice([TeamSide.HOME, TeamSide.AWAY])
bet = SpreadBet(
event_id=event.id,
spread=spread,
team=team,
creator_id=user.id,
stake_amount=stake,
house_commission_percent=Decimal("10.00"),
status=SpreadBetStatus.OPEN,
)
db.add(bet)
await db.commit()
team_name = event.home_team if team == TeamSide.HOME else event.away_team
self._last_activity = f"{user.username} created ${stake} bet on {team_name}"
async def _take_bet(self, db, users: list[User]):
"""Take a random open bet."""
result = await db.execute(
select(SpreadBet)
.options(selectinload(SpreadBet.event), selectinload(SpreadBet.creator))
.where(SpreadBet.status == SpreadBetStatus.OPEN)
)
open_bets = result.scalars().all()
if not open_bets:
return
bet = random.choice(open_bets)
# Find eligible takers
eligible = []
for user in users:
if user.id == bet.creator_id:
continue
wallet_result = await db.execute(select(Wallet).where(Wallet.user_id == user.id))
wallet = wallet_result.scalar_one_or_none()
if wallet and wallet.balance >= bet.stake_amount:
eligible.append((user, wallet))
if not eligible:
return
taker, taker_wallet = random.choice(eligible)
# Get creator wallet
creator_wallet_result = await db.execute(
select(Wallet).where(Wallet.user_id == bet.creator_id)
)
creator_wallet = creator_wallet_result.scalar_one_or_none()
if not creator_wallet or creator_wallet.balance < bet.stake_amount:
return
# Lock funds
creator_wallet.balance -= bet.stake_amount
creator_wallet.escrow += bet.stake_amount
taker_wallet.balance -= bet.stake_amount
taker_wallet.escrow += bet.stake_amount
# Create transactions
creator_tx = Transaction(
user_id=bet.creator_id,
wallet_id=creator_wallet.id,
type=TransactionType.ESCROW_LOCK,
amount=-bet.stake_amount,
balance_after=creator_wallet.balance,
reference_id=bet.id,
description=f"Escrow lock for spread bet #{bet.id}",
status=TransactionStatus.COMPLETED,
)
taker_tx = Transaction(
user_id=taker.id,
wallet_id=taker_wallet.id,
type=TransactionType.ESCROW_LOCK,
amount=-bet.stake_amount,
balance_after=taker_wallet.balance,
reference_id=bet.id,
description=f"Escrow lock for spread bet #{bet.id}",
status=TransactionStatus.COMPLETED,
)
db.add(creator_tx)
db.add(taker_tx)
bet.taker_id = taker.id
bet.status = SpreadBetStatus.MATCHED
bet.matched_at = datetime.utcnow()
await db.commit()
self._last_activity = f"{taker.username} took ${bet.stake_amount} bet"
async def _add_comment(self, db, users: list[User], events: list[SportEvent]):
"""Add a random comment."""
user = random.choice(users)
event = random.choice(events)
content = random.choice(EVENT_COMMENTS)
comment = EventComment(
event_id=event.id,
user_id=user.id,
content=content,
)
db.add(comment)
await db.commit()
self._last_activity = f"{user.username} commented on {event.home_team} vs {event.away_team}"
async def _cancel_bet(self, db):
"""Cancel a random open bet."""
if random.random() > 0.2: # Only 20% chance
return
result = await db.execute(
select(SpreadBet)
.options(selectinload(SpreadBet.creator))
.where(SpreadBet.status == SpreadBetStatus.OPEN)
)
open_bets = result.scalars().all()
if not open_bets:
return
bet = random.choice(open_bets)
bet.status = SpreadBetStatus.CANCELLED
await db.commit()
self._last_activity = f"{bet.creator.username} cancelled ${bet.stake_amount} bet"
# Global instance
simulation_manager = SimulationManager()

View File

@ -0,0 +1,224 @@
"""
Data Wiper Service for safely clearing database data.
Includes safeguards like confirmation phrases and cooldowns.
"""
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func
from app.models import (
User, Wallet, Transaction, Bet, SpreadBet, SportEvent,
EventComment, MatchComment, AdminSettings, AdminAuditLog,
UserStats, Achievement, UserAchievement, LootBox, ActivityFeed, DailyReward
)
from app.schemas.admin import WipePreviewResponse, WipeRequest, WipeResponse
from app.services.audit_service import log_data_wipe
# Cooldown between wipes (5 minutes)
WIPE_COOLDOWN_SECONDS = 300
CONFIRMATION_PHRASE = "CONFIRM WIPE"
class WiperService:
"""Service for wiping database data with safeguards."""
@staticmethod
async def get_preview(db: AsyncSession) -> WipePreviewResponse:
"""
Get a preview of what would be deleted in a wipe operation.
Also checks cooldown status.
"""
# Count all entities
users_count = (await db.execute(select(func.count(User.id)))).scalar() or 0
admin_users = (await db.execute(
select(func.count(User.id)).where(User.is_admin == True)
)).scalar() or 0
wallets_count = (await db.execute(select(func.count(Wallet.id)))).scalar() or 0
transactions_count = (await db.execute(select(func.count(Transaction.id)))).scalar() or 0
bets_count = (await db.execute(select(func.count(Bet.id)))).scalar() or 0
spread_bets_count = (await db.execute(select(func.count(SpreadBet.id)))).scalar() or 0
events_count = (await db.execute(select(func.count(SportEvent.id)))).scalar() or 0
event_comments_count = (await db.execute(select(func.count(EventComment.id)))).scalar() or 0
match_comments_count = (await db.execute(select(func.count(MatchComment.id)))).scalar() or 0
# Check last wipe time from audit log
last_wipe_log = await db.execute(
select(AdminAuditLog)
.where(AdminAuditLog.action == "DATA_WIPE")
.order_by(AdminAuditLog.created_at.desc())
.limit(1)
)
last_wipe = last_wipe_log.scalar_one_or_none()
cooldown_remaining = 0
can_wipe = True
last_wipe_at = None
if last_wipe:
last_wipe_at = last_wipe.created_at
elapsed = (datetime.utcnow() - last_wipe.created_at).total_seconds()
if elapsed < WIPE_COOLDOWN_SECONDS:
cooldown_remaining = int(WIPE_COOLDOWN_SECONDS - elapsed)
can_wipe = False
return WipePreviewResponse(
users_count=users_count - admin_users, # Non-admin users that would be deleted
wallets_count=wallets_count,
transactions_count=transactions_count,
bets_count=bets_count,
spread_bets_count=spread_bets_count,
events_count=events_count,
event_comments_count=event_comments_count,
match_comments_count=match_comments_count,
admin_settings_preserved=True,
admin_users_preserved=True,
can_wipe=can_wipe,
cooldown_remaining_seconds=cooldown_remaining,
last_wipe_at=last_wipe_at,
)
@staticmethod
async def execute_wipe(
db: AsyncSession,
admin: User,
request: WipeRequest,
ip_address: Optional[str] = None,
) -> WipeResponse:
"""
Execute a database wipe with safeguards.
Raises:
ValueError: If confirmation phrase is wrong or cooldown not elapsed
"""
# Verify confirmation phrase
if request.confirmation_phrase != CONFIRMATION_PHRASE:
raise ValueError(f"Invalid confirmation phrase. Must be exactly '{CONFIRMATION_PHRASE}'")
# Check cooldown
preview = await WiperService.get_preview(db)
if not preview.can_wipe:
raise ValueError(
f"Wipe cooldown in effect. Please wait {preview.cooldown_remaining_seconds} seconds."
)
deleted_counts = {}
preserved_counts = {}
# Get admin user IDs to preserve
admin_user_ids = []
if request.preserve_admin_users:
admin_users_result = await db.execute(
select(User.id).where(User.is_admin == True)
)
admin_user_ids = [row[0] for row in admin_users_result.fetchall()]
preserved_counts["admin_users"] = len(admin_user_ids)
# Delete in order respecting foreign keys:
# 1. Comments (no FK dependencies)
result = await db.execute(delete(MatchComment))
deleted_counts["match_comments"] = result.rowcount
result = await db.execute(delete(EventComment))
deleted_counts["event_comments"] = result.rowcount
# 2. Gamification data
result = await db.execute(delete(ActivityFeed))
deleted_counts["activity_feed"] = result.rowcount
result = await db.execute(delete(DailyReward))
deleted_counts["daily_rewards"] = result.rowcount
result = await db.execute(delete(LootBox))
deleted_counts["loot_boxes"] = result.rowcount
result = await db.execute(delete(UserAchievement))
deleted_counts["user_achievements"] = result.rowcount
result = await db.execute(delete(UserStats))
deleted_counts["user_stats"] = result.rowcount
# 3. Bets
result = await db.execute(delete(SpreadBet))
deleted_counts["spread_bets"] = result.rowcount
result = await db.execute(delete(Bet))
deleted_counts["bets"] = result.rowcount
# 4. Events (only if not preserving)
if not request.preserve_events:
result = await db.execute(delete(SportEvent))
deleted_counts["events"] = result.rowcount
else:
events_count = (await db.execute(select(func.count(SportEvent.id)))).scalar() or 0
preserved_counts["events"] = events_count
# 5. Transactions
if admin_user_ids:
result = await db.execute(
delete(Transaction).where(Transaction.user_id.notin_(admin_user_ids))
)
else:
result = await db.execute(delete(Transaction))
deleted_counts["transactions"] = result.rowcount
# 6. Wallets (preserve admin wallets, but reset them)
if admin_user_ids:
# Delete non-admin wallets
result = await db.execute(
delete(Wallet).where(Wallet.user_id.notin_(admin_user_ids))
)
deleted_counts["wallets"] = result.rowcount
# Reset admin wallets
from decimal import Decimal
admin_wallets = await db.execute(
select(Wallet).where(Wallet.user_id.in_(admin_user_ids))
)
for wallet in admin_wallets.scalars():
wallet.balance = Decimal("1000.00")
wallet.escrow = Decimal("0.00")
preserved_counts["admin_wallets"] = len(admin_user_ids)
else:
result = await db.execute(delete(Wallet))
deleted_counts["wallets"] = result.rowcount
# 7. Users (preserve admins)
if admin_user_ids:
result = await db.execute(
delete(User).where(User.id.notin_(admin_user_ids))
)
# Reset admin user stats
admin_users = await db.execute(
select(User).where(User.id.in_(admin_user_ids))
)
for user in admin_users.scalars():
user.total_bets = 0
user.wins = 0
user.losses = 0
user.win_rate = 0.0
else:
result = await db.execute(delete(User))
deleted_counts["users"] = result.rowcount
# Log the wipe before committing
await log_data_wipe(
db=db,
admin=admin,
deleted_counts=deleted_counts,
preserved_counts=preserved_counts,
ip_address=ip_address,
)
await db.commit()
return WipeResponse(
success=True,
message="Database wiped successfully",
deleted_counts=deleted_counts,
preserved_counts=preserved_counts,
executed_at=datetime.utcnow(),
executed_by=admin.username,
)