Browse Source

kamaz ai 1.2

master
gsd 2 weeks ago
parent
commit
2ae940a518
  1. 437
      other_ext/kamaz_ai.py

437
other_ext/kamaz_ai.py

@ -6,10 +6,431 @@ import aiohttp
import os
import numpy as np
import math
from collections import Counter
import re
from typing import List, Dict, Any, Optional, Union
class AnyFieldIsNotFilled(Exception):
pass
#ai slooop for ai slooop
EPSILON = 1e-6
PERMISSION_LEVEL_MAP = {
'DONT_HAVE': 0, 'NONE': 0, None: 0,
'FREE': 1,
'VIP': 2,
'MODERATOR': 3,
'ADMIN': 4, 'ROOT': 4
}
MAX_PERMISSION_LEVEL = 4
class KamazAI_v1_3:
def __init__(self, report_list):
self.k_neighbors = 5
self.weights = {
'steam_identity': 0.35,
'reason': 0.15,
'permissions': 0.10,
'derived_stats': 0.30,
'server': 0.10
}
self.historical_reports: List[Dict[str, Any]] = []
self.report_actions: Dict[int, List[str]] = {}
self.derived_stats: Optional[Dict[str, Dict[str, float]]] = None
self.derived_fields = [
'a_kd', 'a_kpm', 'a_dpm',
'r_kd', 'r_kpm', 'r_dpm',
'kd_diff',
'a_session_min', 'r_session_min'
]
self.initialize(report_list)
# ------------------------------------------------------------------
# Семантическое сравнение причин
# ------------------------------------------------------------------
@staticmethod
def _reason_similarity(reason1: str, reason2: str) -> float:
"""Jaccard-сходство по множествам слов (регистронезависимо)."""
# Извлекаем буквенно-цифровые токены
tokens1 = set(re.findall(r'\w+', reason1.lower()))
tokens2 = set(re.findall(r'\w+', reason2.lower()))
if not tokens1 or not tokens2:
return 1.0 if reason1 == reason2 else 0.0
intersection = tokens1 & tokens2
union = tokens1 | tokens2
return len(intersection) / len(union)
# ------------------------------------------------------------------
# Вспомогательные методы
# ------------------------------------------------------------------
@staticmethod
def _parse_permission_level(value: Union[str, int, None]) -> int:
if isinstance(value, str):
return PERMISSION_LEVEL_MAP.get(value.upper(), 0)
if value is None:
return 0
try:
return int(value)
except (ValueError, TypeError):
return 0
@staticmethod
def _compute_derived_features(report: Dict[str, Any]) -> Dict[str, float]:
a_min = max((report['a_seconds'] if report['a_seconds'] else 0) / 60.0, EPSILON)
r_min = max((report['r_seconds'] if report['r_seconds'] else 0) / 60.0, EPSILON)
a_kd = (report['a_kills'] if report['a_kills'] else 0) / max((report['a_deads'] if report['a_deads'] else 0), EPSILON)
r_kd = (report['r_kills'] if report['r_kills'] else 0) / max((report['r_deads'] if report['r_deads'] else 0), EPSILON)
kd_diff = r_kd - a_kd
return {
'a_kd': a_kd,
'a_kpm': (report['a_kills'] if report['a_kills'] else 0) / a_min,
'a_dpm': (report['a_deads'] if report['a_deads'] else 0) / a_min,
'r_kd': r_kd,
'r_kpm': (report['r_kills'] if report['r_kills'] else 0) / r_min,
'r_dpm': (report['r_deads'] if report['r_deads'] else 0) / r_min,
'kd_diff': kd_diff,
'a_session_min': a_min,
'r_session_min': r_min
}
def _compute_derived_stats(self) -> Dict[str, Dict[str, float]]:
stats = {f: {'min': float('inf'), 'max': float('-inf')} for f in self.derived_fields}
for r in self.historical_reports:
feats = self._compute_derived_features(r)
for f in self.derived_fields:
val = feats[f]
if val < stats[f]['min']:
stats[f]['min'] = val
if val > stats[f]['max']:
stats[f]['max'] = val
return stats
def _normalize_derived(self, feats: Dict[str, float]) -> Dict[str, float]:
norm = {}
for f in self.derived_fields:
min_val = self.derived_stats[f]['min']
max_val = self.derived_stats[f]['max']
if max_val - min_val < EPSILON:
norm[f] = 0.0
else:
norm[f] = (feats[f] - min_val) / (max_val - min_val)
return norm
def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float:
score = 0.0
# 1. Совпадение связки игроков
#if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']:
# identity_score = 1.0
#elif r1['a_steam2'] == r2['a_steam2'] or r1['r_steam2'] == r2['r_steam2']:
# identity_score = 0.5
#else:
# identity_score = 0.0
#score += self.weights['steam_identity'] * identity_score
# 2. Причина – теперь семантически, через Jaccard
reason_score = self._reason_similarity(r1['reasons'], r2['reasons'])
score += self.weights['reason'] * reason_score
# 3. Права
a_lvl1 = self._parse_permission_level(r1['a_permition'])
a_lvl2 = self._parse_permission_level(r2['a_permition'])
r_lvl1 = self._parse_permission_level(r1['r_permition'])
r_lvl2 = self._parse_permission_level(r2['r_permition'])
dist_a = abs(a_lvl1 - a_lvl2) / MAX_PERMISSION_LEVEL
dist_r = abs(r_lvl1 - r_lvl2) / MAX_PERMISSION_LEVEL
perm_sim = 1.0 - (dist_a + dist_r) / 2.0
score += self.weights['permissions'] * perm_sim
# 4. Производные признаки
f1 = self._normalize_derived(self._compute_derived_features(r1))
f2 = self._normalize_derived(self._compute_derived_features(r2))
dist_sq = sum((f1[f] - f2[f]) ** 2 for f in self.derived_fields)
eucl_dist = math.sqrt(dist_sq / len(self.derived_fields))
derived_sim = 1.0 - min(eucl_dist, 1.0)
score += self.weights['derived_stats'] * derived_sim
# 5. Сервер
#srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0
#score += self.weights['server'] * srv_score
return score
# ------------------------------------------------------------------
# Публичные асинхронные методы
# ------------------------------------------------------------------
def initialize(self, reports: List[Dict[str, Any]]) -> None:
self.historical_reports = []
self.report_actions.clear()
for report in reports:
#if report.get("type", "IN_GAME") != "IN_GAME":
# continue
self.historical_reports.append(report)
try:
self.report_actions[report["id"]] = report["actions"]
except:
print("Cannot build action", report)
self.derived_stats = self._compute_derived_stats()
async def predict(self, new_report: Dict[str, Any]) -> Dict[str, Any]:
if not self.historical_reports or self.derived_stats is None:
raise RuntimeError("System not initialized. Call 'await initialize()' first.")
similarities = []
for hist in self.historical_reports:
sim = self._similarity(new_report, hist)
similarities.append((hist['id'], sim))
similarities.sort(key=lambda x: x[1], reverse=True)
top_k = similarities[:self.k_neighbors]
action_counter = Counter()
similar_ids = []
for rid, sim in top_k:
if sim > 0:
similar_ids.append(rid)
if rid in self.report_actions:
for act in self.report_actions[rid]:
action_counter[act] += 1
total = sum(action_counter.values())
suggestions = []
if total > 0:
for action, count in action_counter.most_common():
suggestions.append({
'action': action,
'confidence': round(count / total, 4)
})
else:
suggestions.append({'action': 'none', 'confidence': 1.0})
return {
'suggestions': suggestions,
'similar_reports': similar_ids
}
async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None:
for act in new_actions:
rid = act['report_id']
self.report_actions.setdefault(rid, []).append(act['action'])
#depricated
class KamazAI_v1_2:
"""
Асинхронная система рекомендации действий модератора на основе исторических жалоб.
Использование:
system = ReportDecisionSystem()
await system.initialize(reports, actions)
result = await system.predict(new_report)
"""
def __init__(self, report_list):
self.k_neighbors = 5
self.weights = {
'steam_identity': 0.35,
'reason': 0.15,
'permissions': 0.10,
'derived_stats': 0.30,
'server': 0.10
}
self.historical_reports: List[Dict[str, Any]] = []
self.report_actions: Dict[int, List[str]] = {}
self.derived_stats: Optional[Dict[str, Dict[str, float]]] = None
self.derived_fields = [
'a_kd', 'a_kpm', 'a_dpm',
'r_kd', 'r_kpm', 'r_dpm',
'kd_diff',
'a_session_min', 'r_session_min'
]
self.initialize(report_list)
# ---------- Вспомогательные методы ----------
@staticmethod
def _compute_derived_features(report: Dict[str, Any]) -> Dict[str, float]:
"""Вычисляет производные признаки для одной записи."""
try:
a_min = max(report['a_seconds'] / 60.0, EPSILON)
except:
a_min = 1
try:
r_min = max(report['r_seconds'] / 60.0, EPSILON)
except:
r_min = 1
try:
a_kd = report['a_kills'] / max(report['a_deads'], EPSILON)
except:
a_kd = 1
try:
r_kd = report['r_kills'] / max(report['r_deads'], EPSILON)
except:
r_kd = 1
kd_diff = r_kd - a_kd
try:
return {
'a_kd': a_kd,
'a_kpm': report['a_kills'] / a_min,
'a_dpm': report['a_deads'] / a_min,
'r_kd': r_kd,
'r_kpm': report['r_kills'] / r_min,
'r_dpm': report['r_deads'] / r_min,
'kd_diff': kd_diff,
'a_session_min': a_min,
'r_session_min': r_min
}
except:
raise AnyFieldIsNotFilled
def _compute_derived_stats(self) -> Dict[str, Dict[str, float]]:
"""Вычисляет min/max для всех производных признаков по историческим данным."""
stats = {f: {'min': float('inf'), 'max': float('-inf')} for f in self.derived_fields}
for r in self.historical_reports:
try:
feats = self._compute_derived_features(r)
for f in self.derived_fields:
val = feats[f]
if val < stats[f]['min']:
stats[f]['min'] = val
if val > stats[f]['max']:
stats[f]['max'] = val
except AnyFieldIsNotFilled:
continue
return stats
def _normalize_derived(self, feats: Dict[str, float]) -> Dict[str, float]:
"""Нормализует производные признаки в [0,1]."""
norm = {}
for f in self.derived_fields:
min_val = self.derived_stats[f]['min']
max_val = self.derived_stats[f]['max']
if max_val - min_val < EPSILON:
norm[f] = 0.0
else:
norm[f] = (feats[f] - min_val) / (max_val - min_val)
return norm
def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float:
"""Вычисляет взвешенное сходство (0..1) между двумя отчётами."""
score = 0.0
# 1. Совпадение пары Steam
#if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']:
# identity_score = 1.0
#elif r1['a_steam2'] == r2['a_steam2'] or r1['r_steam2'] == r2['r_steam2']:
# identity_score = 0.5
#else:
# identity_score = 0.0
#score += self.weights['steam_identity'] * identity_score
# 2. Причина
reason_score = 1.0 if r1['reasons'] == r2['reasons'] else 0.0
score += self.weights['reason'] * reason_score
# 3. Права
perm_score = 0.0
if r1['a_permition'] == r2['a_permition']:
perm_score += 0.5
if r1['r_permition'] == r2['r_permition']:
perm_score += 0.5
score += self.weights['permissions'] * perm_score
# 4. Производные признаки
f1 = self._normalize_derived(self._compute_derived_features(r1))
f2 = self._normalize_derived(self._compute_derived_features(r2))
dist_sq = sum((f1[f] - f2[f]) ** 2 for f in self.derived_fields)
eucl_dist = math.sqrt(dist_sq / len(self.derived_fields))
derived_sim = 1.0 - min(eucl_dist, 1.0)
score += self.weights['derived_stats'] * derived_sim
# 5. Сервер
#srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0
#score += self.weights['server'] * srv_score
return score
# ---------- Публичные асинхронные методы ----------
def initialize(self, reports: List[Dict[str, Any]]) -> None:
"""
Загружает исторические данные и вычисляет нормализацию.
:param reports: список записей из user_reports
:param actions: список записей из user_reports_action
"""
self.historical_reports = reports
self.report_actions.clear()
for report in self.historical_reports:
try:
self.report_actions[report["id"]] = report["actions"]
except:
print("Cannot build action", report)
self.derived_stats = self._compute_derived_stats()
async def predict(self, new_report: Dict[str, Any]) -> Dict[str, Any]:
"""
Возвращает рекомендации для новой жалобы.
:param new_report: словарь с полями нового репорта (без id)
:return: словарь с ключами 'suggestions' и 'similar_reports'
"""
if not self.historical_reports or self.derived_stats is None:
raise RuntimeError("System not initialized. Call 'await initialize()' first.")
# Расчёт сходства со всеми историческими записями
similarities = []
for hist in self.historical_reports:
try:
sim = self._similarity(new_report, hist)
similarities.append((hist['id'], sim))
except AnyFieldIsNotFilled:
pass
similarities.sort(key=lambda x: x[1], reverse=True)
top_k = similarities[:self.k_neighbors]
# Сбор действий
action_counter = Counter()
similar_ids = []
for rid, sim in top_k:
if sim > 0:
similar_ids.append(rid)
if rid in self.report_actions:
for act in self.report_actions[rid]:
action_counter[act] += 1
total = sum(action_counter.values())
suggestions = []
if total > 0:
for action, count in action_counter.most_common():
suggestions.append({
'action': action,
'confidence': round(count / total, 4)
})
else:
suggestions.append({'action': 'none', 'confidence': 1.0})
return {
'suggestions': suggestions,
'similar_reports': similar_ids
}
async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None:
"""Обновить только действия, не пересчитывая нормализацию (если поступили новые решения)."""
for act in new_actions:
rid = act['report_id']
self.report_actions.setdefault(rid, []).append(act['action'])
#ai sloop
class KamazAI:
#depricated
class KamazAI_v1_1:
WEIGHTS = {
'steam_identity': 0.4,
@ -230,7 +651,7 @@ class Extension:
"secretkey":os.getenv("BACKEND_SECRETKEY")}) as session:
async with session.get(f"{os.getenv('BACKEND_URL')}/api/discord/report/s", ssl = False) as response:
self.reports_list = await response.json()
self.kamazai = KamazAI(self.reports_list)
self.kamazai = KamazAI_v1(self.reports_list)
print("KamazAI Enabled")
except:
traceback.print_exc()
@ -286,7 +707,15 @@ if __name__ == "__main__":
print("run")
from json import load
with open("/Users/gsd/Downloads/reports.json", "r", encoding="utf8") as report_list:
kamazAi = KamazAI(load(report_list))
kamazAi = KamazAI_v1_3(load(report_list))
perm_list = []
for r in kamazAi.historical_reports:
if r['a_permition'] not in perm_list:
perm_list.append(r['a_permition'])
if r['r_permition'] not in perm_list:
perm_list.append(r['r_permition'])
print(perm_list)
test_report1 = {"id":23059,"a_nickname":"серийный чувак","a_permition":"DONT_HAVE","a_kills":29,"a_deads":38,"a_seconds":3237,"r_nickname":"Корабль Бомж 1","r_permition":"DONT_HAVE","r_kills":44,"r_deads":31,"r_seconds":3598,"reasons":"Читы","utime":1780842262,"srv":"srv9","online":18,"type":"IN_GAME","actions":[],"serverName":"Норильск 2019","a_steam":{"steam3":"[U:1:1675616295]","steam2":"STEAM_0:1:837808147","steam64":"76561199635882023","community_url":"https://steamcommunity.com/profiles/76561199635882023","account_id":1675616295},"r_steam":{"steam3":"[U:1:1546493291]","steam2":"STEAM_0:1:773246645","steam64":"76561199506759019","community_url":"https://steamcommunity.com/profiles/76561199506759019","account_id":1546493291}}
test_report2 = {"id":23048,"a_nickname":"Евгений Задротов","a_permition":"FREE","a_kills":63,"a_deads":68,"a_seconds":5031,"r_nickname":"Kapusta_KvSH","r_permition":"VIP","r_kills":11,"r_deads":2,"r_seconds":436,"reasons":"VIP абуз","utime":1780837315,"srv":"srv5","online":21,"type":"IN_GAME","actions":[],"serverName":"Завод Ultimate","a_steam":{"steam3":"[U:1:1623272121]","steam2":"STEAM_0:1:811636060","steam64":"76561199583537849","community_url":"https://steamcommunity.com/profiles/76561199583537849","account_id":1623272121},"r_steam":{"steam3":"[U:1:196806785]","steam2":"STEAM_0:1:98403392","steam64":"76561198157072513","community_url":"https://steamcommunity.com/profiles/76561198157072513","account_id":196806785}}

Loading…
Cancel
Save