绅士冒险半个月前抓的。具体资源清单hash的逻辑没仔细看。
下载清单
import os
import requests
import urllib3
from concurrent.futures import ThreadPoolExecutor
import time
# 忽略 InsecureRequestWarning 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 下载单个文件的函数
def download_file(link, download_dir):
# 从链接中提取文件名(保留原始文件名)
path_part = link.split("?")[0].split("/")
file_name = path_part[-1] if path_part[-1] else f"file_{hash(link) % 1000000}.bin"
file_path = os.path.join(download_dir, file_name)
headers = {
"Accept": "*/*",
"Accept-Encoding": "deflate, gzip",
"User-Agent": "UnityPlayer/2020.3.41f1 (UnityWebRequest/1.0, libcurl/7.84.0-DEV)",
"X-Unity-Version": "2020.3.41f1",
"Host": "nidg.bhjsj8.com"
}
try:
# 禁用 SSL 证书验证
response = requests.get(link, headers=headers, verify=False)
response.raise_for_status()
# 确定响应内容类型
content_type = response.headers.get('Content-Type', '')
# 根据内容类型选择写入模式
if 'json' in content_type or 'text' in content_type:
# 文本内容
with open(file_path, 'w', encoding='utf-8') as file:
file.write(response.text)
print(f"下载完成: {file_name} (文本模式)")
else:
# 二进制内容
with open(file_path, 'wb') as file:
file.write(response.content)
print(f"下载完成: {file_name} (二进制模式)")
# 打印文件信息
file_size = os.path.getsize(file_path)
print(f"文件大小: {file_size} 字节")
print(f"保存路径: {os.path.abspath(file_path)}")
except requests.RequestException as e:
print(f"请求过程中出现错误: {e}")
except Exception as e:
print(f"下载过程中出现未知错误: {e}")
# 主下载函数
def main_download():
# 生成时间戳
ts = int(time.time())
# 下载链接
base_link = "https://nidg.bhjsj8.com/new_patch/files/index.txt.c47c53a61a6b650788724579ebab4da9"
download_link = [f"{base_link}?ts={ts}"]
# 确保下载目录存在
download_dir = os.getcwd()
os.makedirs(download_dir, exist_ok=True)
print(f"下载目录: {os.path.abspath(download_dir)}")
# 使用线程池进行下载
with ThreadPoolExecutor(max_workers=2) as executor:
print(f"开始下载: {download_link}")
executor.map(lambda link: download_file(link, download_dir), download_link)
print("\n🎉 文件下载完成!")
if __name__ == "__main__":
# 执行下载操作
main_download()
# 等待用户查看结果
input("按 Enter 键退出...")
提取url
import os
import json
def process_index_file():
# 读取同目录下的 index.txt 文件
input_file_path = os.path.join(os.getcwd(), 'index.txt')
output_file_path = os.path.join(os.getcwd(), 'output.txt')
try:
with open(input_file_path, 'r', encoding='utf-8') as f:
content = f.read()
data = json.loads(content)
except FileNotFoundError:
print(f"错误: 找不到 {input_file_path} 文件")
return
except json.JSONDecodeError:
print(f"错误: {input_file_path} 文件不是有效的 JSON 格式")
return
except Exception as e:
print(f"错误: 读取文件时发生未知错误: {e}")
return
# 基础 URL
base_url = "https://nidg.bhjsj8.com/new_patch/files/"
# 用于存储结果的列表
result_urls = []
# 遍历 JSON 数据中的每个键值对
for key, value in data.items():
# 确保值是列表且至少有两个元素
if isinstance(value, list) and len(value) >= 2:
# 拼接文件名和哈希值(直接使用键名,不做前缀过滤)
filename = f"{key}.{value[1]}"
# 拼接完整 URL
full_url = f"{base_url}{filename}"
result_urls.append(full_url)
# 将结果写入 output.txt 文件
try:
with open(output_file_path, 'w', encoding='utf-8') as f:
# 每行写入一个 URL
for url in result_urls:
f.write(url + '\n')
print(f"成功生成 {output_file_path} 文件,共写入 {len(result_urls)} 个 URL")
except Exception as e:
print(f"错误: 写入文件时发生未知错误: {e}")
if __name__ == "__main__":
process_index_file()
input("按 Enter 键退出...")
下载
import requests
import os
import time
from concurrent.futures import ThreadPoolExecutor
import urllib3
from datetime import datetime
# 禁用 InsecureRequestWarning 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_simplified_timestamp():
now = datetime.now()
return now.strftime("%y%m%d%H%M%S")
def download_file(link, download_dir, total_files, index, digit_count, success_files, failed_files):
# 提取文件名和路径
path_parts = link.split("/")
file_name = path_parts[-1].split("?")[0]
relative_path = "/".join(path_parts[3:-1]) # 假设前三个部分是协议和域名,可根据实际情况调整
local_dir = os.path.join(download_dir, relative_path)
local_path = os.path.join(local_dir, file_name)
max_retries = 3 # 最大重试次数
retry_delay = 5 # 重试延迟时间(秒)
# 创建本地目录
os.makedirs(local_dir, exist_ok=True)
for attempt in range(max_retries):
try:
# 检查本地文件是否存在且大小相同(忽略证书验证)
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
headers = requests.head(link, allow_redirects=True, timeout=10, verify=False).headers
remote_size = int(headers.get('Content-Length', 0))
if local_size == remote_size and remote_size != 0:
print(f"[跳过] {total_files}/{index:0{digit_count}d} 已存在且大小一致: {local_path}")
success_files.append(file_name)
return
response = requests.get(link, stream=True, timeout=10, verify=False)
response.raise_for_status()
with open(local_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
if os.path.exists(local_path):
final_size = os.path.getsize(local_path)
if final_size > 0:
print(f"[成功] {total_files}/{index:0{digit_count}d} 下载完成: {local_path}")
success_files.append(file_name)
return
else:
raise Exception("下载后文件大小为0")
except (requests.RequestException, Exception) as e:
if attempt < max_retries - 1:
print(f"[尝试 {attempt + 1}/{max_retries}] {total_files}/{index:0{digit_count}d} 下载失败: {str(e)}, 重试中...")
time.sleep(retry_delay)
else:
print(f"[失败] {total_files}/{index:0{digit_count}d} 下载失败: {str(e)}")
failed_files.append(file_name)
def count_files_in_directory(directory):
file_count = 0
for root, dirs, files in os.walk(directory):
file_count += len(files)
return file_count
def main_download():
file_path = "output.txt"
download_dir = os.path.join(os.getcwd(), "com.superhgame.rpg.emma")
if not os.path.exists(file_path):
print("❌ 未找到下载列表文件")
return
with open(file_path, "r", encoding="utf-8") as f:
download_links = [line.strip() for line in f if line.strip()]
if not download_links:
print("❌ 下载列表中没有有效链接")
return
total_files = len(download_links)
digit_count = len(str(total_files))
os.makedirs(download_dir, exist_ok=True)
success_files = []
failed_files = []
print(f"✅ 准备下载 {total_files} 个文件,保存到 {download_dir}")
print("-" * 60)
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
for index, link in enumerate(download_links, start=1):
futures.append(executor.submit(
download_file,
link, download_dir, total_files, index, digit_count,
success_files, failed_files
))
for future in futures:
future.result()
print("\n" + "=" * 60)
print(f"📦 下载完成 | 成功: {len(success_files)} | 失败: {len(failed_files)}")
if failed_files:
print("\n❌ 以下文件下载失败(请手动检查):")
for fname in failed_files:
print(f" - {fname}")
else:
print("\n🎉 所有文件均下载成功!")
# 计算目标文件夹中的实际文件数
actual_file_count = count_files_in_directory(download_dir)
print(f"\n📁 目标文件夹中的实际文件数: {actual_file_count}")
print(f"📋 下载列表中的文件数: {total_files}")
if actual_file_count == total_files:
print("✅ 实际文件数与下载数相同。")
else:
print("❌ 实际文件数与下载数不同,请检查。")
# 等待用户输入,阻止程序立即退出
input("按回车键退出...")
if __name__ == "__main__":
try:
import requests
except ImportError:
print("❌ 缺少requests库,请先安装:pip install requests")
exit(1)
main_download()