From 2ae940a518ffd705f6c559410b616ca8439b7ffa Mon Sep 17 00:00:00 2001 From: gsd Date: Thu, 11 Jun 2026 13:30:43 +0300 Subject: [PATCH] kamaz ai 1.2 --- other_ext/kamaz_ai.py | 437 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 433 insertions(+), 4 deletions(-) diff --git a/other_ext/kamaz_ai.py b/other_ext/kamaz_ai.py index c080cb1..d22f1a3 100644 --- a/other_ext/kamaz_ai.py +++ b/other_ext/kamaz_ai.py @@ -6,10 +6,431 @@ import aiohttp import os import numpy as np +import math from collections import Counter +import re +from typing import List, Dict, Any, Optional, Union + +class AnyFieldIsNotFilled(Exception): + pass + +#ai slooop for ai slooop + +EPSILON = 1e-6 + +PERMISSION_LEVEL_MAP = { + 'DONT_HAVE': 0, 'NONE': 0, None: 0, + 'FREE': 1, + 'VIP': 2, + 'MODERATOR': 3, + 'ADMIN': 4, 'ROOT': 4 +} +MAX_PERMISSION_LEVEL = 4 + +class KamazAI_v1_3: + def __init__(self, report_list): + self.k_neighbors = 5 + self.weights = { + 'steam_identity': 0.35, + 'reason': 0.15, + 'permissions': 0.10, + 'derived_stats': 0.30, + 'server': 0.10 + } + self.historical_reports: List[Dict[str, Any]] = [] + self.report_actions: Dict[int, List[str]] = {} + self.derived_stats: Optional[Dict[str, Dict[str, float]]] = None + self.derived_fields = [ + 'a_kd', 'a_kpm', 'a_dpm', + 'r_kd', 'r_kpm', 'r_dpm', + 'kd_diff', + 'a_session_min', 'r_session_min' + ] + + self.initialize(report_list) + + # ------------------------------------------------------------------ + # Семантическое сравнение причин + # ------------------------------------------------------------------ + @staticmethod + def _reason_similarity(reason1: str, reason2: str) -> float: + """Jaccard-сходство по множествам слов (регистронезависимо).""" + # Извлекаем буквенно-цифровые токены + tokens1 = set(re.findall(r'\w+', reason1.lower())) + tokens2 = set(re.findall(r'\w+', reason2.lower())) + if not tokens1 or not tokens2: + return 1.0 if reason1 == reason2 else 0.0 + intersection = tokens1 & tokens2 + union = tokens1 | tokens2 + return len(intersection) / len(union) + + # ------------------------------------------------------------------ + # Вспомогательные методы + # ------------------------------------------------------------------ + @staticmethod + def _parse_permission_level(value: Union[str, int, None]) -> int: + if isinstance(value, str): + return PERMISSION_LEVEL_MAP.get(value.upper(), 0) + if value is None: + return 0 + try: + return int(value) + except (ValueError, TypeError): + return 0 + + @staticmethod + def _compute_derived_features(report: Dict[str, Any]) -> Dict[str, float]: + a_min = max((report['a_seconds'] if report['a_seconds'] else 0) / 60.0, EPSILON) + r_min = max((report['r_seconds'] if report['r_seconds'] else 0) / 60.0, EPSILON) + a_kd = (report['a_kills'] if report['a_kills'] else 0) / max((report['a_deads'] if report['a_deads'] else 0), EPSILON) + r_kd = (report['r_kills'] if report['r_kills'] else 0) / max((report['r_deads'] if report['r_deads'] else 0), EPSILON) + kd_diff = r_kd - a_kd + return { + 'a_kd': a_kd, + 'a_kpm': (report['a_kills'] if report['a_kills'] else 0) / a_min, + 'a_dpm': (report['a_deads'] if report['a_deads'] else 0) / a_min, + 'r_kd': r_kd, + 'r_kpm': (report['r_kills'] if report['r_kills'] else 0) / r_min, + 'r_dpm': (report['r_deads'] if report['r_deads'] else 0) / r_min, + 'kd_diff': kd_diff, + 'a_session_min': a_min, + 'r_session_min': r_min + } + + def _compute_derived_stats(self) -> Dict[str, Dict[str, float]]: + stats = {f: {'min': float('inf'), 'max': float('-inf')} for f in self.derived_fields} + for r in self.historical_reports: + feats = self._compute_derived_features(r) + for f in self.derived_fields: + val = feats[f] + if val < stats[f]['min']: + stats[f]['min'] = val + if val > stats[f]['max']: + stats[f]['max'] = val + return stats + + def _normalize_derived(self, feats: Dict[str, float]) -> Dict[str, float]: + norm = {} + for f in self.derived_fields: + min_val = self.derived_stats[f]['min'] + max_val = self.derived_stats[f]['max'] + if max_val - min_val < EPSILON: + norm[f] = 0.0 + else: + norm[f] = (feats[f] - min_val) / (max_val - min_val) + return norm + + def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float: + score = 0.0 + + # 1. Совпадение связки игроков + #if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']: + # identity_score = 1.0 + #elif r1['a_steam2'] == r2['a_steam2'] or r1['r_steam2'] == r2['r_steam2']: + # identity_score = 0.5 + #else: + # identity_score = 0.0 + #score += self.weights['steam_identity'] * identity_score + + # 2. Причина – теперь семантически, через Jaccard + reason_score = self._reason_similarity(r1['reasons'], r2['reasons']) + score += self.weights['reason'] * reason_score + + # 3. Права + a_lvl1 = self._parse_permission_level(r1['a_permition']) + a_lvl2 = self._parse_permission_level(r2['a_permition']) + r_lvl1 = self._parse_permission_level(r1['r_permition']) + r_lvl2 = self._parse_permission_level(r2['r_permition']) + dist_a = abs(a_lvl1 - a_lvl2) / MAX_PERMISSION_LEVEL + dist_r = abs(r_lvl1 - r_lvl2) / MAX_PERMISSION_LEVEL + perm_sim = 1.0 - (dist_a + dist_r) / 2.0 + score += self.weights['permissions'] * perm_sim + + # 4. Производные признаки + f1 = self._normalize_derived(self._compute_derived_features(r1)) + f2 = self._normalize_derived(self._compute_derived_features(r2)) + dist_sq = sum((f1[f] - f2[f]) ** 2 for f in self.derived_fields) + eucl_dist = math.sqrt(dist_sq / len(self.derived_fields)) + derived_sim = 1.0 - min(eucl_dist, 1.0) + score += self.weights['derived_stats'] * derived_sim + + # 5. Сервер + #srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0 + #score += self.weights['server'] * srv_score + + return score + + # ------------------------------------------------------------------ + # Публичные асинхронные методы + # ------------------------------------------------------------------ + def initialize(self, reports: List[Dict[str, Any]]) -> None: + self.historical_reports = [] + self.report_actions.clear() + + for report in reports: + #if report.get("type", "IN_GAME") != "IN_GAME": + # continue + self.historical_reports.append(report) + + try: + self.report_actions[report["id"]] = report["actions"] + except: + print("Cannot build action", report) + + self.derived_stats = self._compute_derived_stats() + + async def predict(self, new_report: Dict[str, Any]) -> Dict[str, Any]: + if not self.historical_reports or self.derived_stats is None: + raise RuntimeError("System not initialized. Call 'await initialize()' first.") + + similarities = [] + for hist in self.historical_reports: + sim = self._similarity(new_report, hist) + similarities.append((hist['id'], sim)) + + similarities.sort(key=lambda x: x[1], reverse=True) + top_k = similarities[:self.k_neighbors] + + action_counter = Counter() + similar_ids = [] + for rid, sim in top_k: + if sim > 0: + similar_ids.append(rid) + if rid in self.report_actions: + for act in self.report_actions[rid]: + action_counter[act] += 1 + + total = sum(action_counter.values()) + suggestions = [] + if total > 0: + for action, count in action_counter.most_common(): + suggestions.append({ + 'action': action, + 'confidence': round(count / total, 4) + }) + else: + suggestions.append({'action': 'none', 'confidence': 1.0}) + + return { + 'suggestions': suggestions, + 'similar_reports': similar_ids + } + + async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None: + for act in new_actions: + rid = act['report_id'] + self.report_actions.setdefault(rid, []).append(act['action']) + +#depricated +class KamazAI_v1_2: + """ + Асинхронная система рекомендации действий модератора на основе исторических жалоб. + Использование: + system = ReportDecisionSystem() + await system.initialize(reports, actions) + result = await system.predict(new_report) + """ + + def __init__(self, report_list): + self.k_neighbors = 5 + self.weights = { + 'steam_identity': 0.35, + 'reason': 0.15, + 'permissions': 0.10, + 'derived_stats': 0.30, + 'server': 0.10 + } + self.historical_reports: List[Dict[str, Any]] = [] + self.report_actions: Dict[int, List[str]] = {} + self.derived_stats: Optional[Dict[str, Dict[str, float]]] = None + self.derived_fields = [ + 'a_kd', 'a_kpm', 'a_dpm', + 'r_kd', 'r_kpm', 'r_dpm', + 'kd_diff', + 'a_session_min', 'r_session_min' + ] + self.initialize(report_list) + + # ---------- Вспомогательные методы ---------- + @staticmethod + def _compute_derived_features(report: Dict[str, Any]) -> Dict[str, float]: + """Вычисляет производные признаки для одной записи.""" + try: + a_min = max(report['a_seconds'] / 60.0, EPSILON) + except: + a_min = 1 + + try: + r_min = max(report['r_seconds'] / 60.0, EPSILON) + except: + r_min = 1 + + try: + a_kd = report['a_kills'] / max(report['a_deads'], EPSILON) + except: + a_kd = 1 + + try: + r_kd = report['r_kills'] / max(report['r_deads'], EPSILON) + except: + r_kd = 1 + + kd_diff = r_kd - a_kd + try: + return { + 'a_kd': a_kd, + 'a_kpm': report['a_kills'] / a_min, + 'a_dpm': report['a_deads'] / a_min, + 'r_kd': r_kd, + 'r_kpm': report['r_kills'] / r_min, + 'r_dpm': report['r_deads'] / r_min, + 'kd_diff': kd_diff, + 'a_session_min': a_min, + 'r_session_min': r_min + } + except: + raise AnyFieldIsNotFilled + + def _compute_derived_stats(self) -> Dict[str, Dict[str, float]]: + """Вычисляет min/max для всех производных признаков по историческим данным.""" + stats = {f: {'min': float('inf'), 'max': float('-inf')} for f in self.derived_fields} + for r in self.historical_reports: + try: + feats = self._compute_derived_features(r) + for f in self.derived_fields: + val = feats[f] + if val < stats[f]['min']: + stats[f]['min'] = val + if val > stats[f]['max']: + stats[f]['max'] = val + except AnyFieldIsNotFilled: + continue + return stats + + def _normalize_derived(self, feats: Dict[str, float]) -> Dict[str, float]: + """Нормализует производные признаки в [0,1].""" + norm = {} + for f in self.derived_fields: + min_val = self.derived_stats[f]['min'] + max_val = self.derived_stats[f]['max'] + if max_val - min_val < EPSILON: + norm[f] = 0.0 + else: + norm[f] = (feats[f] - min_val) / (max_val - min_val) + return norm + + def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float: + """Вычисляет взвешенное сходство (0..1) между двумя отчётами.""" + score = 0.0 + + # 1. Совпадение пары Steam + #if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']: + # identity_score = 1.0 + #elif r1['a_steam2'] == r2['a_steam2'] or r1['r_steam2'] == r2['r_steam2']: + # identity_score = 0.5 + #else: + # identity_score = 0.0 + #score += self.weights['steam_identity'] * identity_score + + # 2. Причина + reason_score = 1.0 if r1['reasons'] == r2['reasons'] else 0.0 + score += self.weights['reason'] * reason_score + + # 3. Права + perm_score = 0.0 + if r1['a_permition'] == r2['a_permition']: + perm_score += 0.5 + if r1['r_permition'] == r2['r_permition']: + perm_score += 0.5 + score += self.weights['permissions'] * perm_score + + # 4. Производные признаки + f1 = self._normalize_derived(self._compute_derived_features(r1)) + f2 = self._normalize_derived(self._compute_derived_features(r2)) + dist_sq = sum((f1[f] - f2[f]) ** 2 for f in self.derived_fields) + eucl_dist = math.sqrt(dist_sq / len(self.derived_fields)) + derived_sim = 1.0 - min(eucl_dist, 1.0) + score += self.weights['derived_stats'] * derived_sim + + # 5. Сервер + #srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0 + #score += self.weights['server'] * srv_score + + return score + + # ---------- Публичные асинхронные методы ---------- + def initialize(self, reports: List[Dict[str, Any]]) -> None: + """ + Загружает исторические данные и вычисляет нормализацию. + :param reports: список записей из user_reports + :param actions: список записей из user_reports_action + """ + self.historical_reports = reports + self.report_actions.clear() + + for report in self.historical_reports: + try: + self.report_actions[report["id"]] = report["actions"] + except: + print("Cannot build action", report) + + self.derived_stats = self._compute_derived_stats() + + async def predict(self, new_report: Dict[str, Any]) -> Dict[str, Any]: + """ + Возвращает рекомендации для новой жалобы. + :param new_report: словарь с полями нового репорта (без id) + :return: словарь с ключами 'suggestions' и 'similar_reports' + """ + if not self.historical_reports or self.derived_stats is None: + raise RuntimeError("System not initialized. Call 'await initialize()' first.") + + # Расчёт сходства со всеми историческими записями + similarities = [] + for hist in self.historical_reports: + try: + sim = self._similarity(new_report, hist) + similarities.append((hist['id'], sim)) + except AnyFieldIsNotFilled: + pass + + similarities.sort(key=lambda x: x[1], reverse=True) + top_k = similarities[:self.k_neighbors] + + # Сбор действий + action_counter = Counter() + similar_ids = [] + for rid, sim in top_k: + if sim > 0: + similar_ids.append(rid) + if rid in self.report_actions: + for act in self.report_actions[rid]: + action_counter[act] += 1 + + total = sum(action_counter.values()) + suggestions = [] + if total > 0: + for action, count in action_counter.most_common(): + suggestions.append({ + 'action': action, + 'confidence': round(count / total, 4) + }) + else: + suggestions.append({'action': 'none', 'confidence': 1.0}) + + return { + 'suggestions': suggestions, + 'similar_reports': similar_ids + } + + async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None: + """Обновить только действия, не пересчитывая нормализацию (если поступили новые решения).""" + for act in new_actions: + rid = act['report_id'] + self.report_actions.setdefault(rid, []).append(act['action']) -#ai sloop -class KamazAI: +#depricated +class KamazAI_v1_1: WEIGHTS = { 'steam_identity': 0.4, @@ -230,7 +651,7 @@ class Extension: "secretkey":os.getenv("BACKEND_SECRETKEY")}) as session: async with session.get(f"{os.getenv('BACKEND_URL')}/api/discord/report/s", ssl = False) as response: self.reports_list = await response.json() - self.kamazai = KamazAI(self.reports_list) + self.kamazai = KamazAI_v1(self.reports_list) print("KamazAI Enabled") except: traceback.print_exc() @@ -286,7 +707,15 @@ if __name__ == "__main__": print("run") from json import load with open("/Users/gsd/Downloads/reports.json", "r", encoding="utf8") as report_list: - kamazAi = KamazAI(load(report_list)) + kamazAi = KamazAI_v1_3(load(report_list)) + + perm_list = [] + for r in kamazAi.historical_reports: + if r['a_permition'] not in perm_list: + perm_list.append(r['a_permition']) + if r['r_permition'] not in perm_list: + perm_list.append(r['r_permition']) + print(perm_list) test_report1 = {"id":23059,"a_nickname":"серийный чувак","a_permition":"DONT_HAVE","a_kills":29,"a_deads":38,"a_seconds":3237,"r_nickname":"Корабль Бомж 1","r_permition":"DONT_HAVE","r_kills":44,"r_deads":31,"r_seconds":3598,"reasons":"Читы","utime":1780842262,"srv":"srv9","online":18,"type":"IN_GAME","actions":[],"serverName":"Норильск 2019","a_steam":{"steam3":"[U:1:1675616295]","steam2":"STEAM_0:1:837808147","steam64":"76561199635882023","community_url":"https://steamcommunity.com/profiles/76561199635882023","account_id":1675616295},"r_steam":{"steam3":"[U:1:1546493291]","steam2":"STEAM_0:1:773246645","steam64":"76561199506759019","community_url":"https://steamcommunity.com/profiles/76561199506759019","account_id":1546493291}} test_report2 = {"id":23048,"a_nickname":"Евгений Задротов","a_permition":"FREE","a_kills":63,"a_deads":68,"a_seconds":5031,"r_nickname":"Kapusta_KvSH","r_permition":"VIP","r_kills":11,"r_deads":2,"r_seconds":436,"reasons":"VIP абуз","utime":1780837315,"srv":"srv5","online":21,"type":"IN_GAME","actions":[],"serverName":"Завод Ultimate","a_steam":{"steam3":"[U:1:1623272121]","steam2":"STEAM_0:1:811636060","steam64":"76561199583537849","community_url":"https://steamcommunity.com/profiles/76561199583537849","account_id":1623272121},"r_steam":{"steam3":"[U:1:196806785]","steam2":"STEAM_0:1:98403392","steam64":"76561198157072513","community_url":"https://steamcommunity.com/profiles/76561198157072513","account_id":196806785}}