简单解析了下,看起来没啥大问题?
import msgpack
import json
import base64
import struct
import lz4.block
from typing import Dict, List, Any
INPUT_FILE = 'master-data.decrypted'
OUTPUT_FILE = 'master-data.json'
class MasterMemoryExtTypeHandler:
@staticmethod
def decode_ext_type(code: int, data: bytes) -> Any:
if code == 99 and len(data) >= 5 and data[0] == 0xd2:
# ExtType 99: 0xd2 + 4字节大小 + LZ4压缩数据
uncompressed_size = struct.unpack('>I', data[1:5])[0]
compressed_data = data[5:]
try:
decompressed = lz4.block.decompress(compressed_data, uncompressed_size=uncompressed_size)
return msgpack.unpackb(decompressed, raw=False, strict_map_key=False)
except Exception:
pass
class MasterMemoryJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
try:
return obj.decode('utf-8')
except UnicodeDecodeError:
return base64.b64encode(obj).decode('ascii')
elif isinstance(obj, msgpack.ExtType):
return MasterMemoryExtTypeHandler.decode_ext_type(obj.code, obj.data)
elif obj.__class__.__name__ == 'Timestamp':
return {
"__type__": "Timestamp",
"__seconds__": getattr(obj, 'seconds', None),
"__nanoseconds__": getattr(obj, 'nanoseconds', None)
}
return super().default(obj)
def extract_table_data(header: Dict[str, List[int]], raw_data: bytes) -> Dict[str, Any]:
tables = {}
for table_name, table_info in header.items():
if not isinstance(table_info, list) or len(table_info) < 2:
continue
offset, length = table_info[0], table_info[1]
try:
table_data = raw_data[offset:offset + length]
if len(table_data) == 0:
tables[table_name] = {"objects_count": 0, "data": []}
continue
try:
result = msgpack.unpackb(
table_data,
raw=False,
strict_map_key=False,
ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
)
objects = result if isinstance(result, list) else [result]
except Exception:
unpacker = msgpack.Unpacker(
raw=False,
strict_map_key=False,
ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
)
unpacker.feed(table_data)
objects = [obj for obj in unpacker]
tables[table_name] = {
"objects_count": len(objects),
"data": objects
}
except Exception as e:
tables[table_name] = {
"error": str(e),
"offset": offset,
"length": length
}
return tables
def convert_master_memory_to_json():
try:
with open(INPUT_FILE, 'rb') as f:
raw_data = f.read()
print(f"文件读取成功,大小: {len(raw_data)} 字节")
except FileNotFoundError:
print(f"错误: 找不到文件 {INPUT_FILE}")
return
try:
unpacker = msgpack.Unpacker(
raw=False,
strict_map_key=False,
ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
)
unpacker.feed(raw_data)
all_objects = [obj for obj in unpacker]
print(f"解析成功!总共 {len(all_objects)} 个对象")
except Exception as e:
print(f"解析失败: {e}")
return
result = {"tables": {}, "compressed_data": {}}
if all_objects and isinstance(all_objects[0], dict):
header = all_objects[0]
tables = extract_table_data(header, raw_data)
result["tables"] = tables
compressed_count = 0
for i, obj in enumerate(all_objects[1:], 1):
if isinstance(obj, (dict, list)) and not isinstance(obj, str):
result["compressed_data"][f"data_{i}"] = obj
compressed_count += 1
if compressed_count > 0:
print(f"提取了 {compressed_count} 个压缩数据块")
try:
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False, cls=MasterMemoryJSONEncoder)
print(f"✓ 转换完成: {OUTPUT_FILE}")
except Exception as e:
print(f"保存失败: {e}")
if __name__ == "__main__":
convert_master_memory_to_json()