Computer Game Market Data Analysis & Forecasting System

PostgreSQL · Random Forest · Gradient Boosting · ARIMA · Prophet

Introduction

This project is an automated data-collection and analytics system for in-game virtual item markets. Built on the Darker Market API, it implements an end-to-end pipeline running from data ingestion, cleaning, and deduplication through database storage to price forecasting and time-series analysis.

The system supports dynamic multi-item configuration and auto-discovery, allowing new item types to be added without code changes. Data is stored in PostgreSQL with batch writes and duplicate control. On the analytics side, it integrates multiple ML and time-series models (Random Forest, Gradient Boosting, LSTM, ARIMA, Prophet, etc.) for price prediction, trend analysis, and risk assessment.
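
To illustrate the config-driven design, here is a minimal sketch of how item auto-discovery could work: new items are declared in a data file rather than in code. The `items.json` file name and `load_items` helper are hypothetical, not the project's actual names; the table-name rule matches the one used by the collector below.

import json

def load_items(path="items.json"):
    """Read item definitions and derive a table name for each one."""
    with open(path, encoding="utf-8") as f:
        entries = json.load(f)  # e.g. [{"name": "Iron Ore"}, ...]
    return [
        {"name": e["name"], "table": e["name"].replace(" ", "_").lower()}
        for e in entries
    ]

Adding a new item then means appending one entry to the data file; every downstream component derives its table name with the same rule the collector uses.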

The project covers the full data-engineering and modeling workflow in a real business setting, demonstrating practical ability in automated data collection, scalable system design, and data-driven decision support.

Core Code

Data collection — API collector (DarkerMarketAPI)

import requests

class DarkerMarketAPI:
    def __init__(self):
        self.item = "Iron Ore"
        self.page = 0               # current page of the paginated API
        self.seen_records = set()   # in-memory deduplication set
        self.no_new_data_count = 0  # consecutive pages with no new records
        self.max_no_new_data = 3    # auto-stop threshold

    def run(self):
        # one table per item, e.g. "Iron Ore" -> "iron_ore"
        self.db = DarkerMarketDB(items=self.item.replace(" ", "_").lower())
        self._load_existing_records()
        while self.page < 200 and self.no_new_data_count < self.max_no_new_data:
            new_data_count = self.get_market_data()
            if new_data_count > 0:
                self.no_new_data_count = 0
            else:
                self.no_new_data_count += 1
            self.page += 1
        self.total_inserted = self.db.batch_insert_all_data()
        self.save_data_to_csv()

    def get_market_data(self):
        # url, params, and self.headers are built from self.item and
        # self.page elsewhere in the module (elided in this excerpt)
        response = requests.get(url, params=params, headers=self.headers)
        data = response.json()
        new_data = []
        for item in data["body"]:
            # composite key: record id + creation timestamp
            record_key = f"{item['id']}_{item['created_at']}"
            if record_key not in self.seen_records:
                new_data.append(item)
                self.seen_records.add(record_key)
        self.db.add_data(new_data)
        return len(new_data)
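
A minimal usage sketch (assuming the module-level names elided above are defined; the item name is the only thing a caller has to set):

if __name__ == "__main__":
    api = DarkerMarketAPI()
    api.item = "Iron Ore"  # the table name "iron_ore" is derived from this
    api.run()
    print(f"Inserted {api.total_inserted} new rows")

Stopping after three consecutive pages with no unseen records keeps incremental daily runs cheap: once the crawl reaches data already collected, it exits early instead of paging all the way to the 200-page cap.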

Database — batch insert & deduplication

import pandas as pd
from datetime import datetime

class DarkerMarketDB:
    def batch_insert_all_data(self):
        cursor = self.connector.cursor()
        # load existing keys once so re-runs never insert duplicates
        cursor.execute("SELECT item_id, created_at FROM " + self.items)
        existing_records = set()
        for row in cursor.fetchall():
            existing_records.add(f"{row[0]}_{row[1]}")

        data_to_insert = []
        for item in self.all_data:
            record_key = f"{item['id']}_{item['created_at']}"
            if record_key not in existing_records:
                # field names assumed to match the API payload
                data_to_insert.append((
                    item['id'], item['item_name'], item['quantity'],
                    item['price_per_unit'], item['price'], item['has_sold'],
                    item['created_at'], datetime.now()
                ))
                existing_records.add(record_key)

        if data_to_insert:
            # insert_query is a parameterized INSERT statement (elided here)
            cursor.executemany(insert_query, data_to_insert)
            self.connector.commit()
        return len(data_to_insert)

    def export_to_csv(self):
        query = "SELECT * FROM " + self.items + " ORDER BY created_at DESC"
        df = pd.read_sql_query(query, self.connector)
        df.to_csv(self.path, index=False, encoding='utf-8')
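
For reference, a plausible table schema matching the insert tuple above; the column names are inferred from the code, and the project's actual DDL may differ:

def create_table(connector, table_name):
    # hypothetical DDL, reconstructed from the insert tuple above
    ddl = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            item_id        TEXT,
            item_name      TEXT,
            quantity       INTEGER,
            price_per_unit NUMERIC,
            price          NUMERIC,
            has_sold       BOOLEAN,
            created_at     TIMESTAMP,
            collected_at   TIMESTAMP,
            UNIQUE (item_id, created_at)
        )"""
    with connector.cursor() as cur:
        cur.execute(ddl)
    connector.commit()

A UNIQUE constraint on (item_id, created_at) mirrors the composite dedup key the Python sets check in memory; at the database level it would reject duplicate rows outright rather than silently skipping them.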

ML module — feature engineering (no leakage)

def _feature_engineering(self):
    # time features
    self.df['hour'] = self.df['created_at'].dt.hour
    self.df['day_of_week'] = self.df['created_at'].dt.dayofweek
    self.df['is_weekend'] = (self.df['day_of_week'] >= 5).astype(int)

    # lag features (past values only, so the target never leaks in)
    for lag in [1, 2, 3, 5, 10]:
        self.df['price_lag_%d' % lag] = self.df['price_per_unit'].shift(lag)
        self.df['quantity_lag_%d' % lag] = self.df['quantity'].shift(lag)

    # moving averages (computed over the lagged price)
    for window in [3, 5, 7]:
        self.df['price_ma_%d' % window] = \
            self.df['price_lag_1'].rolling(window, min_periods=1).mean()

    # volatility features
    for window in [5, 10]:
        self.df['price_volatility_%d' % window] = \
            self.df['price_lag_1'].rolling(window, min_periods=1).std()

    # interaction feature: built on the lagged price, since the current
    # total price is a direct function of the prediction target
    self.df['price_quantity_ratio'] = \
        self.df['price_lag_1'] / (self.df['quantity'] + 1)
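
Lag features only stay leakage-free if the evaluation split is also chronological. The following helper is an illustrative sketch, not the project's actual splitting code, of holding out the most recent slice:

def time_based_split(df, feature_cols, target_col='price_per_unit', test_frac=0.2):
    # sort by time and drop rows whose lag features are still NaN
    df = df.sort_values('created_at').dropna(subset=feature_cols)
    split = int(len(df) * (1 - test_frac))
    X, y = df[feature_cols], df[target_col]
    # train on the past, test on the most recent slice
    return X.iloc[:split], X.iloc[split:], y.iloc[:split], y.iloc[split:]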

ML module — multi-model training & ensemble

from sklearn.ensemble import (ExtraTreesRegressor, GradientBoostingRegressor,
                              RandomForestRegressor, VotingRegressor)
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
from sklearn.neural_network import MLPRegressor

def train_models(self):
    models = {
        'Random Forest': RandomForestRegressor(n_estimators=200, max_depth=12),
        'Gradient Boosting': GradientBoostingRegressor(n_estimators=200, max_depth=6),
        'Ridge Regression': Ridge(alpha=0.1),
        'Extra Trees': ExtraTreesRegressor(n_estimators=200),
        'MLP': MLPRegressor(hidden_layer_sizes=(100, 50))  # sklearn keyword is hidden_layer_sizes
    }
    for name, model in models.items():
        model.fit(self.X_train_scaled, self.y_train)
        cv_scores = cross_val_score(model, self.X_train_scaled, self.y_train,
                                    cv=5, scoring='r2')
        # further per-model metrics are elided in this excerpt
        self.models[name] = {'model': model, 'cv_r2_mean': cv_scores.mean()}

def create_ensemble_models(self):
    # keep the three best models by cross-validated R^2
    top_models = sorted(self.models.items(),
                        key=lambda x: x[1]['cv_r2_mean'], reverse=True)[:3]
    weights = self._calculate_dynamic_weights(top_models)
    voting_ensemble = VotingRegressor(
        estimators=[(n, m['model']) for n, m in top_models],
        weights=weights
    )
    return voting_ensemble  # must be fit on the training data before use
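
`_calculate_dynamic_weights` is referenced but not shown; one plausible implementation weights each of the top models by its cross-validated R², clipped at zero so a weak model cannot receive negative weight. This is an assumption, not the repository's exact logic:

def _calculate_dynamic_weights(self, top_models):
    # hypothetical sketch: weight proportional to CV R^2, clipped at 0
    scores = [max(m['cv_r2_mean'], 0.0) for _, m in top_models]
    total = sum(scores)
    if total == 0:
        # all models scored <= 0: fall back to equal weights
        return [1.0 / len(scores)] * len(scores)
    return [s / total for s in scores]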

Scheduler — async execution & config

# core_scheduler.py
import threading
import time

import schedule

class SmartScheduler:
    def run_script_async(self, script_path, script_name):
        # run a collector script in a background thread so a slow
        # job cannot block the scheduling loop
        def run():
            self.run_script(script_path, script_name)
        thread = threading.Thread(target=run, name="AsyncTask-" + script_name, daemon=True)
        thread.start()
        return thread

    def add_task_from_config(self, task_config):
        if task_config["schedule_type"] == "daily":
            schedule.every().day.at(task_config["schedule_value"]).do(
                self.run_script_async,
                script_path=task_config["script_path"],
                script_name=task_config["name"]
            )
        elif task_config["schedule_type"] == "hourly":
            schedule.every().hour.do(...)  # same pattern as above, elided

    def start_scheduler(self):
        while True:
            schedule.run_pending()
            time.sleep(1)

# task_config.py
TASKS = [
    {"name": "Iron Ore API", "script_path": "src/api/Iron_Ore_API.py",
     "schedule_type": "daily", "schedule_value": "23:00", "enabled": True},
]
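
Putting the two files together, a minimal entry point could look like this (the `main.py` wrapper is illustrative; the repository's own launcher may differ):

# main.py (illustrative)
from core_scheduler import SmartScheduler
from task_config import TASKS

if __name__ == "__main__":
    scheduler = SmartScheduler()
    for task in TASKS:
        if task.get("enabled", False):
            scheduler.add_task_from_config(task)
    scheduler.start_scheduler()  # blocks, checking pending jobs every second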