""" https://github.com/allanrbo/pb.py """ import struct _MAX_VARINT_BYTES = 10 # uint64 max _MAX_NESTING_DEPTH = 100 def _read_varint(b, i): shift = 0 n = 0 read = 0 blen = len(b) while True: if i >= blen: raise ValueError("truncated varint") c = b[i] i += 1 read += 1 n |= (c & 0x7F) << shift if not (c & 0x80): break if read >= _MAX_VARINT_BYTES: raise ValueError("varint too long") shift += 7 return n, i def _write_varint(n): if n < 0: raise ValueError("varint requires non-negative integer; use 'sint' for signed") out = bytearray() while True: to_write = n & 0x7F n >>= 7 if n: out.append(to_write | 0x80) else: out.append(to_write) break return bytes(out) def zigzag_encode(n): return (n << 1) ^ (n >> 63) def zigzag_decode(n): return (n >> 1) ^ -(n & 1) def _zz32(n): # zigzag for 32-bit signed n = int(n) if n < -0x80000000 or n > 0x7FFFFFFF: raise ValueError("sint32 out of range") v = (n << 1) ^ (n >> 31) return v & 0xFFFFFFFF def _i32_from_uvarint(v): v &= 0xFFFFFFFF return v - 0x100000000 if v >= 0x80000000 else v def _i64_from_uvarint(v): return v - 0x10000000000000000 if v >= 0x8000000000000000 else v def _i32_to_uvarint(n): # represent signed 32-bit as 64-bit varint two's complement n = int(n) if n < 0: u = (n & 0xFFFFFFFF) | 0xFFFFFFFF00000000 else: u = n & 0xFFFFFFFF return u & 0xFFFFFFFFFFFFFFFF def _read_fixed32(b, i): if i + 4 > len(b): raise ValueError("truncated fixed32") v = struct.unpack_from(" len(b): raise ValueError("truncated fixed64") v = struct.unpack_from(" 1: raise ValueError(f"oneof group '{grp}' has multiple fields set: {ks}") # Emit fields in caller-provided order for key, val in values.items(): # Allow using field names; map to number if needed if isinstance(key, int): field = key else: field = ns["names"].get(key) if field is None: print(ns["names"]) raise KeyError(f"unknown field name '{key}' in schema") spec = ns["fields"].get(field) if not spec: raise KeyError(f"field {field} not defined in schema") if spec["kind"] == "packed": seq = val if isinstance(val, list) else [val] packed = _encode_packed(seq, spec["type"]) _ld_write(field, packed) continue # For repeated, the value must be a list; otherwise wrap singletons is_repeated = (spec["kind"] == "repeated") vals = val if isinstance(val, list) else ([val] if is_repeated else [val]) for v in vals: t = spec.get("type") if spec else None if spec["kind"] == "scalar" and t in {"varint", "uint64"}: _var_write(field, int(v)) elif spec["kind"] == "scalar" and t == "uint32": _var_write(field, int(v) & 0xFFFFFFFF) elif spec["kind"] == "scalar" and t == "int64": _var_write(field, int(v) & 0xFFFFFFFFFFFFFFFF) elif spec["kind"] == "scalar" and t == "int32": u = _i32_to_uvarint(v) _var_write(field, u) elif spec["kind"] == "scalar" and t in {"sint", "sint64"}: _var_write(field, zigzag_encode(int(v))) elif spec["kind"] == "scalar" and t == "sint32": zz = _zz32(v) _var_write(field, zz) elif spec["kind"] == "scalar" and t == "fixed32": out += _write_key(field, 5) out += _write_fixed32(int(v)) elif spec["kind"] == "scalar" and t == "sfixed32": out += _write_key(field, 5) out += _write_sfixed32(v) elif spec["kind"] == "scalar" and t == "fixed64": out += _write_key(field, 1) out += _write_fixed64(int(v)) elif spec["kind"] == "scalar" and t == "sfixed64": out += _write_key(field, 1) out += _write_sfixed64(v) elif spec["kind"] == "scalar" and t == "float": out += _write_key(field, 5) out += struct.pack(" _max_depth: raise ValueError("group nesting too deep") b = memoryview(buf).toreadonly() if not isinstance(buf, bytearray) else buf n = len(b) while i < n: key, i = _read_varint(b, i) field = key >> 3 wt = key & 7 if wt == 4: # end group if field == group_field: return i # группа закрыта, возвращаем позицию после ключа end group else: raise ValueError(f"mismatched end group: expected {group_field}, got {field}") # Пропускаем содержимое в зависимости от wire type if wt == 0: # varint _, i = _read_varint(b, i) elif wt == 1: # 64-bit i += 8 elif wt == 2: # length-delimited length, i = _read_varint(b, i) if i + length > n: raise ValueError("truncated length-delimited inside group") i += length elif wt == 3: # start group (вложенная) i = _skip_group(b, i, field, _depth+1, _max_depth) elif wt == 5: # 32-bit i += 4 else: raise ValueError(f"unsupported wire type {wt} inside group") raise ValueError("group not terminated") def decode(buf, schema=None, _depth=0, _max_depth=_MAX_NESTING_DEPTH): if _depth > _max_depth: raise ValueError("message nesting too deep") ns = _normalize_schema(schema) if isinstance(buf, bytearray): b = buf else: b = memoryview(buf).toreadonly() out = {} oneof_seen = {} i = 0 n = len(b) while i < n: key, i = _read_varint(b, i) field = key >> 3 wt = key & 7 spec = ns["fields"].get(field) if wt == 0: try: v, i = _read_varint(b, i) except Exception as rve: print("key=",key, "i=",i, "field=",field, "wt=", wt,"spec=",spec, "ns-fields=", ns["fields"]) raise rve if spec: t = spec.get("type") if t in {"sint", "sint64"}: v = zigzag_decode(v) elif t == "sint32": v = _i32_from_uvarint(zigzag_decode(v)) elif t == "int64": v = _i64_from_uvarint(v) elif t == "int32": v = _i32_from_uvarint(v) elif t == "uint32": v &= 0xFFFFFFFF elif t in {"uint64", "varint"}: pass elif t == "bool": v = bool(v) elif wt == 1: v, i = _read_fixed64(b, i) if spec: tt = spec.get("type") if tt == "double": v = struct.unpack("= 0x8000000000000000 else v elif wt == 2: length, i = _read_varint(b, i) if i + length > n: raise ValueError("truncated length-delimited field") chunk = bytes(b[i:i+length]) i += length t = spec.get("type") if spec else None if spec and spec.get("kind") == "message": v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) elif spec and spec.get("kind") == "repeated" and "type" in spec and t in {"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", "fixed32", "fixed64", "sfixed32", "sfixed64", "float", "double", "bool"}: # Proto3 default: repeated numeric are packed; flatten into list v = _decode_packed(chunk, t) elif spec and spec.get("kind") == "repeated" and "schema" in spec: v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) elif spec and spec.get("kind") == "packed": v = _decode_packed(chunk, t) elif t == "string": try: v = chunk.decode("utf-8") except UnicodeDecodeError: v = None elif t == "bytes" or spec is None: v = chunk else: v = chunk #elif wt == 3:#AI SLOOP # # Пропускаем всю группу, данные не сохраняем # #i = _skip_group(b, i, field, _depth, _max_depth) # continue # ничего не добавляем в out #elif wt == 4:#AI SLOOP # # Если мы здесь, значит end group встречен не вовремя — ошибка # #raise ValueError("unexpected end group") # continue elif wt == 5: v, i = _read_fixed32(b, i) if spec: tt = spec.get("type") if tt == "float": v = struct.unpack("= 0x80000000 else v else: print("key=",key, "i=",i, "field=",field, "wt=", wt,"spec=",spec, "ns-fields=", ns["fields"]) raise ValueError("unsupported wire type {}".format(wt)) out_key = (spec.get("name") if spec else field) if isInt(out_key): out_key = f"unk_proto_{out_key}" if spec: is_repeated = spec.get("kind") in {"repeated", "packed"} if is_repeated: # Repeated scalar (numeric packed yields list 'v'), or repeated message (dict 'v') seq = v if isinstance(v, list) else [v] out.setdefault(out_key, []).extend(seq) else: # Singleton (scalar or message) — store as value, not list grp = spec.get("oneof") if grp: prev = oneof_seen.get(grp) if prev and prev in out: try: del out[prev] except Exception: out.pop(prev, None) oneof_seen[grp] = out_key out[out_key] = v else: # Unknown field: if first occurrence, store scalar; if multiple, upgrade to list if out_key in out: prev = out[out_key] if isinstance(prev, list): prev.append(v) else: out[out_key] = [prev, v] else: out[out_key] = v return out # Packed repeated element decode def _decode_packed(chunk, elem_type): b = memoryview(chunk).toreadonly() i = 0 out = [] while i < len(b): if elem_type in {"varint", "uint64"}: v, i = _read_varint(b, i) out.append(v) elif elem_type == "uint32": v, i = _read_varint(b, i) out.append(v & 0xFFFFFFFF) elif elem_type == "int64": v, i = _read_varint(b, i) out.append(_i64_from_uvarint(v)) elif elem_type == "int32": v, i = _read_varint(b, i) out.append(_i32_from_uvarint(v)) elif elem_type == "sint": v, i = _read_varint(b, i) out.append(zigzag_decode(v)) elif elem_type == "sint64": v, i = _read_varint(b, i) out.append(zigzag_decode(v)) elif elem_type == "sint32": v, i = _read_varint(b, i) out.append(_i32_from_uvarint(zigzag_decode(v))) elif elem_type == "fixed32": v, i = _read_fixed32(b, i) out.append(v) elif elem_type == "sfixed32": v, i = _read_fixed32(b, i) v = v - 0x100000000 if v >= 0x80000000 else v out.append(v) elif elem_type == "fixed64": v, i = _read_fixed64(b, i) out.append(v) elif elem_type == "sfixed64": v, i = _read_fixed64(b, i) v = v - 0x10000000000000000 if v >= 0x8000000000000000 else v out.append(v) elif elem_type == "float": v, i = _read_fixed32(b, i) out.append(struct.unpack("