Browse Source

more

master
gsd 3 weeks ago
parent
commit
2ecae9429c
  1. 228
      other_ext/kamaz_ai.py

228
other_ext/kamaz_ai.py

@ -122,6 +122,7 @@ class KamazAI_v1_3:
def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float:
score = 0.0
score_list = []
# 1. Совпадение связки игроков
#if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']:
@ -131,10 +132,12 @@ class KamazAI_v1_3:
#else:
# identity_score = 0.0
#score += self.weights['steam_identity'] * identity_score
score_list.append(0.0)
# 2. Причина – теперь семантически, через Jaccard
reason_score = self._reason_similarity(r1['reasons'], r2['reasons'])
score += self.weights['reason'] * reason_score
score_list.append(self.weights['reason'] * reason_score)
# 3. Права
a_lvl1 = self._parse_permission_level(r1['a_permition'])
@ -145,6 +148,7 @@ class KamazAI_v1_3:
dist_r = abs(r_lvl1 - r_lvl2) / MAX_PERMISSION_LEVEL
perm_sim = 1.0 - (dist_a + dist_r) / 2.0
score += self.weights['permissions'] * perm_sim
score_list.append(self.weights['permissions'] * perm_sim)
# 4. Производные признаки
f1 = self._normalize_derived(self._compute_derived_features(r1))
@ -153,12 +157,14 @@ class KamazAI_v1_3:
eucl_dist = math.sqrt(dist_sq / len(self.derived_fields))
derived_sim = 1.0 - min(eucl_dist, 1.0)
score += self.weights['derived_stats'] * derived_sim
score_list.append(self.weights['derived_stats'] * derived_sim)
# 5. Сервер
#srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0
#score += self.weights['server'] * srv_score
score_list.append(0.0)
return score
return score, score_list
# ------------------------------------------------------------------
# Публичные асинхронные методы
@ -185,17 +191,19 @@ class KamazAI_v1_3:
similarities = []
for hist in self.historical_reports:
sim = self._similarity(new_report, hist)
similarities.append((hist['id'], sim))
sim, score_list = self._similarity(new_report, hist)
similarities.append((hist['id'], sim, score_list))
similarities.sort(key=lambda x: x[1], reverse=True)
top_k = similarities[:self.k_neighbors]
action_counter = Counter()
action_weight = []
similar_ids = []
for rid, sim in top_k:
for rid, sim, sl in top_k:
if sim > 0:
similar_ids.append(rid)
action_weight.append(sl)
if rid in self.report_actions:
for act in self.report_actions[rid]:
action_counter[act] += 1
@ -211,9 +219,11 @@ class KamazAI_v1_3:
else:
suggestions.append({'action': 'none', 'confidence': 1.0})
print(suggestions, similar_ids, action_weight)
return {
'suggestions': suggestions,
'similar_reports': similar_ids
'similar_reports': similar_ids,
'similar_weight': action_weight
}
async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None:
@ -221,214 +231,6 @@ class KamazAI_v1_3:
rid = act['report_id']
self.report_actions.setdefault(rid, []).append(act['action'])
#depricated
class KamazAI_v1_2:
"""
Асинхронная система рекомендации действий модератора на основе исторических жалоб.
Использование:
system = ReportDecisionSystem()
await system.initialize(reports, actions)
result = await system.predict(new_report)
"""
def __init__(self, report_list):
self.k_neighbors = 5
self.weights = {
'steam_identity': 0.35,
'reason': 0.15,
'permissions': 0.10,
'derived_stats': 0.30,
'server': 0.10
}
self.historical_reports: List[Dict[str, Any]] = []
self.report_actions: Dict[int, List[str]] = {}
self.derived_stats: Optional[Dict[str, Dict[str, float]]] = None
self.derived_fields = [
'a_kd', 'a_kpm', 'a_dpm',
'r_kd', 'r_kpm', 'r_dpm',
'kd_diff',
'a_session_min', 'r_session_min'
]
self.initialize(report_list)
# ---------- Вспомогательные методы ----------
@staticmethod
def _compute_derived_features(report: Dict[str, Any]) -> Dict[str, float]:
"""Вычисляет производные признаки для одной записи."""
try:
a_min = max(report['a_seconds'] / 60.0, EPSILON)
except:
a_min = 1
try:
r_min = max(report['r_seconds'] / 60.0, EPSILON)
except:
r_min = 1
try:
a_kd = report['a_kills'] / max(report['a_deads'], EPSILON)
except:
a_kd = 1
try:
r_kd = report['r_kills'] / max(report['r_deads'], EPSILON)
except:
r_kd = 1
kd_diff = r_kd - a_kd
try:
return {
'a_kd': a_kd,
'a_kpm': report['a_kills'] / a_min,
'a_dpm': report['a_deads'] / a_min,
'r_kd': r_kd,
'r_kpm': report['r_kills'] / r_min,
'r_dpm': report['r_deads'] / r_min,
'kd_diff': kd_diff,
'a_session_min': a_min,
'r_session_min': r_min
}
except:
raise AnyFieldIsNotFilled
def _compute_derived_stats(self) -> Dict[str, Dict[str, float]]:
"""Вычисляет min/max для всех производных признаков по историческим данным."""
stats = {f: {'min': float('inf'), 'max': float('-inf')} for f in self.derived_fields}
for r in self.historical_reports:
try:
feats = self._compute_derived_features(r)
for f in self.derived_fields:
val = feats[f]
if val < stats[f]['min']:
stats[f]['min'] = val
if val > stats[f]['max']:
stats[f]['max'] = val
except AnyFieldIsNotFilled:
continue
return stats
def _normalize_derived(self, feats: Dict[str, float]) -> Dict[str, float]:
"""Нормализует производные признаки в [0,1]."""
norm = {}
for f in self.derived_fields:
min_val = self.derived_stats[f]['min']
max_val = self.derived_stats[f]['max']
if max_val - min_val < EPSILON:
norm[f] = 0.0
else:
norm[f] = (feats[f] - min_val) / (max_val - min_val)
return norm
def _similarity(self, r1: Dict[str, Any], r2: Dict[str, Any]) -> float:
"""Вычисляет взвешенное сходство (0..1) между двумя отчётами."""
score = 0.0
# 1. Совпадение пары Steam
#if r1['a_steam2'] == r2['a_steam2'] and r1['r_steam2'] == r2['r_steam2']:
# identity_score = 1.0
#elif r1['a_steam2'] == r2['a_steam2'] or r1['r_steam2'] == r2['r_steam2']:
# identity_score = 0.5
#else:
# identity_score = 0.0
#score += self.weights['steam_identity'] * identity_score
# 2. Причина
reason_score = 1.0 if r1['reasons'] == r2['reasons'] else 0.0
score += self.weights['reason'] * reason_score
# 3. Права
perm_score = 0.0
if r1['a_permition'] == r2['a_permition']:
perm_score += 0.5
if r1['r_permition'] == r2['r_permition']:
perm_score += 0.5
score += self.weights['permissions'] * perm_score
# 4. Производные признаки
f1 = self._normalize_derived(self._compute_derived_features(r1))
f2 = self._normalize_derived(self._compute_derived_features(r2))
dist_sq = sum((f1[f] - f2[f]) ** 2 for f in self.derived_fields)
eucl_dist = math.sqrt(dist_sq / len(self.derived_fields))
derived_sim = 1.0 - min(eucl_dist, 1.0)
score += self.weights['derived_stats'] * derived_sim
# 5. Сервер
#srv_score = 1.0 if r1['srv'] == r2['srv'] else 0.0
#score += self.weights['server'] * srv_score
return score
# ---------- Публичные асинхронные методы ----------
def initialize(self, reports: List[Dict[str, Any]]) -> None:
"""
Загружает исторические данные и вычисляет нормализацию.
:param reports: список записей из user_reports
:param actions: список записей из user_reports_action
"""
self.historical_reports = reports
self.report_actions.clear()
for report in self.historical_reports:
try:
self.report_actions[report["id"]] = report["actions"]
except:
print("Cannot build action", report)
self.derived_stats = self._compute_derived_stats()
async def predict(self, new_report: Dict[str, Any]) -> Dict[str, Any]:
"""
Возвращает рекомендации для новой жалобы.
:param new_report: словарь с полями нового репорта (без id)
:return: словарь с ключами 'suggestions' и 'similar_reports'
"""
if not self.historical_reports or self.derived_stats is None:
raise RuntimeError("System not initialized. Call 'await initialize()' first.")
# Расчёт сходства со всеми историческими записями
similarities = []
for hist in self.historical_reports:
try:
sim = self._similarity(new_report, hist)
similarities.append((hist['id'], sim))
except AnyFieldIsNotFilled:
pass
similarities.sort(key=lambda x: x[1], reverse=True)
top_k = similarities[:self.k_neighbors]
# Сбор действий
action_counter = Counter()
similar_ids = []
for rid, sim in top_k:
if sim > 0:
similar_ids.append(rid)
if rid in self.report_actions:
for act in self.report_actions[rid]:
action_counter[act] += 1
total = sum(action_counter.values())
suggestions = []
if total > 0:
for action, count in action_counter.most_common():
suggestions.append({
'action': action,
'confidence': round(count / total, 4)
})
else:
suggestions.append({'action': 'none', 'confidence': 1.0})
return {
'suggestions': suggestions,
'similar_reports': similar_ids
}
async def reload_actions(self, new_actions: List[Dict[str, Any]]) -> None:
"""Обновить только действия, не пересчитывая нормализацию (если поступили новые решения)."""
for act in new_actions:
rid = act['report_id']
self.report_actions.setdefault(rid, []).append(act['action'])
#depricated
class KamazAI_v1_1:
WEIGHTS = {

Loading…
Cancel
Save