Browse Source

payload decoder

main
gsd 4 months ago
parent
commit
4399e73704
  1. 10
      mesht_device.py
  2. 6
      packet_test.py
  3. 78
      payload_decrypter.py
  4. 1
      service.py
  5. 16
      webExtensions/publicEndpoints.py

10
mesht_device.py

@ -13,6 +13,8 @@ from protobufs_extra.device_metadata_proto import *
from protobufs_extra.file_info_proto import *
from protobuf_decoder.protobuf_decoder import Parser
from payload_decrypter import decrypt_aes128_ctr, CUSTOM_SCHEMA
import base64
DATA_SCHEMA = [
("varint", "portnum", 1),
@ -289,6 +291,14 @@ class MeshtDevice:
if self.udp_packet:
#mqtt или udp пакеты мы ЖЕСТКО перекодируем словно так и надо
dec = pb.decode(data, MESHPACKET_SCHEMA)
try:
d = decrypt_aes128_ctr(base64.b64encode(dec["encrypted"]), "1PG7OiApB1nwvP+rz05pAQ==", dec["id"], dec["from"])
dec["decoded"] = pb.decode(d, CUSTOM_SCHEMA)
del dec["encrypted"]
except:
print("cannot prepare payload")
pass
#пушим финальный пакет
data = pb.encode({"packet": dec}, FROMRADIO_SCHEMA)
fr = pb.decode(data, FROMRADIO_SCHEMA)

6
packet_test.py

@ -12,6 +12,12 @@ res = Parser().parse(data.hex())
print(res.to_dict(), "\n\n")
dec = pb.decode(data, MESHPACKET_SCHEMA)
print(dec, "\n\n")
from payload_decrypter import decrypt_aes128_ctr, CUSTOM_SCHEMA, _print_result
d = decrypt_aes128_ctr(base64.b64encode(dec["encrypted"]), "1PG7OiApB1nwvP+rz05pAQ==", dec["id"], dec["from"])
dec["decoded"] = pb.decode(d, CUSTOM_SCHEMA)
print("CUSTOM SCHEMA",pb.decode(d, CUSTOM_SCHEMA))
del dec["encrypted"]
print("DECODED", dec, "\n\n")
enc = pb.encode({"packet": dec}, FROMRADIO_SCHEMA)
decTwice = pb.decode(enc, FROMRADIO_SCHEMA)

78
payload_decrypter.py

@ -0,0 +1,78 @@
import base64
from Crypto.Cipher import AES
from Crypto.Util import Counter
import struct
def decrypt_aes128_ctr(enc_b64: str, key_b64: str, id_val: int, from_val:int) -> bytes:
# Декодируем ключ и шифротекст из base64
key = base64.b64decode(key_b64) # 16 байт для AES-128
encrypted_data = base64.b64decode(enc_b64)
# Формируем вектор инициализации (IV) аналогично pack('P', $id) . pack('P', $from) в PHP
# 'P' в PHP — это беззнаковое 64-битное целое в little-endian
iv = struct.pack('<Q', id_val) + struct.pack('<Q', from_val) # 16 байт
# В OpenSSL (и PHP) IV для CTR режима интерпретируется как начальное значение 128-битного счётчика,
# которое увеличивается как big-endian целое.
# Поэтому преобразуем IV в big-endian целое и создаём счётчик.
initial_counter = int.from_bytes(iv, byteorder='big')
ctr = Counter.new(128, initial_value=initial_counter, little_endian=False)
# Создаём шифр AES в режиме CTR и дешифруем
cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
return cipher.decrypt(encrypted_data)
from protobufs_extra.node_info_lite_proto import NODE_INFO_LITE_SCHEME
CUSTOM_SCHEMA = [
("varint", "portnum", 1),
("bytes", "payload", 2),
("bool", "want_response", 3),
("uint32", "bitfield", 9),
]
def _print_result(data: bytes):
from mesht_device import DATA_SCHEMA, USER_SCHEMA
import pb
"""Выводит байты в hex и как текст (если читаемо)."""
print(f" hex: {data.hex()}")
try:
from protobuf_decoder.protobuf_decoder import Parser
res = Parser().parse(data.hex())
for k,v in res.to_dict().items():
print(k)
for field in v:
print(field)
except:
print("can't parse to protobuf")
pass
finally:
print()
try:
dec = pb.decode(data, CUSTOM_SCHEMA)
print(dec)
print(pb.decode(dec["payload"], USER_SCHEMA))
except:
print("can't decode protobuf")
import traceback
traceback.print_exc()
finally:
print()
try:
text = data.decode('utf-8')
print(f" text: {text}")
except UnicodeDecodeError:
print(" text: <не UTF-8>")
AO = "1PG7OiApB1nwvP+rz05pAQ=="
if __name__ == "__main__":
ciphertext_b64 = "LDsNJgdxZzQntg3KmvaRWRXwt39JAMkRq35RRZoK6lQ/0PnbQ4HJnXBYIjx+B5Y+k3ukq6cLevjra8ky1ANcLANGJTjH92lMbyu3rvwwfyO+4eJNOezJWoE="
id = 2876191354
fr0m = 313608604
key_b64 = AO
d = decrypt_aes128_ctr(ciphertext_b64, key_b64, id, fr0m)
_print_result(d)
print()

1
service.py

@ -95,6 +95,7 @@ class MeshMultiListener(MeshArgsParse):
self.defaultDeviceUUID = self.json_config[0]["uuid"]
self.defaultDeviceUUIDHash = md5hash(self.defaultDeviceUUID)
for device in self.devices:
print(md5hash(device.device_uuid), device)
self.devicesUuidHashToUuid[md5hash(device.device_uuid)] = device.device_uuid
async def meshListener(self, device: MeshtDevice, queue: asyncio.Queue):

16
webExtensions/publicEndpoints.py

@ -73,4 +73,18 @@ class WebExtension:
return Response(content=img, headers=headers, media_type="image/png")
except:
traceback.print_exc()
raise HTTPException(status_code=404, detail="not found tile")
raise HTTPException(status_code=404, detail="not found tile")
@self.app.get(f"{self.core.context}/devices")
async def devicesList():
response = []
count = 1
for hash in self.core.devicesUuidHashToUuid.keys():
obj = {
"hash": hash,
"default": hash == self.core.defaultDeviceUUIDHash,
"name": f"Устройство {count}"
}
response.append(obj)
count += 1
return response
Loading…
Cancel
Save