-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreat_detector.py
More file actions
122 lines (112 loc) · 4.73 KB
/
Copy paththreat_detector.py
File metadata and controls
122 lines (112 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# threat_detector.py — Main detection pipeline
# Location: ~/threat-demo/threat_detector.py
# Monitors log file, enriches with AbuseIPDB, fires Slack alerts
import re, time, requests, logging
from datetime import datetime
from collections import defaultdict
from config import ABUSEIPDB_API_KEY, SLACK_WEBHOOK_URL, LOG_FILE
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.StreamHandler(),
logging.FileHandler('threat_detector.log')]
)
log = logging.getLogger(__name__)
ALERT_THRESHOLD = 3
ABUSEIPDB_MIN_SCORE = 25
failed_attempts = defaultdict(int)
alerted_ips = set()
SSH_FAIL_PATTERN = re.compile(
r'Failed password for .+ from (\d+\.\d+\.\d+\.\d+) port'
)
def check_abuseipdb(ip):
try:
r = requests.get(
'https://api.abuseipdb.com/api/v2/check',
headers={'Accept':'application/json','Key':ABUSEIPDB_API_KEY},
params={'ipAddress':ip,'maxAgeInDays':90},
timeout=10
)
if r.status_code == 200:
d = r.json()['data']
return {
'abuse_score': d.get('abuseConfidenceScore', 0),
'total_reports': d.get('totalReports', 0),
'country': d.get('countryCode', 'Unknown'),
'isp': d.get('isp', 'Unknown'),
'last_reported': d.get('lastReportedAt', 'Never')
}
except Exception as e:
log.warning(f'AbuseIPDB query failed for {ip}: {e}')
return None
def calculate_threat_level(attempts, abuse_score):
if abuse_score >= 75 or attempts >= 10:
return 'CRITICAL', ':red_circle:'
elif abuse_score >= 50 or attempts >= 7:
return 'HIGH', ':orange_circle:'
elif abuse_score >= 25 or attempts >= 3:
return 'MEDIUM', ':yellow_circle:'
return 'LOW', ':white_circle:'
def send_slack_alert(ip, attempts, intel):
abuse_score = intel.get('abuse_score', 0) if intel else 0
level, emoji = calculate_threat_level(attempts, abuse_score)
ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
message = {
'text': f'{emoji} *THREAT DETECTED — {level}*',
'blocks': [
{'type':'header',
'text':{'type':'plain_text',
'text':f'{emoji} THREAT ALERT — {level}'}},
{'type':'section','fields':[
{'type':'mrkdwn','text':f'*Attacker IP:*\n`{ip}`'},
{'type':'mrkdwn','text':f'*Threat Level:*\n{level}'},
{'type':'mrkdwn','text':f'*Failed Attempts:*\n{attempts}'},
{'type':'mrkdwn','text':f'*Abuse Score:*\n{abuse_score}%'},
{'type':'mrkdwn','text':f'*Country:*\n{intel.get("country","N/A")}' if intel else '*Country:*\nN/A'},
{'type':'mrkdwn','text':f'*ISP:*\n{intel.get("isp","N/A")}' if intel else '*ISP:*\nN/A'},
{'type':'mrkdwn','text':f'*Total Reports:*\n{intel.get("total_reports",0)}' if intel else '*Reports:*\n0'},
{'type':'mrkdwn','text':f'*Detected At:*\n{ts}'}
]},
{'type':'section','text':{'type':'mrkdwn',
'text':'*Action:* Block IP at firewall. Investigate auth logs.'}}
]
}
try:
resp = requests.post(SLACK_WEBHOOK_URL, json=message, timeout=10)
if resp.status_code == 200:
log.info(f'Slack alert sent for {ip} — Level: {level}')
else:
log.warning(f'Slack failed: {resp.status_code}')
except Exception as e:
log.error(f'Slack send error: {e}')
def process_log_line(line):
match = SSH_FAIL_PATTERN.search(line)
if not match:
return
ip = match.group(1)
failed_attempts[ip] += 1
count = failed_attempts[ip]
log.info(f'Failed login from {ip} — attempt #{count}')
if count >= ALERT_THRESHOLD and ip not in alerted_ips:
log.warning(f'THRESHOLD REACHED for {ip} — querying AbuseIPDB...')
intel = check_abuseipdb(ip)
if intel:
log.info(f'AbuseIPDB score for {ip}: {intel["abuse_score"]}%')
if (intel and intel['abuse_score'] >= ABUSEIPDB_MIN_SCORE) or count >= ALERT_THRESHOLD:
send_slack_alert(ip, count, intel)
alerted_ips.add(ip)
def tail_log(filepath):
log.info(f'Threat detector started — watching: {filepath}')
log.info(f'Alert threshold: {ALERT_THRESHOLD} failed attempts')
log.info('Waiting for suspicious activity...')
print()
with open(filepath, 'r') as f:
f.seek(0, 2)
while True:
line = f.readline()
if line:
process_log_line(line)
else:
time.sleep(0.1)
if __name__ == '__main__':
tail_log(LOG_FILE)