有人会解ero新出的吗?

简单解析了下,看起来没啥大问题?

import msgpack
import json
import base64
import struct
import lz4.block
from typing import Dict, List, Any


INPUT_FILE = 'master-data.decrypted'
OUTPUT_FILE = 'master-data.json'

class MasterMemoryExtTypeHandler:
    @staticmethod
    def decode_ext_type(code: int, data: bytes) -> Any:
        if code == 99 and len(data) >= 5 and data[0] == 0xd2:
            # ExtType 99: 0xd2 + 4字节大小 + LZ4压缩数据
            uncompressed_size = struct.unpack('>I', data[1:5])[0]
            compressed_data = data[5:]

            try:
                decompressed = lz4.block.decompress(compressed_data, uncompressed_size=uncompressed_size)
                return msgpack.unpackb(decompressed, raw=False, strict_map_key=False)
            except Exception:
                pass

class MasterMemoryJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, bytes):
            try:
                return obj.decode('utf-8')
            except UnicodeDecodeError:
                return base64.b64encode(obj).decode('ascii')
        elif isinstance(obj, msgpack.ExtType):
            return MasterMemoryExtTypeHandler.decode_ext_type(obj.code, obj.data)
        elif obj.__class__.__name__ == 'Timestamp':
            return {
                "__type__": "Timestamp",
                "__seconds__": getattr(obj, 'seconds', None),
                "__nanoseconds__": getattr(obj, 'nanoseconds', None)
            }
        return super().default(obj)

def extract_table_data(header: Dict[str, List[int]], raw_data: bytes) -> Dict[str, Any]:
    tables = {}
    
    for table_name, table_info in header.items():
        if not isinstance(table_info, list) or len(table_info) < 2:
            continue
            
        offset, length = table_info[0], table_info[1]
        
        try:
            table_data = raw_data[offset:offset + length]
            
            if len(table_data) == 0:
                tables[table_name] = {"objects_count": 0, "data": []}
                continue
            
            try:
                result = msgpack.unpackb(
                    table_data, 
                    raw=False, 
                    strict_map_key=False,
                    ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
                )
                objects = result if isinstance(result, list) else [result]
            except Exception:
                unpacker = msgpack.Unpacker(
                    raw=False, 
                    strict_map_key=False,
                    ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
                )
                unpacker.feed(table_data)
                objects = [obj for obj in unpacker]
            
            tables[table_name] = {
                "objects_count": len(objects),
                "data": objects
            }
            
        except Exception as e:
            tables[table_name] = {
                "error": str(e),
                "offset": offset,
                "length": length
            }
    
    return tables

def convert_master_memory_to_json():
    try:
        with open(INPUT_FILE, 'rb') as f:
            raw_data = f.read()
        print(f"文件读取成功,大小: {len(raw_data)} 字节")
    except FileNotFoundError:
        print(f"错误: 找不到文件 {INPUT_FILE}")
        return

    try:
        unpacker = msgpack.Unpacker(
            raw=False, 
            strict_map_key=False,
            ext_hook=MasterMemoryExtTypeHandler.decode_ext_type
        )
        unpacker.feed(raw_data)
        all_objects = [obj for obj in unpacker]
        print(f"解析成功!总共 {len(all_objects)} 个对象")
    except Exception as e:
        print(f"解析失败: {e}")
        return
    
    result = {"tables": {}, "compressed_data": {}}
    
    if all_objects and isinstance(all_objects[0], dict):
        header = all_objects[0]
        tables = extract_table_data(header, raw_data)
        result["tables"] = tables
        

    compressed_count = 0
    for i, obj in enumerate(all_objects[1:], 1):
        if isinstance(obj, (dict, list)) and not isinstance(obj, str):
            result["compressed_data"][f"data_{i}"] = obj
            compressed_count += 1
    
    if compressed_count > 0:
        print(f"提取了 {compressed_count} 个压缩数据块")
    
    try:
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False, cls=MasterMemoryJSONEncoder)
        print(f"✓ 转换完成: {OUTPUT_FILE}")
    except Exception as e:
        print(f"保存失败: {e}")

if __name__ == "__main__":
    convert_master_memory_to_json()
2 个赞