FANZA Games' Lumina Prognosis service is about to be shut down.
I know the "config" file exists, but I can't build asset URLs from it.
Can anyone do this?
It's probably not necessary, but I'll upload the "config" file.
Without a URL I can't tell which game this is, but restoring the UUIDs should just be Base64 decoding.
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
BASE64_VALUES = [0] * 128
for idx, char in enumerate(BASE64_CHARS):
BASE64_VALUES[ord(char)] = idx
HEX_CHARS = list('0123456789abcdef')
_t = ['', '', '', '']
UUID_TEMPLATE = _t + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + _t + _t
INDICES = [i for i, x in enumerate(UUID_TEMPLATE) if x != '-']
def decode_uuid(base64_str):
"""将Base64编码的字符串还原为UUID格式"""
if len(base64_str) != 22:
return base64_str
result = UUID_TEMPLATE.copy()
result[0] = base64_str[0]
result[1] = base64_str[1]
j = 2
for i in range(2, 22, 2):
lhs = BASE64_VALUES[ord(base64_str[i])]
rhs = BASE64_VALUES[ord(base64_str[i + 1])]
result[INDICES[j]] = HEX_CHARS[lhs >> 2]
j += 1
result[INDICES[j]] = HEX_CHARS[((lhs & 3) << 2) | (rhs >> 4)]
j += 1
result[INDICES[j]] = HEX_CHARS[rhs & 0xF]
j += 1
return ''.join(result)
def encode_uuid(uuid_str):
"""将UUID格式的字符串编码为Base64编码的字符串"""
uuid_str = uuid_str.replace('-', '')
if len(uuid_str) != 32:
return uuid_str
base64_result = [''] * 22
base64_result[0] = uuid_str[0]
base64_result[1] = uuid_str[1]
j = 2
for i in range(2, 32, 3):
a = HEX_CHARS.index(uuid_str[i])
b = HEX_CHARS.index(uuid_str[i + 1])
c = HEX_CHARS.index(uuid_str[i + 2])
base64_result[j] = BASE64_CHARS[(a << 2) | (b >> 2)]
j += 1
base64_result[j] = BASE64_CHARS[((b & 3) << 4) | c]
j += 1
return ''.join(base64_result)
def main():
    """Interactive entry point"""
    print("=== UUID Restore & Encode Tool ===")
    print("Choose an operation:")
    print("1. Restore a 22-character Base64 string to UUID format")
    print("2. Encode a UUID-format string into a 22-character Base64 string")
    print("Enter 'q' or 'exit' to quit")
    while True:
        # Read the user's choice
        choice = input("\nEnter an operation number (1/2), or 'q'/'exit' to quit: ").strip()
        # Check the exit condition
        if choice.lower() in ['q', 'exit']:
            print("Exiting.")
            break
        if choice not in ['1', '2']:
            print("Error: please enter a valid operation number (1/2).")
            continue
        if choice == '1':
            print("Enter a 22-character Base64 string to restore to UUID format")
            user_input = input("\nString to restore: ").strip()
            # Validate input length
            if len(user_input) != 22:
                print(f"Error: input must be 22 characters, got {len(user_input)}")
                continue
            # Make sure every character is in BASE64_CHARS
            if not all(c in BASE64_CHARS for c in user_input):
                print("Error: input contains invalid characters; all characters must be within:")
                print(BASE64_CHARS)
                continue
            # Perform the restore
            try:
                decoded_uuid = decode_uuid(user_input)
                print(f"Restored: {decoded_uuid}")
            except Exception as e:
                print(f"Error while restoring: {e}")
        elif choice == '2':
            print("Enter a UUID-format string to encode into a 22-character Base64 string")
            user_input = input("\nString to encode: ").strip()
            # Validate input length
            uuid_str = user_input.replace('-', '')
            if len(uuid_str) != 32:
                print(f"Error: input must be 32 hex characters (excluding hyphens), got {len(uuid_str)}")
                continue
            # Perform the encoding
            try:
                encoded_base64 = encode_uuid(user_input)
                print(f"Encoded: {encoded_base64}")
            except Exception as e:
                print(f"Error while encoding: {e}")
if __name__ == "__main__":
    main()
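A quick round-trip sanity check for the two helpers (the sample UUID is one that shows up in a log later in this thread):

# Round-trip check: compress a UUID, then restore it.
u = "ba21476f-2866-4f81-9c4d-6e359316e448"
s = encode_uuid(u)
print(s)                    # the 22-character compressed form
assert decode_uuid(s) == u  # restores the original UUID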
Here is the "url".
https://play.games.dmm.co.jp/game/luminaprognosis_x
I'd like to be able to build the asset "URLs" from the "config".
Looking back at it later, my earlier code was a mess.
So I copied bits from elsewhere and reworked them.
import re
import os
import shutil
import requests
import time
import json
import concurrent.futures
import urllib3
from datetime import datetime
from pathlib import Path
from numpy import uint8, frombuffer as npbuff, tile as npfill
# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Asset-type to file-extension replacement rules
replace_rules = {
"cc.ImageAsset": ".json,.png",
"cc.SpriteFrame": ".json",
"sp.SkeletonData": ".atlas,.skel",
"cc.TextAsset": ".json",
"cc.VideoClip": ".json,.mp4",
"cc.AudioClip": ".json,.mp3",
"cc.JsonAsset": ".json",
"cc.Prefab": ".json",
"cc.ParticleAsset": ".json",
"cc.Asset": ".json"
}
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
BASE64_VALUES = [0] * 128
for idx, char in enumerate(BASE64_CHARS):
BASE64_VALUES[ord(char)] = idx
HEX_CHARS = list('0123456789abcdef')
_t = ['', '', '', '']
UUID_TEMPLATE = _t + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + _t + _t
INDICES = [i for i, x in enumerate(UUID_TEMPLATE) if x != '-']
def decode_uuid(base64_str):
"""将Base64编码的字符串还原为UUID格式,处理包含@的情况"""
if '@' in base64_str:
main_part, suffix = base64_str.split('@', 1)
else:
main_part = base64_str
suffix = ''
if len(main_part) != 22:
result = main_part
else:
result = UUID_TEMPLATE.copy()
result[0] = main_part[0]
result[1] = main_part[1]
j = 2
for i in range(2, 22, 2):
lhs = BASE64_VALUES[ord(main_part[i])]
rhs = BASE64_VALUES[ord(main_part[i + 1])]
result[INDICES[j]] = HEX_CHARS[lhs >> 2]
j += 1
result[INDICES[j]] = HEX_CHARS[((lhs & 3) << 2) | (rhs >> 4)]
j += 1
result[INDICES[j]] = HEX_CHARS[rhs & 0xF]
j += 1
result = ''.join(result)
if suffix:
result = f"{result}@{suffix}"
return result
def extract_arrays_from_json(json_path):
try:
with open(json_path, "r", encoding="utf-8") as f:
content = f.read()
print(f"✅ 读取 JSON 文件成功,长度: {len(content)} 字符")
except Exception as e:
print(f"❌ 读取文件失败: {str(e)}")
return None, None, None, None, None, None, None
try:
uuids_match = re.search(r'"uuids":\s*\[([^\]]+)\]', content)
if not uuids_match:
print("警告: 未找到 uuids 数组")
uuids = []
else:
uuids_str = uuids_match.group(1)
uuids = [item.strip().strip('"') for item in uuids_str.split(',') if item.strip()]
uuids = [decode_uuid(uuid) for uuid in uuids]
print(f"✅ 提取到 uuids: {len(uuids)} 条")
except Exception as e:
print(f"❌ 处理 uuids 失败: {str(e)}")
uuids = []
try:
import_match = re.search(r'"versions":\s*\{\s*"import":\s*\[([^\]]+)\]', content)
        if not import_match:
            print("Warning: versions.import array not found")
            imports = {}
        else:
            import_str = import_match.group(1)
            import_items = [item.strip() for item in import_str.split(',') if item.strip()]
            imports = {}
            for i in range(0, len(import_items), 2):
                if i + 1 < len(import_items):
                    num = import_items[i]
                    string_val = import_items[i + 1].strip('"')
                    imports[num] = string_val
            print(f"✅ Extracted import: {len(imports)} items")
    except Exception as e:
        print(f"❌ Failed to process import: {str(e)}")
        imports = {}
try:
native_match = re.search(r'"native":\s*\[([^\]]+)\]', content)
        if not native_match:
            print("Warning: native array not found")
            natives = {}
        else:
            native_str = native_match.group(1)
            native_items = [item.strip() for item in native_str.split(',') if item.strip()]
            natives = {}
            for i in range(0, len(native_items), 2):
                if i + 1 < len(native_items):
                    num = native_items[i]
                    string_val = native_items[i + 1].strip('"')
                    natives[num] = string_val
            print(f"✅ Extracted native: {len(natives)} items")
    except Exception as e:
        print(f"❌ Failed to process native: {str(e)}")
        natives = {}
try:
paths_match = re.search(r'"paths":\s*\{([^}]+)\}', content)
        if not paths_match:
            print("Warning: paths block not found")
            paths = {}
        else:
            paths_str = paths_match.group(1)
            # Match array entries with either 3 or 4 elements
            path_pattern = r'"(\d+)":\s*\[\s*"([^"]+)"\s*,\s*(\d+)\s*(?:,\s*(\d+)\s*)?\]'
            path_items = re.findall(path_pattern, paths_str)
            if not path_items:
                print("Warning: no valid path entries found in the paths block")
                paths = {}
            else:
                paths = {}
                for item in path_items:
                    key, path, num1, num2 = item  # num2 may be empty
                    paths[key] = {
                        'path': path,
                        'num': int(num1) - 1  # keep the original logic: first number minus 1 as the index
                    }
            print(f"✅ Extracted paths: {len(paths)} items")
    except Exception as e:
        print(f"❌ Failed to process paths: {str(e)}")
        paths = {}
try:
types_match = re.search(r'"types":\s*\[([^\]]+)\]', content)
        if not types_match:
            print("Warning: types array not found")
            types = []
        else:
            types_str = types_match.group(1)
            types = [item.strip().strip('"') for item in types_str.split(',') if item.strip()]
            new_types = []
            for line in types:
                replaced = False
                for old_text, new_text in replace_rules.items():
                    if old_text in line:
                        new_types.append(new_text)
                        replaced = True
                        break
                if not replaced:
                    new_types.append('')
            types = new_types
            print(f"✅ Extracted types: {len(types)} items")
    except Exception as e:
        print(f"❌ Failed to process types: {str(e)}")
        types = []
try:
name_match = re.search(r'"name"\s*:\s*"([^"]+)"', content)
        if not name_match:
            print("Warning: name field not found")
            name = ''
        else:
            name = name_match.group(1)
            print(f"✅ Extracted name: {name}")
    except Exception as e:
        print(f"❌ Failed to process name: {str(e)}")
        name = ''
return uuids, imports, natives, paths, types, name
def determine_file_name(path):
"""根据路径确定文件名,处理spriteFrame和texture的特殊情况"""
parts = path.split('/')
last_part = parts[-1]
if 'spriteFrame' in parts or 'texture' in parts:
for i in range(len(parts) - 2, -1, -1):
if parts[i] not in ['spriteFrame', 'texture']:
return parts[i]
return last_part
else:
return last_part.split('.')[0] if '.' in last_part else last_part
def get_final_path(path, file_name, suffix):
"""生成最终的文件路径,保持原后缀不变"""
parts = path.split('/')
if parts[-1] in ['spriteFrame', 'texture']:
parts = parts[:-1]
parts.append(f"{file_name}{suffix}")
return "/".join(parts)
def get_unique_path(path, written_paths):
"""获取唯一的文件路径,如果路径已存在则添加编号"""
base_path, ext = os.path.splitext(path)
counter = 1
new_path = path
while new_path in written_paths:
new_path = f"{base_path}({counter}){ext}"
counter += 1
return new_path
def extract_urls(uuids, imports, natives, paths, types, name, json_file_name):
    download_links = []
    target_names = []
    written_paths = set()
    # Base URL stays fixed
    base_url = "https://cdn-app.miraigirl.net/assets/"
    for serial, data in paths.items():
        path = data['path']
        num = data['num']
        print(f"\n[Processing path] serial: {serial}")
        type_idx = num + 1
        print(f"[INFO] Computed type_idx: {type_idx}")
        if type_idx >= len(types):
            print(f"[ERROR] type index {type_idx} is out of range for types ({len(types)} items), skipping")
            continue
        processed_line = types[type_idx].strip()
        print(f"[INFO] types[{type_idx}] after processing: '{processed_line}'")
        if not processed_line:
            houzhui_list = []
        else:
            houzhui_list = [h.strip() for h in processed_line.split(',')]
        file_name = determine_file_name(path)
        print(f"[INFO] File name: {file_name}")
        uuid_idx = int(serial)
        if uuid_idx >= len(uuids):
            print(f"[ERROR] uuid index {uuid_idx} is out of range for uuids ({len(uuids)} entries), skipping")
            continue
        uuid_val = uuids[uuid_idx]
        uu = uuid_val[:2] if uuid_val else ''
        print(f"[INFO] uuid: {uuid_val}, uu: {uu}")
        # Walk every suffix and pick imports or natives depending on the suffix type
        for h in houzhui_list:
            if not h:
                continue  # skip empty suffixes
            print(f"[INFO] Handling suffix: {h}")
            # Core rule: only .json and .atlas use imports, everything else uses natives
            if h in ['.json', '.atlas']:
                hash_val = imports.get(serial, '')
                url_dir = 'import'  # URL segment for imports
                # For the .atlas suffix the URL uses .json
                processed_h = '.json' if h == '.atlas' else h
            else:
                hash_val = natives.get(serial, '')
                url_dir = 'native'  # URL segment for natives
                # Special case: .skel is served as .bin (native suffixes only)
                processed_h = '.bin' if h == '.skel' else h
            # Make sure a hash exists
            if not hash_val:
                print(f"[WARN] No {url_dir} hash found for serial {serial}, skipping the {h} URL")
                continue
            # Build the URL
            url = f"{base_url}{name}/{url_dir}/{uu}/{uuid_val}.{hash_val}{processed_h}"
            download_links.append(url)
            print(f"[SUCCESS] Generated {url_dir} URL: {url.strip()}")
            # Build the save path; .atlas assets are saved as .atlas.json
            save_suffix = '.atlas.json' if h == '.atlas' else h
            final_path = get_final_path(path, file_name, save_suffix)
            final_path = get_unique_path(final_path, written_paths)
            written_paths.add(final_path)
            target_names.append(final_path)
    return download_links, target_names
def get_simplified_timestamp():
now = datetime.now()
return now.strftime("%y%m%d%H%M%S")
def decrypt_file(file_path: Path, klen: int = 32):
    """Decrypt a single image file in place"""
    try:
        with open(file_path, 'rb') as f:
            # Skip files that are already plain images
            header = f.read(4)
            if header.startswith((b'\xFF\xD8\xFF', b'\x89PNG')):
                print(f"Already decrypted, skipping: {file_path.as_posix()}")
                return True
            f.seek(0)
            darr = npbuff(f.read(), dtype=uint8)
        dlen = len(darr)
        # Derive the base key from the payload length plus a fixed seed
        basekey = f'{dlen}2fjaykPFd6bAJn59beX5TWDQzsEW'.encode('utf-8')
        keylen = len(basekey)
        # Expand the key to klen bytes
        key = bytearray(klen)
        o, t = 0, 0
        for i in range(klen):
            o = key[i] = basekey[t] ^ o
            t = (t + 1) % keylen
        # XOR-decrypt the payload with the repeated key
        karr = npbuff(key, dtype=uint8)
        data = (darr ^ npfill(karr, (dlen // klen) + 1)[:dlen]).tobytes()
        # Validate the decrypted result
        if not data[:4].startswith((b'\xFF\xD8\xFF', b'\x89PNG')):
            print(f'Decryption error --- {file_path.as_posix()}')
            return False
        # Write the decrypted bytes back to the file
        with open(file_path, 'wb') as f:
            f.write(data)
        print(f"Decrypted: {file_path.as_posix()}")
        return True
    except Exception as e:
        print(f"Error while processing file {file_path.as_posix()}: {str(e)}")
        return False
def silent_download(link, save_path):
"""静默下载函数,不输出任何提示,仅返回成功与否"""
try:
# 检查文件是否已存在且大小匹配
if os.path.exists(save_path):
local_size = os.path.getsize(save_path)
headers = requests.head(link, allow_redirects=True, timeout=10, verify=False).headers
remote_size = int(headers.get('Content-Length', 0))
if local_size == remote_size and remote_size != 0:
return True
        # Download the file
response = requests.get(link, stream=True, timeout=10, verify=False)
response.raise_for_status()
with open(save_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
        # Verify the file size
if os.path.exists(save_path) and os.path.getsize(save_path) > 0:
return True
except:
return False
def process_atlas_json_file(file_path):
    """Process a single .atlas.json file, extracting its data into .atlas and .json files"""
    try:
        # Read the file contents
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Base file name (strip the .atlas.json double suffix)
        base_name = os.path.splitext(os.path.splitext(file_path)[0])[0]
        # Step 1: extract the texture-atlas text into a .atlas file
        # Match up to the first closing quote
        pattern_atlas = r'(\\n.{3,50}\.png\\nsize:.*?)(?=")'
        full_matches = re.findall(pattern_atlas, content, re.DOTALL)
        atlas_content = ""
        if not full_matches:
            print(f"Warning: no .atlas data found in {file_path}")
        else:
            print(f"Found {len(full_matches)} image blocks")
            for match in full_matches:
                # Turn literal \n sequences into real newlines
                processed = match.replace('\\n', '\n')
                atlas_content += processed
            # Write the .atlas file
            atlas_file = f"{base_name}.atlas"
            with open(atlas_file, 'w', encoding='utf-8') as f:
                f.write(atlas_content)
            print(f"Generated: {atlas_file}")
        # Step 2: extract the skeleton data into a .json file
        # Start at {"skeleton":{"hash": and end at the brace matching the first {
        start_pattern = r'{"skeleton":{"hash":'
        start_match = re.search(start_pattern, content)
        if start_match:
            start_index = start_match.start()
            # The first { is at start_index; find its matching }
            open_braces = 0
            end_index = -1
            # Scan forward from the start position
            for i in range(start_index, len(content)):
                if content[i] == '{':
                    open_braces += 1
                elif content[i] == '}':
                    open_braces -= 1
                    # When every opened brace has been closed, we have the end
                    if open_braces == 0:
                        end_index = i + 1  # include this }
                        break
            if end_index != -1:
                json_content = content[start_index:end_index]
                # Write the .json file
                json_file = f"{base_name}.json"
                with open(json_file, 'w', encoding='utf-8') as f:
                    f.write(json_content)
                print(f"Generated: {json_file}")
            else:
                print(f"Warning: no matching closing brace found in {file_path}")
        else:
            print(f"Warning: skeleton start marker not found in {file_path}")
    except Exception as e:
        print(f"Error while processing {file_path}: {str(e)}")
def download_file(link, save_path, total_files, index, digit_count, success_files, failed_files, target_names, name):
    """Download a file to the given path, with tuned retry logic"""
    max_retries = 5
    retry_delay = 2
    download_dir = os.path.join(os.getcwd(), "downloader", name)
    final_save_path = os.path.join(download_dir, save_path)
    os.makedirs(os.path.dirname(final_save_path), exist_ok=True)
    # Task tag: [current task / total tasks]
    task_id = f"[{index}/{total_files}]"
    # The old logic that forced a .json suffix to .atlas has been removed
    # PNG files get special retry logic with alternative formats
    if final_save_path.lower().endswith('.png'):
        # Alternative formats to try
        alternatives = [
            ('.jpg', link[:-4] + '.jpg'),
            ('.webp', link[:-4] + '.webp')
        ]
        success = False
        for attempt in range(max_retries):
            # Try the original PNG first
            if silent_download(link, final_save_path):
                success = True
                break
            # Then try each alternative format once
            alt_success = False
            for ext, alt_link in alternatives:
                alt_save_path = final_save_path[:-4] + ext
                if silent_download(alt_link, alt_save_path):
                    # JPG files need decrypting
                    if ext == '.jpg':
                        if decrypt_file(Path(alt_save_path)):
                            success_files.append(alt_save_path)
                            alt_success = True
                            final_save_path = alt_save_path  # update the final path
                            break
                    else:
                        success_files.append(alt_save_path)
                        alt_success = True
                        final_save_path = alt_save_path  # update the final path
                        break
            if alt_success:
                success = True
                break
            # Wait before retrying
            time.sleep(retry_delay)
        if success:
            print(f"{task_id} [OK] Downloaded: {final_save_path}")
            # Decrypt if the final file is still a PNG
            if final_save_path.lower().endswith('.png'):
                print(f"{task_id} [Decrypt] Decrypting image file: {final_save_path}")
                if decrypt_file(Path(final_save_path)):
                    success_files.append(final_save_path)
                else:
                    failed_files.append(f"Decryption failed: {final_save_path}")
            return
        else:
            # Fetch the final error status code
            try:
                response = requests.head(link, allow_redirects=True, timeout=10, verify=False)
                status_code = response.status_code
                if status_code == 404:
                    print(f"{task_id} [FAIL] 404 not found: {link}")
                elif status_code == 403:
                    print(f"{task_id} [FAIL] 403 forbidden after maximum attempts: {link}")
                else:
                    print(f"{task_id} [FAIL] All alternative formats failed: {link}")
            except:
                print(f"{task_id} [FAIL] All alternative formats failed: {link}")
            failed_files.append(link)
            return
    # Download logic for non-PNG files
    success = False
    for attempt in range(max_retries):
        if silent_download(link, final_save_path):
            success = True
            break
        time.sleep(retry_delay)
    if success:
        print(f"{task_id} [OK] Downloaded: {final_save_path}")
        # Decrypt PNG/JPG images
        if final_save_path.lower().endswith(('.png', '.jpg', '.jpeg')):
            print(f"{task_id} [Decrypt] Decrypting image file: {final_save_path}")
            if decrypt_file(Path(final_save_path)):
                success_files.append(final_save_path)
            else:
                failed_files.append(f"Decryption failed: {final_save_path}")
        else:
            success_files.append(final_save_path)
        # If this is an .atlas.json file, split it apart
        if final_save_path.lower().endswith('.atlas.json'):
            print(f"{task_id} [Process] Extracting .atlas and .json files: {final_save_path}")
            process_atlas_json_file(final_save_path)
        return
    else:
        # Fetch the final error status code
        try:
            response = requests.head(link, allow_redirects=True, timeout=10, verify=False)
            status_code = response.status_code
            if status_code == 404:
                print(f"{task_id} [FAIL] 404 not found: {link}")
            elif status_code == 403:
                print(f"{task_id} [FAIL] 403 forbidden after maximum attempts: {link}")
            else:
                print(f"{task_id} [FAIL] Download failed: {link}")
        except:
            print(f"{task_id} [FAIL] Download failed: {link}")
        failed_files.append(link)
        return
def count_files_in_directory(directory):
"""计算目录内文件总数"""
file_count = 0
for root, _, files in os.walk(directory):
file_count += len(files)
return file_count
def main():
    # Truncate any existing output.txt and error.txt
    with open('output.txt', 'w', encoding='utf-8') as f:
        pass  # just truncate
    with open('error.txt', 'w', encoding='utf-8') as f:
        pass  # just truncate
    # Find json files whose names contain "config"
    json_files = [f for f in os.listdir('.') if f.endswith('.json') and 'config' in f]
    if not json_files:
        print("❌ No json file containing 'config' found")
        return
    overall_success_files = []
    overall_failed_files = []
    overall_total_files = 0
    for json_path in json_files:
        json_file_name = os.path.basename(json_path)
        print(f"\nProcessing file: {json_file_name}")
        # Parse the JSON file
        uuids, imports, natives, paths, types, name = extract_arrays_from_json(json_path)
        if uuids is None:
            continue
        # Build the URLs
        download_links, target_names = extract_urls(uuids, imports, natives, paths, types, name, json_file_name)
        # Append the generated URLs to output.txt
        with open('output.txt', 'a', encoding='utf-8') as f:
            for url in download_links:
                f.write(url + '\n')
        print(f"✅ Wrote {len(download_links)} URLs to output.txt")
        # Download the files
        total_files = len(download_links)
        digit_count = len(str(total_files))
        download_dir = os.path.join(os.getcwd(), "downloader")
        os.makedirs(download_dir, exist_ok=True)
        success_files = []
        failed_files = []
        print(f"✅ Downloading {total_files} files to {download_dir}")
        print("-" * 60)
        # Multi-threaded download
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = []
            for idx, (link, path) in enumerate(zip(download_links, target_names), 1):
                futures.append(executor.submit(
                    download_file, link, path, total_files, idx, digit_count,
                    success_files, failed_files, target_names, name
                ))
            for future in futures:
                future.result()
        # Per-run statistics
        print("\n" + "=" * 60)
        print(f"📊 Download stats for this run | success: {len(success_files)} | failed: {len(failed_files)}")
        if failed_files:
            print("\n❌ Failed links this run:")
            for link in failed_files:
                print(f"  - {link}")
            # Append the failed URLs to error.txt
            with open('error.txt', 'a', encoding='utf-8') as f:
                for link in failed_files:
                    f.write(link + '\n')
        else:
            print("\n🎉 All files for this run downloaded!")
        actual_count = count_files_in_directory(os.path.join(download_dir, name))
        print(f"\n📁 Actual files this run: {actual_count} | target files this run: {total_files}")
        if actual_count == total_files:
            print("✅ Counts match, download complete")
        else:
            print("❌ Counts differ; files may have been created or removed unexpectedly")
        overall_success_files.extend(success_files)
        overall_failed_files.extend(failed_files)
        overall_total_files += total_files
    # Overall statistics
    print("\n" + "=" * 60)
    print(f"📊 Overall download stats | success: {len(overall_success_files)} | failed: {len(overall_failed_files)}")
    if overall_failed_files:
        print("\n❌ The failed links have been written to error.txt")
    else:
        print("\n🎉 All files downloaded!")
    overall_actual_count = count_files_in_directory(os.path.join(os.getcwd(), "downloader"))
    print(f"\n📁 Overall actual files: {overall_actual_count} | overall target files: {overall_total_files}")
    if overall_actual_count == overall_total_files:
        print("✅ Overall counts match, download complete")
    else:
        print("❌ Overall counts differ; files may have been created or removed unexpectedly")
    input("\nPress Enter to exit...")
if __name__ == "__main__":
try:
import requests
import numpy
except ImportError as e:
missing = str(e).split("'")[1]
print(f"❌ 请先安装依赖: pip install {missing}")
exit(1)
main()
This one can download everything, decrypt it, and split the atlas and json.
It's not as fast as the other expert's version though; mine is pure guesswork.
This game seems to write the UUIDs of the encrypted files into https://cdn-app.miraigirl.net/assets/resources/ecul, and that file also needs decrypting.
The downloader already checks whether decryption is needed, so I didn't bother handling it separately; that file uses the same decryption method as the images.
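Since that file reportedly uses the same decryption method as the images, here is a minimal sketch that reuses the length-seeded XOR key scheme from decrypt_file above; the output format is an assumption (presumably text/JSON), and the ecul.decrypted output name is made up:

import requests
from pathlib import Path
from numpy import uint8, frombuffer as npbuff, tile as npfill

def decrypt_ecul(url="https://cdn-app.miraigirl.net/assets/resources/ecul",
                 out_path=Path("ecul.decrypted"), klen=32):
    # Fetch the encrypted index file
    darr = npbuff(requests.get(url, verify=False).content, dtype=uint8)
    dlen = len(darr)
    # Same key derivation as decrypt_file: payload length + fixed seed string
    basekey = f'{dlen}2fjaykPFd6bAJn59beX5TWDQzsEW'.encode('utf-8')
    key = bytearray(klen)
    o, t = 0, 0
    for i in range(klen):
        o = key[i] = basekey[t] ^ o
        t = (t + 1) % len(basekey)
    # XOR the payload with the repeated key and save the result
    karr = npbuff(bytes(key), dtype=uint8)
    data = (darr ^ npfill(karr, (dlen // klen) + 1)[:dlen]).tobytes()
    out_path.write_bytes(data)  # assumed to decode as text/JSON; inspect manually
    return data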
Thank you, I'll give it a try!
Tried it and it worked, thanks!
One question: can the attached config file be handled the same way?
It's the config file for 战国恋姬online.
Game URL: Now loading...
Attached config file: https://drive.google.com/file/d/1s4PJwnu4DXorArlZ69UxV0hpBs8DH-ID/view?usp=sharing
proxyaddr = useproxy = 0
def oenshinshix(overwrite):
import os
import requests
import json
import re
from urllib.parse import urljoin
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
BASE64_VALUES = [0] * 128
for idx, char in enumerate(BASE64_CHARS):
BASE64_VALUES[ord(char)] = idx
HEX_CHARS = list('0123456789abcdef')
_t = ['', '', '', '']
UUID_TEMPLATE = _t + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + _t + _t
INDICES = [i for i, x in enumerate(UUID_TEMPLATE) if x != '-']
mainurl = "https://d9ptw0bza8yos.cloudfront.net"
def decode_uuid(base64_str):
if len(base64_str) != 22:
return base64_str
result = UUID_TEMPLATE.copy()
result[0] = base64_str[0]
result[1] = base64_str[1]
j = 2
for i in range(2, 22, 2):
lhs = BASE64_VALUES[ord(base64_str[i])]
rhs = BASE64_VALUES[ord(base64_str[i + 1])]
result[INDICES[j]] = HEX_CHARS[lhs >> 2]
j += 1
result[INDICES[j]] = HEX_CHARS[((lhs & 3) << 2) | (rhs >> 4)]
j += 1
result[INDICES[j]] = HEX_CHARS[rhs & 0xF]
j += 1
return ''.join(result)
def has_sp_skeleton_data(obj):
if isinstance(obj, list):
if len(obj) > 0 and obj[0] == "sp.SkeletonData":
return obj
for item in obj:
found = has_sp_skeleton_data(item)
if found:
return found
elif isinstance(obj, dict):
for value in obj.values():
found = has_sp_skeleton_data(value)
if found:
return found
return None
def fetch_url(url):
"""下载URL内容"""
print(f"Fetching: {url}")
response = requests.get(url)
response.raise_for_status()
return response.text
    def getspresjson():
        # Locate main.<hash>.js via the loader string inside firebase-app-and-analytics.js
        index_html = fetch_url("https://d9ptw0bza8yos.cloudfront.net/1/web-mobile/firebase-app-and-analytics.js")
        index_js_match = re.search(r'specified by the "main\.([0-9a-f]+)"', index_html)
        if not index_js_match:
            raise Exception("main.xxx.js not found!")
        index_js_file = index_js_match.group(1)
        index_js_url = f"{mainurl}/1/web-mobile/main.{index_js_file}.js"
        # Fetch main.js and find the settings script it requires
        app_js_content = fetch_url(index_js_url)
        settings_match = re.search(r"require\(['\"](src/settings\..*?\.js)['\"]\)", app_js_content)
        if not settings_match:
            raise Exception("settings.xxx.js not found!")
        settings_path = settings_match.group(1)
        settings_url = f"{mainurl}/1/web-mobile/{settings_path}"
        # Read the assetbundle version out of settings, then fetch config.<version>.json
        settings_content = fetch_url(settings_url)
        configv = re.search(r'assetbundle:\s*"([^"]+)"', settings_content)
        if not configv:
            raise Exception("config.xxx.json not found!")
        confv = configv.group(1)
        print("Asset version:", confv)
        url2 = f"{mainurl}/1/web-mobile/assets/assetbundle/config.{confv}.json"
        try:
            response2 = requests.get(url2)
            response2.raise_for_status()
            return response2.json()
        except Exception as e:
            print(e)
            return None
    def resdownloader(BASE, data, output_dir):
        if not data:
            return
        uuids = data["uuids"]
        vers_imp = data["versions"]["import"]
        vers_nat = data["versions"]["native"]
        paths = data["paths"]
        types = data["types"]
        if "sp.SkeletonData" not in types:
            print("[!] No sp.SkeletonData entries found")
            return
        SpriteFrameint = types.index("sp.SkeletonData")
        # The versions arrays alternate index, version: build index -> version maps
        vimp = {vers_imp[i]: vers_imp[i + 1] for i in range(0, len(vers_imp), 2)}
        vnat = {vers_nat[i]: vers_nat[i + 1] for i in range(0, len(vers_nat), 2)}
        os.makedirs(output_dir, exist_ok=True)
        for idx, sid in enumerate(uuids):
            if paths.get(str(idx)) and paths.get(str(idx))[1] == SpriteFrameint:
                if '@' in sid:
                    uuid_base64, ext = sid.split('@', 1)
                else:
                    uuid_base64, ext = sid, None
                uuid = decode_uuid(uuid_base64)
                full_uuid = f"{uuid}@{ext}" if ext else uuid
                imp_ver = f".{vimp.get(idx, '')}"
                nat_ver = f".{vnat.get(idx, '')}"
                json_url = f"{BASE}/import/{uuid[:2]}/{full_uuid}{imp_ver}.json"
                skel_url = f"{BASE}/native/{uuid[:2]}/{full_uuid}{nat_ver}.bin"
                base_name = os.path.basename(paths[str(idx)][0])
                print(f'json - {base_name}.json -> {json_url} | skel - {base_name}.skel -> {skel_url}')
                try:
                    response = requests.get(json_url)
                    response.raise_for_status()
                    json_data = response.json()
                except Exception as e:
                    print(f"[!] Download failed: {json_url} ({e})")
                    continue
                if not has_sp_skeleton_data(json_data):
                    continue
                res_path = paths[str(idx)][0]
                folder = os.path.join(output_dir, *res_path.split("/"))
                os.makedirs(folder, exist_ok=True)
                try:
                    sp_type_block = next(item for item in json_data[3] if item[0] == "sp.SkeletonData")
                    field_names = sp_type_block[1]
                except Exception as e:
                    print(f"[!] Failed to parse field names: {e}")
                    continue
                try:
                    obj_data = json_data[5][0]
                    field_values = obj_data[1:]
                    sp_dict = dict(zip(field_names, field_values))
                    _name = sp_dict.get("_name", base_name)
                    _atlasText = sp_dict.get("_atlasText", "")
                    _skeletonJson = sp_dict.get("_skeletonJson", None)
                    atlas_path = os.path.join(folder, f"{_name}.atlas")
                    if os.path.exists(atlas_path) and not overwrite:
                        print(f"[!] Atlas already exists, skipping: {_name}.atlas")
                        continue
                    with open(atlas_path, "w", encoding="utf-8") as f:
                        f.write(_atlasText)
                    print(f"[+] Wrote atlas: {_name}.atlas")
                    if _skeletonJson:
                        json_path = os.path.join(folder, f"{_name}.json")
                        with open(json_path, "w", encoding="utf-8") as f:
                            json.dump(_skeletonJson, f, ensure_ascii=False, indent=2)
                        print(f"[+] Wrote skeletonJson: {_name}.json")
                    else:
                        skel_path = os.path.join(folder, f"{_name}.skel")
                        with open("oenshinshix.txt", "a", encoding="utf-8") as f:
                            f.write(f"{skel_url}\n out={skel_path}\n")
                    pngdata = json_data[1]
                    for i, pngsid in enumerate(pngdata):
                        pnguuid = decode_uuid(pngsid[:22])
                        try:
                            idxpng = uuids.index(pngsid[:22])
                            nat_verpng = vnat[idxpng]
                            pngurl = f"{BASE}/native/{pnguuid[:2]}/{pnguuid}.{nat_verpng}.png"
                            png_tempname = f"{_name}_{i + 1}" if i >= 1 else f"{_name}"
                            png_path = os.path.join(folder, png_tempname + ".png")
                            print(f'{png_tempname}.png -> {pngurl}')
                            with open("oenshinshix.txt", "a", encoding="utf-8") as f:
                                f.write(f"{pngurl}\n out={png_path}\n")
                        except Exception as e:
                            print(f"[!] PNG handling failed: {e}")
                except Exception as e:
                    print(f"[!] Failed to unpack sp.SkeletonData: {e}")
    if os.path.exists("oenshinshix.txt"):
        os.remove("oenshinshix.txt")
    resdownloader(f"{mainurl}/1/web-mobile/assets/assetbundle", getspresjson(),
                  "resdownload\\oenshinshix")
    os.system(
        f"aria2c -i oenshinshix.txt -j 16 --max-tries=100 --retry-wait=60 --auto-file-renaming=false {f'{proxyaddr} ' if useproxy else ''}{'--allow-overwrite=true ' if overwrite else ''}--check-certificate=false")
oenshinshix(0)
This script generates oenshinshix.txt and then uses the aria2c tool to download.
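For reference, each entry in oenshinshix.txt uses aria2c's input-file format, a URI line followed by an indented out= option; the names below are made-up placeholders:

https://d9ptw0bza8yos.cloudfront.net/1/web-mobile/assets/assetbundle/native/ab/abcdef12-3456-7890-abcd-ef1234567890.01a2b.bin
 out=resdownload\oenshinshix\spine\sample\sample.skel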
Don't pay attention to the comments... the code was copied straight from another project. This is Cocos2d-JS v3.
Thanks!
After trying it, it seems to support only png, skel, and atlas files; are json, mp3, and jpg files unsupported?
It also looks like only part of the config file's contents was picked up.
Anything that belongs to the Spine files, atlas, png, json, skel and so on, is supported; I didn't really care about the rest.
If you want all the data, you can follow the same pattern and adapt the part of the code that filters for sp.SkeletonData.
You only need to change
base_url = "https://d9ptw0bza8yos.cloudfront.net/1/web-mobile/assets/"
and then put the script in the same directory as the config file.
The failures are due to wrong file extensions; some audio files use the wav extension.
I looked at the types: the replacement list needs updating.
Here's a modified script instead. Careful, downloading every asset can blow up in size; if you want selective downloads, just edit replace_rules to drop the formats and types you don't want.
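For instance, to keep only the Spine-related assets, the rules could be trimmed to something like this sketch (the full script below ships with the complete list):

replace_rules = {
    "sp.SkeletonData": ".atlas,.skel",  # Spine skeleton: atlas text + binary skeleton
    "cc.Texture2D": ".json,.png"        # the textures the atlases reference
}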
import re
import os
import shutil
import requests
import time
import json
import concurrent.futures
import urllib3
from datetime import datetime
from pathlib import Path
# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Asset-type to file-extension replacement rules
replace_rules = {
"cc.ImageAsset": ".json,.png",
"cc.SpriteFrame": ".json",
"sp.SkeletonData": ".atlas,.skel",
"cc.TextAsset": ".json",
"cc.VideoClip": ".json,.mp4",
"cc.AudioClip": ".json,.mp3",
"cc.JsonAsset": ".json",
"cc.Prefab": ".json",
"cc.ParticleAsset": ".json",
"cc.Asset": ".json",
"cc.Texture2D":".json,.png",
"cc.SpriteAtlas":".json",
"cc.TextAsset":".json",
"cc.AnimationClip":".json"
}
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
BASE64_VALUES = [0] * 128
for idx, char in enumerate(BASE64_CHARS):
BASE64_VALUES[ord(char)] = idx
HEX_CHARS = list('0123456789abcdef')
_t = ['', '', '', '']
UUID_TEMPLATE = _t + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + ['-'] + _t + _t + _t
INDICES = [i for i, x in enumerate(UUID_TEMPLATE) if x != '-']
def decode_uuid(base64_str):
"""将Base64编码的字符串还原为UUID格式,处理包含@的情况"""
if '@' in base64_str:
main_part, suffix = base64_str.split('@', 1)
else:
main_part = base64_str
suffix = ''
if len(main_part) != 22:
result = main_part
else:
result = UUID_TEMPLATE.copy()
result[0] = main_part[0]
result[1] = main_part[1]
j = 2
for i in range(2, 22, 2):
lhs = BASE64_VALUES[ord(main_part[i])]
rhs = BASE64_VALUES[ord(main_part[i + 1])]
result[INDICES[j]] = HEX_CHARS[lhs >> 2]
j += 1
result[INDICES[j]] = HEX_CHARS[((lhs & 3) << 2) | (rhs >> 4)]
j += 1
result[INDICES[j]] = HEX_CHARS[rhs & 0xF]
j += 1
result = ''.join(result)
if suffix:
result = f"{result}@{suffix}"
return result
def extract_arrays_from_json(json_path):
try:
with open(json_path, "r", encoding="utf-8") as f:
content = f.read()
print(f"✅ 读取 JSON 文件成功,长度: {len(content)} 字符")
except Exception as e:
print(f"❌ 读取文件失败: {str(e)}")
return None, None, None, None, None, None, None
try:
uuids_match = re.search(r'"uuids":\s*\[([^\]]+)\]', content)
if not uuids_match:
print("警告: 未找到 uuids 数组")
uuids = []
else:
uuids_str = uuids_match.group(1)
uuids = [item.strip().strip('"') for item in uuids_str.split(',') if item.strip()]
uuids = [decode_uuid(uuid) for uuid in uuids]
print(f"✅ 提取到 uuids: {len(uuids)} 条")
except Exception as e:
print(f"❌ 处理 uuids 失败: {str(e)}")
uuids = []
try:
import_match = re.search(r'"versions":\s*\{\s*"import":\s*\[([^\]]+)\]', content)
        if not import_match:
            print("Warning: versions.import array not found")
            imports = {}
        else:
            import_str = import_match.group(1)
            import_items = [item.strip() for item in import_str.split(',') if item.strip()]
            imports = {}
            for i in range(0, len(import_items), 2):
                if i + 1 < len(import_items):
                    num = import_items[i]
                    string_val = import_items[i + 1].strip('"')
                    imports[num] = string_val
            print(f"✅ Extracted import: {len(imports)} items")
    except Exception as e:
        print(f"❌ Failed to process import: {str(e)}")
        imports = {}
try:
native_match = re.search(r'"native":\s*\[([^\]]+)\]', content)
        if not native_match:
            print("Warning: native array not found")
            natives = {}
        else:
            native_str = native_match.group(1)
            native_items = [item.strip() for item in native_str.split(',') if item.strip()]
            natives = {}
            for i in range(0, len(native_items), 2):
                if i + 1 < len(native_items):
                    num = native_items[i]
                    string_val = native_items[i + 1].strip('"')
                    natives[num] = string_val
            print(f"✅ Extracted native: {len(natives)} items")
    except Exception as e:
        print(f"❌ Failed to process native: {str(e)}")
        natives = {}
try:
paths_match = re.search(r'"paths":\s*\{([^}]+)\}', content)
        if not paths_match:
            print("Warning: paths block not found")
            paths = {}
        else:
            paths_str = paths_match.group(1)
            # Match array entries with either 3 or 4 elements
            path_pattern = r'"(\d+)":\s*\[\s*"([^"]+)"\s*,\s*(\d+)\s*(?:,\s*(\d+)\s*)?\]'
            path_items = re.findall(path_pattern, paths_str)
            if not path_items:
                print("Warning: no valid path entries found in the paths block")
                paths = {}
            else:
                paths = {}
                for item in path_items:
                    key, path, num1, num2 = item  # num2 may be empty
                    paths[key] = {
                        'path': path,
                        'num': int(num1) - 1  # keep the original logic: first number minus 1 as the index
                    }
            print(f"✅ Extracted paths: {len(paths)} items")
    except Exception as e:
        print(f"❌ Failed to process paths: {str(e)}")
        paths = {}
try:
types_match = re.search(r'"types":\s*\[([^\]]+)\]', content)
        if not types_match:
            print("Warning: types array not found")
            types = []
        else:
            types_str = types_match.group(1)
            types = [item.strip().strip('"') for item in types_str.split(',') if item.strip()]
            new_types = []
            for line in types:
                replaced = False
                for old_text, new_text in replace_rules.items():
                    if old_text in line:
                        new_types.append(new_text)
                        replaced = True
                        break
                if not replaced:
                    new_types.append('')
            types = new_types
            print(f"✅ Extracted types: {len(types)} items")
    except Exception as e:
        print(f"❌ Failed to process types: {str(e)}")
        types = []
try:
name_match = re.search(r'"name"\s*:\s*"([^"]+)"', content)
        if not name_match:
            print("Warning: name field not found")
            name = ''
        else:
            name = name_match.group(1)
            print(f"✅ Extracted name: {name}")
    except Exception as e:
        print(f"❌ Failed to process name: {str(e)}")
        name = ''
return uuids, imports, natives, paths, types, name
def determine_file_name(path):
"""根据路径确定文件名,处理spriteFrame和texture的特殊情况"""
parts = path.split('/')
last_part = parts[-1]
if 'spriteFrame' in parts or 'texture' in parts:
for i in range(len(parts) - 2, -1, -1):
if parts[i] not in ['spriteFrame', 'texture']:
return parts[i]
return last_part
else:
return last_part.split('.')[0] if '.' in last_part else last_part
def get_final_path(path, file_name, suffix):
"""生成最终的文件路径,保持原后缀不变"""
parts = path.split('/')
if parts[-1] in ['spriteFrame', 'texture']:
parts = parts[:-1]
parts.append(f"{file_name}{suffix}")
return "/".join(parts)
def get_unique_path(path, written_paths):
"""获取唯一的文件路径,如果路径已存在则添加编号"""
base_path, ext = os.path.splitext(path)
counter = 1
new_path = path
while new_path in written_paths:
new_path = f"{base_path}({counter}){ext}"
counter += 1
return new_path
def extract_urls(uuids, imports, natives, paths, types, name, json_file_name):
    download_links = []
    target_names = []
    written_paths = set()
    # Base URL stays fixed
    base_url = "https://d9ptw0bza8yos.cloudfront.net/1/web-mobile/assets/"
    for serial, data in paths.items():
        path = data['path']
        num = data['num']
        print(f"\n[Processing path] serial: {serial}")
        type_idx = num + 1
        print(f"[INFO] Computed type_idx: {type_idx}")
        if type_idx >= len(types):
            print(f"[ERROR] type index {type_idx} is out of range for types ({len(types)} items), skipping")
            continue
        processed_line = types[type_idx].strip()
        print(f"[INFO] types[{type_idx}] after processing: '{processed_line}'")
        if not processed_line:
            houzhui_list = []
        else:
            houzhui_list = [h.strip() for h in processed_line.split(',')]
        file_name = determine_file_name(path)
        print(f"[INFO] File name: {file_name}")
        uuid_idx = int(serial)
        if uuid_idx >= len(uuids):
            print(f"[ERROR] uuid index {uuid_idx} is out of range for uuids ({len(uuids)} entries), skipping")
            continue
        uuid_val = uuids[uuid_idx]
        uu = uuid_val[:2] if uuid_val else ''
        print(f"[INFO] uuid: {uuid_val}, uu: {uu}")
        # Walk every suffix and pick imports or natives depending on the suffix type
        for h in houzhui_list:
            if not h:
                continue  # skip empty suffixes
            print(f"[INFO] Handling suffix: {h}")
            # Core rule: only .json and .atlas use imports, everything else uses natives
            if h in ['.json', '.atlas']:
                hash_val = imports.get(serial, '')
                url_dir = 'import'  # URL segment for imports
                # For the .atlas suffix the URL uses .json
                processed_h = '.json' if h == '.atlas' else h
            else:
                hash_val = natives.get(serial, '')
                url_dir = 'native'  # URL segment for natives
                # Special case: .skel is served as .bin (native suffixes only)
                processed_h = '.bin' if h == '.skel' else h
            # Make sure a hash exists
            if not hash_val:
                print(f"[WARN] No {url_dir} hash found for serial {serial}, skipping the {h} URL")
                continue
            # Build the URL
            url = f"{base_url}{name}/{url_dir}/{uu}/{uuid_val}.{hash_val}{processed_h}"
            download_links.append(url)
            print(f"[SUCCESS] Generated {url_dir} URL: {url.strip()}")
            # Build the save path; .atlas assets are saved as .atlas.json
            save_suffix = '.atlas.json' if h == '.atlas' else h
            final_path = get_final_path(path, file_name, save_suffix)
            final_path = get_unique_path(final_path, written_paths)
            written_paths.add(final_path)
            target_names.append(final_path)
    return download_links, target_names
def get_simplified_timestamp():
now = datetime.now()
return now.strftime("%y%m%d%H%M%S")
def silent_download(link, save_path):
"""静默下载函数,不输出任何提示,仅返回成功与否"""
try:
# 检查文件是否已存在且大小匹配
if os.path.exists(save_path):
local_size = os.path.getsize(save_path)
headers = requests.head(link, allow_redirects=True, timeout=10, verify=False).headers
remote_size = int(headers.get('Content-Length', 0))
if local_size == remote_size and remote_size != 0:
return True
        # Download the file
response = requests.get(link, stream=True, timeout=10, verify=False)
response.raise_for_status()
with open(save_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
        # Verify the file size
if os.path.exists(save_path) and os.path.getsize(save_path) > 0:
return True
except:
return False
def process_atlas_json_file(file_path):
    """Process a single .atlas.json file, extracting its data into .atlas and .json files"""
    try:
        # Read the file contents
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Base file name (strip the .atlas.json double suffix)
        base_name = os.path.splitext(os.path.splitext(file_path)[0])[0]
        # Step 1: extract the texture-atlas text into a .atlas file
        # Match up to the first closing quote
        pattern_atlas = r'(\\n.{3,50}\.png\\nsize:.*?)(?=")'
        full_matches = re.findall(pattern_atlas, content, re.DOTALL)
        atlas_content = ""
        if not full_matches:
            print(f"Warning: no .atlas data found in {file_path}")
        else:
            print(f"Found {len(full_matches)} image blocks")
            for match in full_matches:
                # Turn literal \n sequences into real newlines
                processed = match.replace('\\n', '\n')
                atlas_content += processed
            # Write the .atlas file
            atlas_file = f"{base_name}.atlas"
            with open(atlas_file, 'w', encoding='utf-8') as f:
                f.write(atlas_content)
            print(f"Generated: {atlas_file}")
        # Step 2: extract the skeleton data into a .json file
        # Start at {"skeleton":{"hash": and end at the brace matching the first {
        start_pattern = r'{"skeleton":{"hash":'
        start_match = re.search(start_pattern, content)
        if start_match:
            start_index = start_match.start()
            # The first { is at start_index; find its matching }
            open_braces = 0
            end_index = -1
            # Scan forward from the start position
            for i in range(start_index, len(content)):
                if content[i] == '{':
                    open_braces += 1
                elif content[i] == '}':
                    open_braces -= 1
                    # When every opened brace has been closed, we have the end
                    if open_braces == 0:
                        end_index = i + 1  # include this }
                        break
            if end_index != -1:
                json_content = content[start_index:end_index]
                # Write the .json file
                json_file = f"{base_name}.json"
                with open(json_file, 'w', encoding='utf-8') as f:
                    f.write(json_content)
                print(f"Generated: {json_file}")
            else:
                print(f"Warning: no matching closing brace found in {file_path}")
        else:
            print(f"Warning: skeleton start marker not found in {file_path}")
    except Exception as e:
        print(f"Error while processing {file_path}: {str(e)}")
def download_file(link, save_path, total_files, index, digit_count, success_files, failed_files, target_names, name):
    """Download a file to the given path, with tuned retry logic (decryption removed, MP3 alternative suffixes added)"""
    max_retries = 5
    retry_delay = 2
    download_dir = os.path.join(os.getcwd(), "downloader", name)
    final_save_path = os.path.join(download_dir, save_path)
    os.makedirs(os.path.dirname(final_save_path), exist_ok=True)
    # Task tag: [current task / total tasks]
    task_id = f"[{index}/{total_files}]"
    # 1. PNG files (keep the alternative-format logic, decryption removed)
    if final_save_path.lower().endswith('.png'):
        alternatives = [
            ('.jpg', link[:-4] + '.jpg'),
            ('.webp', link[:-4] + '.webp')
        ]
        success = False
        for attempt in range(max_retries):
            # Try the original PNG first
            if silent_download(link, final_save_path):
                success = True
                break
            # Then try each alternative format
            alt_success = False
            for ext, alt_link in alternatives:
                alt_save_path = final_save_path[:-4] + ext
                if silent_download(alt_link, alt_save_path):
                    success_files.append(alt_save_path)
                    alt_success = True
                    final_save_path = alt_save_path
                    break
            if alt_success:
                success = True
                break
            time.sleep(retry_delay)
        if success:
            success_files.append(final_save_path)
            print(f"{task_id} [OK] Downloaded: {final_save_path}")
            return
        else:
            # Error handling
            try:
                response = requests.head(link, allow_redirects=True, timeout=10, verify=False)
                status_code = response.status_code
                if status_code == 404:
                    print(f"{task_id} [FAIL] 404 not found: {link}")
                elif status_code == 403:
                    print(f"{task_id} [FAIL] 403 forbidden: {link}")
                else:
                    print(f"{task_id} [FAIL] All alternative formats failed: {link}")
            except:
                print(f"{task_id} [FAIL] All alternative formats failed: {link}")
            failed_files.append(link)
            return
    # 2. New: MP3 files, trying .wav and .ogg alternative suffixes (mirrors the PNG logic)
    elif final_save_path.lower().endswith('.mp3'):
        alternatives = [
            ('.wav', link[:-4] + '.wav'),
            ('.ogg', link[:-4] + '.ogg')
        ]
        success = False
        for attempt in range(max_retries):
            # Try the original MP3 first
            if silent_download(link, final_save_path):
                success = True
                break
            # Then try each audio alternative format
            alt_success = False
            for ext, alt_link in alternatives:
                alt_save_path = final_save_path[:-4] + ext
                if silent_download(alt_link, alt_save_path):
                    success_files.append(alt_save_path)
                    alt_success = True
                    final_save_path = alt_save_path
                    break
            if alt_success:
                success = True
                break
            time.sleep(retry_delay)
        if success:
            success_files.append(final_save_path)
            print(f"{task_id} [OK] Downloaded: {final_save_path}")
            return
        else:
            # Error handling
            try:
                response = requests.head(link, allow_redirects=True, timeout=10, verify=False)
                status_code = response.status_code
                if status_code == 404:
                    print(f"{task_id} [FAIL] 404 not found: {link}")
                elif status_code == 403:
                    print(f"{task_id} [FAIL] 403 forbidden: {link}")
                else:
                    print(f"{task_id} [FAIL] All audio alternative formats failed: {link}")
            except:
                print(f"{task_id} [FAIL] All audio alternative formats failed: {link}")
            failed_files.append(link)
            return
    # 3. All other formats (decryption code removed)
    success = False
    for attempt in range(max_retries):
        if silent_download(link, final_save_path):
            success = True
            break
        time.sleep(retry_delay)
    if success:
        success_files.append(final_save_path)
        print(f"{task_id} [OK] Downloaded: {final_save_path}")
        # Keep the .atlas.json splitting logic
        if final_save_path.lower().endswith('.atlas.json'):
            print(f"{task_id} [Process] Extracting .atlas and .json files: {final_save_path}")
            process_atlas_json_file(final_save_path)
        return
    else:
        try:
            response = requests.head(link, allow_redirects=True, timeout=10, verify=False)
            status_code = response.status_code
            if status_code == 404:
                print(f"{task_id} [FAIL] 404 not found: {link}")
            elif status_code == 403:
                print(f"{task_id} [FAIL] 403 forbidden: {link}")
            else:
                print(f"{task_id} [FAIL] Download failed: {link}")
        except:
            print(f"{task_id} [FAIL] Download failed: {link}")
        failed_files.append(link)
        return
def count_files_in_directory(directory):
"""计算目录内文件总数"""
file_count = 0
for root, _, files in os.walk(directory):
file_count += len(files)
return file_count
def main():
    # Truncate any existing output.txt and error.txt
    with open('output.txt', 'w', encoding='utf-8') as f:
        pass  # just truncate
    with open('error.txt', 'w', encoding='utf-8') as f:
        pass  # just truncate
    # Find json files whose names contain "config"
    json_files = [f for f in os.listdir('.') if f.endswith('.json') and 'config' in f]
    if not json_files:
        print("❌ No json file containing 'config' found")
        return
    overall_success_files = []
    overall_failed_files = []
    overall_total_files = 0
    for json_path in json_files:
        json_file_name = os.path.basename(json_path)
        print(f"\nProcessing file: {json_file_name}")
        # Parse the JSON file
        uuids, imports, natives, paths, types, name = extract_arrays_from_json(json_path)
        if uuids is None:
            continue
        # Build the URLs
        download_links, target_names = extract_urls(uuids, imports, natives, paths, types, name, json_file_name)
        # Append the generated URLs to output.txt
        with open('output.txt', 'a', encoding='utf-8') as f:
            for url in download_links:
                f.write(url + '\n')
        print(f"✅ Wrote {len(download_links)} URLs to output.txt")
        # Download the files
        total_files = len(download_links)
        digit_count = len(str(total_files))
        download_dir = os.path.join(os.getcwd(), "downloader")
        os.makedirs(download_dir, exist_ok=True)
        success_files = []
        failed_files = []
        print(f"✅ Downloading {total_files} files to {download_dir}")
        print("-" * 60)
        # Multi-threaded download
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = []
            for idx, (link, path) in enumerate(zip(download_links, target_names), 1):
                futures.append(executor.submit(
                    download_file, link, path, total_files, idx, digit_count,
                    success_files, failed_files, target_names, name
                ))
            for future in futures:
                future.result()
        # Per-run statistics
        print("\n" + "=" * 60)
        print(f"📊 Download stats for this run | success: {len(success_files)} | failed: {len(failed_files)}")
        if failed_files:
            print("\n❌ Failed links this run:")
            for link in failed_files:
                print(f"  - {link}")
            # Append the failed URLs to error.txt
            with open('error.txt', 'a', encoding='utf-8') as f:
                for link in failed_files:
                    f.write(link + '\n')
        else:
            print("\n🎉 All files for this run downloaded!")
        actual_count = count_files_in_directory(os.path.join(download_dir, name))
        print(f"\n📁 Actual files this run: {actual_count} | target files this run: {total_files}")
        if actual_count == total_files:
            print("✅ Counts match, download complete")
        else:
            print("❌ Counts differ; files may have been created or removed unexpectedly")
        overall_success_files.extend(success_files)
        overall_failed_files.extend(failed_files)
        overall_total_files += total_files
    # Overall statistics
    print("\n" + "=" * 60)
    print(f"📊 Overall download stats | success: {len(overall_success_files)} | failed: {len(overall_failed_files)}")
    if overall_failed_files:
        print("\n❌ The failed links have been written to error.txt")
    else:
        print("\n🎉 All files downloaded!")
    overall_actual_count = count_files_in_directory(os.path.join(os.getcwd(), "downloader"))
    print(f"\n📁 Overall actual files: {overall_actual_count} | overall target files: {overall_total_files}")
    if overall_actual_count == overall_total_files:
        print("✅ Overall counts match, download complete")
    else:
        print("❌ Overall counts differ; files may have been created or removed unexpectedly")
    input("\nPress Enter to exit...")
if __name__ == "__main__":
try:
        # numpy is no longer needed (it was only used by the removed decryption code)
import requests
except ImportError as e:
missing = str(e).split("'")[1]
print(f"❌ 请先安装依赖: pip install {missing}")
exit(1)
main()
About this config file for Lumina Prognosis:
after running the script it errors out, and no URLs are written to output.txt.
Also, the downloader\resources\Texture\HCG\Main directory contains many jpg files that can't be opened.
Where can the game's attributes, levels and similar data be viewed? I couldn't find them in the extracted data.
Does anyone know what's going on and how to handle it?
Processing file: config.92e64.json
Read JSON file successfully, length: 50451 characters
Extracted uuids: 1372 entries
Extracted import: 14 items
Extracted native: 114 items
Extracted paths: 2 items
Extracted types: 2 items
Extracted name: main
[Processing path] serial: 131
[INFO] Computed type_idx: 1
[INFO] types[1] after processing: ''
[INFO] File name: default-physics-material
[INFO] uuid: ba21476f-2866-4f81-9c4d-6e359316e448, uu: ba
[Processing path] serial: 162
[INFO] Computed type_idx: 0
[INFO] types[0] after processing: ''
[INFO] File name: builtin-forward
[INFO] uuid: fd8ec536-a354-4a17-9c74-4f3883c378c8, uu: fd
Wrote 0 URLs to output.txt
After modifying the script, it instead reports [WARN] No import hash found for serial 162.
エンジェリックリンクR looks very similar too.
config file:
https://ancl.jp/game/client/pc/assets/resources/config.c426e.json
base_url:
https://ancl.jp/game/client/pc/assets/
The generated URLs do point at real files,
but they come back 403 forbidden.
As far as I know, the image and mp4 URL formats are a bit different:
https://ancl.jp/img/game/event/(StoryID)/image/(FileName).jpg
https://ancl.jp/img/game/event/(StoryID)/movie/(FileName).mp4
You probably grabbed the wrong config file. My script computes URLs from the indices in paths, which is why you only got two.
I can't get into pcr, so I don't know its situation.
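A quick way to sanity-check which config you grabbed is to count its entries; a real bundle config should have far more than two paths. A minimal sketch, assuming the file parses as plain JSON with the uuids/paths keys the script reads:

import json

with open("config.92e64.json", encoding="utf-8") as f:  # the file name from your log
    cfg = json.load(f)
print(len(cfg.get("uuids", [])), "uuids |", len(cfg.get("paths", {})), "paths")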
I'm on a VPN exiting through a Japanese IP, but not every Japanese IP works. This game is odd; after switching IPs a few times I managed to get through.
Post the config file and I'll take a look. Also send the complete request headers and request bodies for all the URLs.
angelic.rar (220.5 KB)