求助天启悖论数据拉取问题

各位大佬们,请问天启悖论角色寝室数据怎么拉取,这个游戏是能拉全量寝室数据还是只能供档,供档的话也麻烦大佬们给个方法,我这里有账号,谢谢

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/base_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/vfx_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/pp_sounds_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/assets_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/3dModels_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/pp_sounds_data2_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/pp_ui_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/pp_assets_data1_catalog.json

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/pp_sounds_data1_catalog.json

拼接样例

https://cdne-paripari-prod.tenkei-paradox.com/production/production9-864dc0b0-6f1a-4aaa-bb36-703600ea7966/Assets/Android/bgm_assets_bgm/bgm_1001_loop_562ec835128129a72cd196b1ebf7fc57.bundle

不知道全不全

你看看这个资产下载脚本,剧本我也没找到

import os
import re
import json
import time
import argparse
import threading
from pathlib import Path
from typing import Any, List, Set, Tuple, Dict
from urllib.parse import urlparse, unquote, quote

import requests
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# =========================
# Part 1: Configuration
# =========================
API_URL = "https://app-paripari-prod.tenkei-paradox.com/api/Environment/EnvConfiguration"
CDN_PREFIX = "https://cdne-paripari-prod.tenkei-paradox.com/production/"

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DOWNLOAD_DIR = os.path.join(SCRIPT_DIR, '天啓パラドクスX资产')
CATALOG_DIR = os.path.join(SCRIPT_DIR, '天啓パラドクスX配置')
RECORD_FILE = os.path.join(SCRIPT_DIR, '天啓パラドクスX下载记录.json')
URLS_FILE = os.path.join(SCRIPT_DIR, '天啓パラドクスX下载链接.txt')
FAILED_URLS_FILE = os.path.join(SCRIPT_DIR, '天啓パラドクスX下载失败链接.json')
VERSION_FILE = os.path.join(SCRIPT_DIR, '天啓パラドクスX版本信息.json')

MAX_THREADS = 8
SAVE_INTERVAL = 2
CHUNK_SIZE = 1024 * 1024
AUTO_START = True

RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RESET = '\033[0m'

record_lock = threading.Lock()

CATALOG_FILES = [
    'base_catalog.json',
    'vfx_catalog.json',
    'assets_catalog.json',
    '3dModels_catalog.json',
    'pp_sounds_catalog.json',
    'pp_ui_catalog.json',
    'pp_sounds_data1_catalog.json',
    'pp_assets_data1_catalog.json',
    'pp_sounds_data2_catalog.json',
]


# =========================
# Part 2: API & Version Management
# =========================
def get_latest_version_info() -> Dict[str, Any] | None:
    """
    从 EnvConfiguration API 获取最新版本信息。
    返回包含 version_id, base_url, assets_url 的字典。
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    
    try:
        print(f"  正在获取云端版本信息: {API_URL}")
        resp = requests.get(API_URL, headers=headers, timeout=30, verify=False)
        if resp.status_code != 200:
            print(f"  {RED}API请求失败: {resp.status_code}{RESET}")
            return None
        
        data = resp.json()
        assets_url = data.get('result', {}).get('assetsUrl', '')
        if not assets_url:
            print(f"  {RED}API响应中缺少 assetsUrl{RESET}")
            return None
        
        version_match = re.search(r'production/(production\d+-[a-f0-9-]+)/', assets_url)
        if not version_match:
            print(f"  {RED}无法从 assetsUrl 解析版本号{RESET}")
            return None
        
        version_id = version_match.group(1)
        base_url = f"{CDN_PREFIX}{version_id}/"
        
        return {
            'version_id': version_id,
            'base_url': base_url,
            'assets_url': assets_url,
            'api_response': data
        }
    except Exception as e:
        print(f"  {RED}获取版本信息错误: {e}{RESET}")
        return None


def load_version_record() -> Dict[str, Any]:
    """加载本地版本记录文件。"""
    if os.path.exists(VERSION_FILE):
        with open(VERSION_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {
        'last_version_id': '',
        'last_check_time': '',
        'version_history': []
    }


def save_version_record(record: Dict[str, Any]):
    """保存版本记录到文件。"""
    with open(VERSION_FILE, 'w', encoding='utf-8') as f:
        json.dump(record, f, ensure_ascii=False, indent=2)


def compare_versions(old_version: str, new_version: str) -> Tuple[str, bool]:
    """
    对比两个版本号,返回状态描述和是否需要更新。
    """
    if not old_version:
        return "首次运行", True
    
    if old_version == new_version:
        return "版本相同", False
    
    old_num = re.search(r'production(\d+)', old_version)
    new_num = re.search(r'production(\d+)', new_version)
    
    if old_num and new_num:
        old_n = int(old_num.group(1))
        new_n = int(new_num.group(1))
        if new_n > old_n:
            return f"版本升级: {old_n} -> {new_n}", True
        elif new_n < old_n:
            return f"版本降级: {old_n} -> {new_n} (异常)", True
    
    return f"版本变更: {old_version} -> {new_version}", True


# =========================
# Part 3: Catalog Management
# =========================
def get_catalog_urls(base_url: str) -> Dict[str, str]:
    """生成所有 catalog 文件的 URL 映射。"""
    return {
        'settings.json': base_url + 'WebGL/StreamingAssets/aa/settings.json',
        'catalog.json': base_url + 'WebGL/StreamingAssets/aa/catalog.json',
        'base_catalog.json': base_url + 'Assets/WebGL/base_catalog.json',
        'vfx_catalog.json': base_url + 'Assets/WebGL/vfx_catalog.json',
        'assets_catalog.json': base_url + 'Assets/WebGL/assets_catalog.json',
        '3dModels_catalog.json': base_url + 'Assets/WebGL/3dModels_catalog.json',
        'pp_sounds_catalog.json': base_url + 'Assets/WebGL/pp_sounds_catalog.json',
        'pp_ui_catalog.json': base_url + 'Assets/WebGL/pp_ui_catalog.json',
        'pp_sounds_data1_catalog.json': base_url + 'Assets/WebGL/pp_sounds_data1_catalog.json',
        'pp_assets_data1_catalog.json': base_url + 'Assets/WebGL/pp_assets_data1_catalog.json',
        'pp_sounds_data2_catalog.json': base_url + 'Assets/WebGL/pp_sounds_data2_catalog.json',
    }


def download_catalogs(base_url: str, force_update: bool = False) -> Tuple[List[str], List[str]]:
    """
    下载 catalog 文件到配置文件夹。
    返回 (已下载列表, 已跳过列表)。
    """
    os.makedirs(CATALOG_DIR, exist_ok=True)
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    
    session = requests.Session()
    catalog_urls = get_catalog_urls(base_url)
    
    downloaded = []
    skipped = []
    
    for name, url in catalog_urls.items():
        save_path = os.path.join(CATALOG_DIR, name)
        
        if not force_update and os.path.exists(save_path):
            skipped.append(name)
            continue
        
        try:
            resp = session.get(url, headers=headers, timeout=30, verify=False)
            if resp.status_code == 200:
                with open(save_path, 'w', encoding='utf-8') as f:
                    f.write(resp.text)
                downloaded.append(name)
            else:
                print(f"    {RED}{name}: HTTP {resp.status_code}{RESET}")
        except Exception as e:
            print(f"    {RED}{name}: {e}{RESET}")
    
    return downloaded, skipped


def parse_catalogs() -> List[str]:
    """解析所有 catalog 文件(从配置文件夹),提取 bundle 名称列表。"""
    bundles = set()
    bundle_pattern = r'"([^"]*\.bundle)"'
    
    for catalog_file in CATALOG_FILES:
        filepath = os.path.join(CATALOG_DIR, catalog_file)
        if not os.path.exists(filepath):
            continue
        
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        
        found = re.findall(bundle_pattern, content)
        for bundle in found:
            bundles.add(bundle)
    
    return list(bundles)


# =========================
# Part 4: Bundle Path Conversion
# =========================
def convert_bundle_path(bundle_path: str) -> str:
    """将 bundle 名称转换为完整的下载路径。"""
    if bundle_path.startswith('PariPariPublicRemote/'):
        return 'Assets/WebGL/' + bundle_path[len('PariPariPublicRemote/'):]
    elif bundle_path.startswith('PariPariRemote/'):
        return 'Assets/WebGL/' + bundle_path[len('PariPariRemote/'):]
    elif bundle_path.startswith('{UnityEngine.AddressableAssets.Addressables.RuntimePath}/'):
        return 'WebGL/StreamingAssets/aa/' + bundle_path[len('{UnityEngine.AddressableAssets.Addressables.RuntimePath}/'):]
    elif bundle_path.startswith('Assets/'):
        return bundle_path
    else:
        return 'Assets/WebGL/' + bundle_path


def safe_filename(s: str) -> str:
    """将字符串转换为安全的文件名。"""
    return re.sub(r'[^A-Za-z0-9._-]+', "_", s)


# =========================
# Part 5: Download Record Management
# =========================
def load_record() -> Dict[str, Any]:
    """加载下载记录文件。"""
    if os.path.exists(RECORD_FILE):
        with open(RECORD_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {
        'last_update': '',
        'current_version': '',
        'total_count': 0,
        'downloaded_bundles': [],
        'bundle_hashes': {}
    }


def save_record(record: Dict[str, Any]):
    """保存下载记录到文件。"""
    with record_lock:
        with open(RECORD_FILE, 'w', encoding='utf-8') as f:
            json.dump(record, f, ensure_ascii=False, indent=2)


def get_local_bundles() -> Set[str]:
    """扫描本地目录,获取已存在的 bundle 文件集合。"""
    local = set()
    if os.path.exists(DOWNLOAD_DIR):
        for root, dirs, files in os.walk(DOWNLOAD_DIR):
            for file in files:
                if file.endswith('.bundle'):
                    local.add(file)
    return local


# =========================
# Part 6: Download Functions
# =========================
def download_stream(url: str, out_path: str, timeout: int = 60, retries: int = 3) -> Tuple[bool, int, int]:
    """
    流式下载文件,支持重试。
    返回 (成功, 状态码, 字节数)。
    """
    headers = {"User-Agent": "Mozilla/5.0", "Accept": "*/*"}
    
    for attempt in range(1, retries + 1):
        try:
            with requests.get(url, stream=True, timeout=timeout, headers=headers, verify=False) as r:
                sc = r.status_code
                if sc != 200:
                    return False, sc, 0
                
                tmp = out_path + ".part"
                total = 0
                with open(tmp, "wb") as f:
                    for chunk in r.iter_content(CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
                            total += len(chunk)
                os.replace(tmp, out_path)
            return True, 200, total
        except Exception as e:
            if attempt == retries:
                print(f"{RED}[FAIL]{RESET} {url} -> {e}")
                return False, -1, 0
            time.sleep(0.8 * attempt)
    return False, -1, 0


def download_file(bundle_name: str, base_url: str, session: requests.Session, headers: Dict[str, str]) -> Tuple[bool, str, int, int]:
    """
    下载单个 bundle 文件。
    返回 (成功, bundle名, 状态码/错误, 大小)。
    """
    converted_path = convert_bundle_path(bundle_name)
    encoded_path = quote(converted_path, safe='/')
    url = base_url + encoded_path
    save_path = os.path.join(DOWNLOAD_DIR, os.path.basename(bundle_name))
    
    try:
        resp = session.get(url, headers=headers, timeout=60, verify=False)
        if resp.status_code == 200:
            with open(save_path, 'wb') as f:
                f.write(resp.content)
            return True, bundle_name, 200, len(resp.content)
        return False, bundle_name, resp.status_code, 0
    except Exception as e:
        return False, bundle_name, str(e), 0


def draw_progress_bar(progress: float, completed: int, total: int, remaining_time: float):
    """绘制进度条。"""
    bar_length = 50
    filled_length = int(bar_length * progress)
    
    if progress < 0.3:
        color = RED
    elif progress < 0.7:
        color = YELLOW
    else:
        color = GREEN
    
    bar = f"{color}{'█' * filled_length}{RESET}{'-' * (bar_length - filled_length)}"
    
    hours = int(remaining_time // 3600)
    minutes = int((remaining_time % 3600) // 60)
    seconds = int(remaining_time % 60)
    
    if hours > 0:
        time_str = f"{hours}时{minutes}分{seconds}秒"
    elif minutes > 0:
        time_str = f"{minutes}分{seconds}秒"
    else:
        time_str = f"{seconds}秒"
    
    print(f"\r[{bar}] {completed}/{total} ({progress*100:.1f}%) 剩余: {time_str}", end='', flush=True)


def generate_urls_file(bundles: List[str], base_url: str):
    """生成下载链接文件。"""
    with open(URLS_FILE, 'w', encoding='utf-8') as f:
        f.write(f"Total: {len(bundles)}\n")
        f.write(f"Base URL: {base_url}\n\n")
        for bundle in bundles:
            converted_path = convert_bundle_path(bundle)
            url = base_url + converted_path
            f.write(f"{url}\n")
    print(f"{GREEN}[OK]{RESET} 下载链接已保存到: {GREEN}{URLS_FILE}{RESET}")


def save_failed_urls_json(failed_bundles: List[Tuple[str, str, str]], base_url: str, version_id: str = ''):
    """保存下载失败链接到 JSON 文件(实时更新)。"""
    data = {
        'total_failed': len(failed_bundles),
        'base_url': base_url,
        'version_id': version_id,
        'update_time': time.strftime('%Y-%m-%d %H:%M:%S'),
        'failed_urls': [
            {
                'bundle': bundle_name,
                'url': url,
                'error': error
            }
            for bundle_name, url, error in failed_bundles
        ]
    }
    with open(FAILED_URLS_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def download_bundles(new_bundles: List[str], base_url: str, bundles: List[str], 
                     current_version_id: str, downloaded_bundles: Set[str],
                     record: Dict[str, Any], version_record: Dict[str, Any]) -> Tuple[int, int, int]:
    """
    执行 bundle 下载。
    返回 (成功数, 失败数, 总大小)。
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    
    session = requests.Session()
    
    completed = 0
    failed = 0
    total_size = 0
    start_time = time.time()
    last_save_time = start_time
    failed_bundles_list: List[Tuple[str, str, str]] = []
    
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = {executor.submit(download_file, bundle, base_url, session, headers): bundle for bundle in new_bundles}
        
        for future in as_completed(futures):
            success, bundle_name, status, size = future.result()
            
            if success:
                completed += 1
                total_size += size
                downloaded_bundles.add(os.path.basename(bundle_name))
            else:
                failed += 1
                converted_path = convert_bundle_path(bundle_name)
                url = base_url + converted_path
                error_msg = f"HTTP {status}" if isinstance(status, int) else str(status)
                failed_bundles_list.append((bundle_name, url, error_msg))
                save_failed_urls_json(failed_bundles_list, base_url, current_version_id)
            
            current_time = time.time()
            if current_time - last_save_time >= SAVE_INTERVAL:
                record['downloaded_bundles'] = list(downloaded_bundles)
                record['total_count'] = len(bundles)
                record['current_version'] = current_version_id
                record['last_update'] = time.strftime('%Y-%m-%d %H:%M:%S')
                save_record(record)
                last_save_time = current_time
            
            progress = (completed + failed) / len(new_bundles)
            elapsed = current_time - start_time
            if completed > 0:
                avg_time = elapsed / completed
                remaining_time = avg_time * (len(new_bundles) - completed - failed)
            else:
                remaining_time = 0
            
            draw_progress_bar(progress, completed + failed, len(new_bundles), remaining_time)
    
    print()
    
    elapsed = time.time() - start_time
    
    record['downloaded_bundles'] = list(downloaded_bundles)
    record['total_count'] = len(bundles)
    record['current_version'] = current_version_id
    record['last_update'] = time.strftime('%Y-%m-%d %H:%M:%S')
    save_record(record)
    
    version_record['last_version_id'] = current_version_id
    version_record['last_check_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
    existing_versions = [v.get('version_id') for v in version_record.get('version_history', [])]
    if current_version_id not in existing_versions:
        version_record.setdefault('version_history', []).append({
            'version_id': current_version_id,
            'check_time': time.strftime('%Y-%m-%d %H:%M:%S'),
            'bundle_count': len(bundles),
            'downloaded_count': completed
        })
    save_version_record(version_record)
    
    if failed_bundles_list:
        print(f"{RED}[FAIL]{RESET} 失败链接已实时保存到: {RED}{FAILED_URLS_FILE}{RESET}")
    
    print(f"\n{'=' * 60}")
    print(f"下载完成!")
    print(f"  版本: {current_version_id}")
    print(f"  成功: {GREEN}{completed}{RESET}")
    print(f"  失败: {RED}{failed}{RESET}")
    print(f"  总大小: {total_size / 1024 / 1024:.2f} MB")
    print(f"  耗时: {elapsed:.1f} 秒")
    if elapsed > 0:
        print(f"  速度: {total_size / 1024 / 1024 / elapsed:.2f} MB/s")
    print(f"{'=' * 60}")
    
    return completed, failed, total_size


# =========================
# Part 7: Main Pipeline
# =========================
def run_get_version() -> Tuple[str, str] | None:
    """
    获取云端版本信息。
    返回 (version_id, base_url) 或 None。
    """
    print(f"{YELLOW}[*]{RESET} GET {API_URL}")
    version_info = get_latest_version_info()
    
    if not version_info:
        print(f"{RED}[WARN]{RESET} 无法获取云端版本信息")
        return None
    
    version_id = version_info['version_id']
    base_url = version_info['base_url']
    
    print(f"{GREEN}[OK]{RESET} 当前云端版本: {GREEN}{version_id}{RESET}")
    print(f"{GREEN}[OK]{RESET} Base URL: {base_url}")
    
    return version_id, base_url


def run_download_catalogs(base_url: str, force_update: bool) -> Tuple[List[str], List[str]]:
    """
    下载 catalog 文件。
    返回 (已下载列表, 已跳过列表)。
    """
    print(f"{YELLOW}[*]{RESET} 检查 Catalog 文件...")
    downloaded, skipped = download_catalogs(base_url, force_update=force_update)
    
    if downloaded:
        print(f"{GREEN}[OK]{RESET} 已更新: {GREEN}{len(downloaded)}{RESET} 个 catalog 文件")
        for name in downloaded:
            print(f"    {GREEN}- {name}{RESET}")
    
    if skipped:
        print(f"{YELLOW}[SKIP]{RESET} 已跳过(未变更): {YELLOW}{len(skipped)}{RESET} 个")
    
    return downloaded, skipped


def run_parse_catalogs() -> List[str]:
    """
    解析 catalog 文件获取 bundle 列表。
    返回 bundle 名称列表。
    """
    print(f"{YELLOW}[*]{RESET} 解析 Catalog...")
    bundles = parse_catalogs()
    print(f"{GREEN}[OK]{RESET} 发现 {GREEN}{len(bundles)}{RESET} 个资源文件")
    return bundles


def run_calculate_diff(bundles: List[str], local_version_id: str, need_update: bool) -> Tuple[List[str], Set[str]]:
    """
    计算差异更新。
    返回 (待下载列表, 已有文件集合)。
    """
    print(f"{YELLOW}[*]{RESET} 加载下载记录...")
    record = load_record()
    downloaded_bundles = set(record.get('downloaded_bundles', []))
    print(f"{GREEN}[OK]{RESET} 已下载记录: {GREEN}{len(downloaded_bundles)}{RESET} 个")
    
    print(f"{YELLOW}[*]{RESET} 扫描本地文件...")
    local_bundles = get_local_bundles()
    print(f"{GREEN}[OK]{RESET} 本地文件: {GREEN}{len(local_bundles)}{RESET} 个")
    
    existing_bundles = downloaded_bundles | local_bundles
    new_bundles = [b for b in bundles if os.path.basename(b) not in existing_bundles]
    
    print(f"\n{YELLOW}[*]{RESET} 计算差异更新...")
    print(f"  云端总数: {GREEN}{len(bundles)}{RESET}")
    print(f"  已有文件: {GREEN}{len(existing_bundles)}{RESET}")
    print(f"  待下载: {YELLOW}{len(new_bundles)}{RESET}")
    
    if need_update and local_version_id:
        removed_bundles = [b for b in downloaded_bundles if b not in [os.path.basename(x) for x in bundles]]
        if removed_bundles:
            print(f"  {YELLOW}可能已移除: {len(removed_bundles)} 个旧版本文件{RESET}")
    
    return new_bundles, existing_bundles


def main():
    ap = argparse.ArgumentParser(description="天啓パラドクスX 资产下载器 (云端实时更新版)")
    ap.add_argument("--workdir", default=".", help="工作目录(默认当前目录)")
    ap.add_argument("--download-dir", default=None, help="下载目录(默认工作目录下的 天啓パラドクスX资产)")
    ap.add_argument("--threads", type=int, default=MAX_THREADS, help=f"下载线程数(默认 {MAX_THREADS})")
    ap.add_argument("--no-download", action="store_true", help="跳过下载(只生成 URL 文件)")
    ap.add_argument("--force-catalog", action="store_true", help="强制更新 catalog 文件")
    args = ap.parse_args()
    
    workdir = os.path.abspath(args.workdir)
    os.makedirs(workdir, exist_ok=True)
    os.chdir(workdir)
    
    if args.download_dir:
        global DOWNLOAD_DIR, CATALOG_DIR, RECORD_FILE, URLS_FILE, FAILED_URLS_FILE, VERSION_FILE
        DOWNLOAD_DIR = os.path.abspath(args.download_dir)
        CATALOG_DIR = os.path.join(workdir, '天啓パラドクスX配置')
        RECORD_FILE = os.path.join(workdir, '天啓パラドクスX下载记录.json')
        URLS_FILE = os.path.join(workdir, '天啓パラドクスX下载链接.txt')
        FAILED_URLS_FILE = os.path.join(workdir, '天啓パラドクスX下载失败链接.json')
        VERSION_FILE = os.path.join(workdir, '天啓パラドクスX版本信息.json')
    
    print("=" * 60)
    print(f"{GREEN}天啓パラドクスX 资产下载器 (云端实时更新版){RESET}")
    print("=" * 60)
    print(f"工作目录: {YELLOW}{workdir}{RESET}")
    print()
    
    print(f"{YELLOW}[Step 1/5]{RESET} 获取云端最新版本信息...")
    result = run_get_version()
    
    if not result:
        print(f"\n{RED}无法获取云端版本信息,尝试使用本地记录...{RESET}")
        version_record = load_version_record()
        if version_record.get('last_version_id'):
            version_id = version_record['last_version_id']
            base_url = f"{CDN_PREFIX}{version_id}/"
            result = (version_id, base_url)
        else:
            print(f"{RED}无本地版本记录,请检查网络连接{RESET}")
            return
    
    current_version_id, base_url = result
    
    print(f"\n{YELLOW}[Step 2/5]{RESET} 加载本地版本记录...")
    version_record = load_version_record()
    local_version_id = version_record.get('last_version_id', '')
    print(f"{GREEN}[OK]{RESET} 本地记录版本: {GREEN}{local_version_id or '无记录'}{RESET}")
    
    version_status, need_update = compare_versions(local_version_id, current_version_id)
    status_color = GREEN if need_update else YELLOW
    print(f"{GREEN}[OK]{RESET} 版本对比结果: {status_color}{version_status}{RESET}")
    
    force_catalog = args.force_catalog or need_update
    
    print(f"\n{YELLOW}[Step 3/5]{RESET} 下载 Catalog 文件...")
    run_download_catalogs(base_url, force_catalog)
    
    print(f"\n{YELLOW}[Step 4/5]{RESET} 解析 Catalog 并计算差异...")
    bundles = run_parse_catalogs()
    new_bundles, existing_bundles = run_calculate_diff(bundles, local_version_id, need_update)
    
    if not new_bundles:
        print(f"\n{GREEN}所有资源已下载完成!无需更新{RESET}")
        
        version_record['last_version_id'] = current_version_id
        version_record['last_check_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
        existing_versions = [v.get('version_id') if isinstance(v, dict) else v for v in version_record.get('version_history', [])]
        if current_version_id not in existing_versions:
            version_record.setdefault('version_history', []).append({
                'version_id': current_version_id,
                'check_time': time.strftime('%Y-%m-%d %H:%M:%S'),
                'bundle_count': len(bundles)
            })
        save_version_record(version_record)
        
        record = load_record()
        record['current_version'] = current_version_id
        record['total_count'] = len(bundles)
        save_record(record)
        return
    
    print(f"\n{YELLOW}[Step 5/5]{RESET} 生成下载链接...")
    generate_urls_file(new_bundles, base_url)
    
    if args.no_download:
        print(f"\n{YELLOW}[SKIP]{RESET} 跳过下载 (--no-download 已指定)")
        print(f"{GREEN}[OK]{RESET} URL 文件已生成: {GREEN}{URLS_FILE}{RESET}")
        return
    
    if AUTO_START:
        print(f"\n{GREEN}自动开始下载...{RESET}")
    else:
        confirm = input(f"\n是否开始下载? (y/n): ").strip().lower()
        if confirm != 'y':
            print(f"{YELLOW}已取消下载{RESET}")
            return
    
    print(f"\n{GREEN}开始下载 {YELLOW}{len(new_bundles)}{RESET} 个文件...")
    print(f"  线程数: {GREEN}{args.threads}{RESET}")
    print(f"  保存到: {YELLOW}{DOWNLOAD_DIR}{RESET}")
    print()
    
    record = load_record()
    downloaded_bundles = set(record.get('downloaded_bundles', []))
    
    download_bundles(
        new_bundles, base_url, bundles,
        current_version_id, downloaded_bundles,
        record, version_record
    )
    
    print("\n" + "=" * 60)
    print(f"{GREEN}完成!{RESET}")
    print("=" * 60)


if __name__ == '__main__':
    main()
```粗体文本

你是要弄离线吗?:star_struck:

我记得剧本是鉴权的,供档简单抓取就行。

能否使用此脚本检索场景?

检索场景比较困难,如果可以检索到,我愿意提供帮助。