Browse Source

any sh1t

main
gsd 6 months ago
parent
commit
2248edfe62
  1. 16
      pipboyAPI/api_models.py
  2. 24
      pipboyAPI/api_server.py
  3. 4
      pipboyAPI/test/pingpong.py

16
pipboyAPI/api_models.py

@ -1,4 +1,18 @@
from fastapi import FastAPI
from fastapi import FastAPI, WebSocket
from typing import List
class WsEvents:
UNKNOWN = 0
RESERVED = 1
SYSSTATS = 2#{"cpu": RandomUtils.getRandom(0.0, 99.9), "mem": RandomUtils.getRandom(0, 400)}
SPACESTATS = 3#{"free": RandomUtils.getRandom(0, 40), "total": 50, "format": "GB"}
WIFI = 4#{"state":"", "ap":"", "ip":""}
class WsDevice:
UNKNOWN = 0
SELF = 1
EXTERNAL = 2
class API_CORE:
app: FastAPI
ws_clients: List[WebSocket] = []

24
pipboyAPI/api_server.py

@ -7,7 +7,7 @@ from contextlib import asynccontextmanager
#sys
import asyncio
from typing import List, Dict
import os, sys
import os, sys, traceback
#internal
from api_models import API_CORE
@ -26,6 +26,28 @@ class API(API_CORE):
def run(self):#todo filter
UR(self.app, host="0.0.0.0", port=14088)
def ws(self):
async def wshello(websocket: WebSocket):
for ext_path, ext in self.exts.items():
await ext.wshello(websocket)
@self.app.webhooks("/api/ws")
async def ws(websocket: WebSocket):
await websocket.accept()
self.ws_clients.append(websocket)
await wshello(websocket)
try:
while True:
await websocket.receive_json()
except:
pass
finally:
self.ws_clients.remove(websocket)
try:
await websocket.close()
except:
pass
def extensionLoader(self, search_paths = []):
if type(search_paths) == str:
search_paths = [search_paths]

4
pipboyAPI/test/pingpong.py

@ -1,4 +1,5 @@
from api_models import API_CORE
from fastapi import WebSocket
from time import time
import asyncio
@ -16,6 +17,9 @@ class API_EXTENSION:
async def call():
return self.response
async def wshello(self, websocket: WebSocket):
await websocket.send_json({})
@property
def tasks(self):
async def updateTs():

Loading…
Cancel
Save