| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
#!/usr/bin/env python
"""sb_culler.py -- remove spam from POP3 servers, leave ham.
I get about 150 spams a day and 12 viruses as background noise. I use
Apple's Mail.app on my laptop, which filters out most of them. But
when I travel my mailbox starts to accumulate crap, which is annoying
over dial-up. Even at home, during peak periods of a recent virus
shedding I got about 30 viruses an hour, and my 10MB mailbox filled up
while I slept!
I have a server machine at home, which can stay up full time. This
program, sb_culler, uses SpamBayes to run a POP3 email culler. It
connects to my email servers every few minutes, downloads the emails,
classifies each one, and deletes the spam and viruses. (It makes a
local copy of the spam, just in case.)
This program is designed for me, a programmer. The structure should
be helpful enough for other programmers, but even configuration must
be done by editing the code.
Written by Andrew Dalke, November 2003.
Released into the public domain on 2003/11/22.
== NO copyright protection asserted for this code. Share and enjoy! ==
This program requires Python 2.3 or newer.
"""
import sets, traceback
import poplib
import posixpath
from email import Header
from spambayes import mboxutils, hammie
# Master switch read by the AppendFile and DELETE actions: when it is false
# they still count the action in the log but return before touching any file
# or mailbox.
DO_ACTIONS = 1
# Console verbosity; code in this file tests it for truthiness and for "> 1".
VERBOSE_LEVEL = 1
# Names under which actions are counted via Logger.do_action().
APPEND_TO_FILE = "append_to_file"
# NOTE: the name DELETE is rebound further down by ``def DELETE``, so once the
# module has finished loading it refers to that function, not to this string.
DELETE = "delete"
KEEP_IN_MAILBOX = "keep in mailbox"
# Names under which passed tests are counted via Logger.pass_test().
SPAM = "spam"
VIRUS = "virus"
class Logger:
    """Count passed tests and performed actions, and echo progress text.

    ``tests`` and ``actions`` both map a name to the number of times it was
    reported through pass_test() / do_action().
    """

    def __init__(self):
        self.tests = {}
        self.actions = {}

    def __nonzero__(self):
        # True only when at least one test AND at least one action were seen.
        if not self.tests:
            return False
        return bool(self.actions)

    def pass_test(self, name):
        """Record one more pass of the test called `name`."""
        seen = self.tests.get(name, 0)
        self.tests[name] = seen + 1

    def do_action(self, name):
        """Record one more execution of the action called `name`."""
        seen = self.actions.get(name, 0)
        self.actions[name] = seen + 1

    def accept(self, text):
        """Print the decision taken for a message."""
        print(text)

    def info(self, text):
        """Print an informational line."""
        print(text)
class MessageInfo:
    """Reference to one email message inside a mailbox.

    Attributes:
        mailbox -- the mailbox object the message lives in
        i       -- the message's number within that mailbox
        msg     -- the parsed message
        text    -- the raw text the message was parsed from
    """

    def __init__(self, mailbox, i, msg, text):
        # Where the message lives ...
        self.mailbox = mailbox
        self.i = i
        # ... and what it contains.
        self.text = text
        self.msg = msg
class Filter:
    """Pair a test with the action to run when that test passes.

    `test` is called as test(mi, log) and returns a reason string when the
    message matches (anything false otherwise).  `action` is called as
    action(mi, log) and must carry a `descr` attribute describing it.
    """

    def __init__(self, test, action):
        self.test = test
        self.action = action

    def process(self, mi, log):
        """Run the test; on a match run the action and describe what happened.

        Returns "<action descr> because <reason>" on a match, else False.
        """
        reason = self.test(mi, log)
        if not reason:
            return False
        self.action(mi, log)
        return " because ".join((self.action.descr, reason))
class AppendFile:
    """Action: append message text to the given filename, then delete it.

    The message is removed from the mailbox only after its text has been
    written and the file closed; if writing fails the exception propagates
    and the message stays on the server.
    """

    def __init__(self, filename):
        self.filename = filename
        self.descr = "save to %r then delete" % self.filename

    def __call__(self, mi, log):
        """Save `mi.text` to the file and delete the message `mi` refers to.

        The action is always counted in `log`; nothing else happens when
        DO_ACTIONS is false.
        """
        log.do_action(APPEND_TO_FILE)
        if not DO_ACTIONS:
            return
        text = mi.text
        f = open(self.filename, "a")
        try:
            f.write(text)
            # The text may arrive without a final newline (e.g. when it was
            # built with "\n".join).  Terminate it so the next saved message
            # starts on a line of its own instead of being glued onto the
            # last line of this one.
            if text and not text.endswith("\n"):
                f.write("\n")
        finally:
            f.close()
        mi.mailbox.dele(mi.i)
def DELETE(mi, log):
    """Action: delete message from mailbox.

    The action is always counted in `log` under the name "delete"; the
    message is only removed when DO_ACTIONS is true.
    """
    # This ``def`` rebinds the module-level name DELETE, so by the time the
    # function runs DELETE is the function itself, not a string.  Count the
    # action under the literal name instead of using the function object as
    # the dictionary key.
    log.do_action("delete")
    if not DO_ACTIONS:
        return
    mi.mailbox.dele(mi.i)
DELETE.descr = "delete"
def KEEP(mi, log):
    """Action: leave the message where it is.

    Nothing is done to the mailbox; the decision is only counted in `log`.
    """
    log.do_action(KEEP_IN_MAILBOX)


# Human-readable description used when reporting why a filter fired.
KEEP.descr = "keep in mailbox"
class WhiteListFrom:
    """Test: Read a list of email addresses to use a 'from' whitelist.

    The file holds one entry per line.  Entries are stripped and lower-cased;
    a message passes when its whole lower-cased 'from' header equals one of
    them.
    """

    def __init__(self, filename):
        f = open(filename)
        try:
            lines = [line.strip().lower() for line in f.readlines()]
        finally:
            # Close explicitly rather than leaving the handle to the
            # garbage collector.
            f.close()
        # Drop blank lines: an empty entry would whitelist every message
        # whose 'from' header is empty.
        self.addresses = sets.Set([line for line in lines if line])

    def __call__(self, mi, log):
        """Return a reason string if the sender is whitelisted, else False."""
        frm = mi.msg["from"]
        if frm is not None and frm.lower() in self.addresses:
            log.pass_test("'from' white list")
            return "it is in 'from' white list"
        return False
class WhiteListSubstrings:
    """Test: pass a message when a header contains a known substring.

    `field` names the header to look at; `substrings` is the collection of
    strings, any one of which is enough for the message to pass.
    """

    def __init__(self, field, substrings):
        self.field = field
        self.substrings = substrings

    def __call__(self, mi, log):
        """Return a reason string on a match, False otherwise."""
        value = mi.msg[self.field]
        if value is not None:
            for needle in self.substrings:
                if needle not in value:
                    continue
                # First hit wins; no need to look at the remaining entries.
                log.pass_test("'%s' white list" % (self.field,))
                return "it matches '%s' white list" % (self.field,)
        return False
class IsSpam:
    """Test: use SpamBayes to tell if something is spam.

    `sb_hammie` must offer a score(msg) method returning a spam probability.
    A message passes when its score is strictly greater than `spam_cutoff`;
    when no cutoff is given the SpamBayes "Categorization"/"spam_cutoff"
    option is used.
    """

    def __init__(self, sb_hammie, spam_cutoff = None):
        self.sb_hammie = sb_hammie
        if spam_cutoff is None:
            # `options` is not imported at module level, so looking it up
            # here used to fail with NameError.  Import it where it is needed.
            from spambayes.Options import options
            spam_cutoff = options["Categorization", "spam_cutoff"]
        self.spam_cutoff = spam_cutoff

    def __call__(self, mi, log):
        """Return a reason string if `mi.msg` scores as spam, else False."""
        prob = self.sb_hammie.score(mi.msg)
        if prob > self.spam_cutoff:
            log.pass_test(SPAM)
            return "it is spam (%4.3f)" % prob
        if VERBOSE_LEVEL > 1:
            print("not spam (%4.3f)" % prob)
        return False
# Simple check for executable attachments
def IsVirus(mi, log):
    """Test: a virus is any message with an attached executable

    I've also noticed the viruses come in as wav and midi attachments
    so I trigger on those as well.

    This is a very paranoid detector, since someone might send me a
    binary for valid reasons.  I white-list everyone who's sent me
    email before so it doesn't affect me.

    Returns a reason string for the first suspicious part found, or False
    when no part of `mi.msg` looks suspicious.
    """
    # Only viruses send messages to me with these types
    virus_types = ["application/x-msdownload",
                   "audio/x-wav", "audio/x-midi"]
    extensions = "bat com exe pif ref scr vbs wsh".split()
    for part in mi.msg.walk():
        # get_main_type()/get_type() were removed from the email package in
        # Python 2.5; the get_content_*() methods are the supported spelling
        # and are also available in Python 2.3.
        if part.get_content_maintype() == 'multipart':
            continue
        filename = part.get_filename()
        if filename is None:
            # Unnamed part: judge it by its declared content type.
            content_type = part.get_content_type()
            if content_type in virus_types:
                log.pass_test(VIRUS)
                return ("it has a virus-like content-type (%s)" %
                        content_type)
        else:
            # Named attachment: judge it by its extension, ignoring case.
            ext = posixpath.splitext(filename)[1]
            if ext[1:].lower() in extensions:
                log.pass_test(VIRUS)
                return "it has a virus-like attachment (%s)" % ext[1:]
    return False
def open_mailbox(server, username, password, debuglevel = 0):
    """Connect to a POP3 server, log in, and return the poplib.POP3 object.

    The debug level is applied only after the login commands have been sent.
    If anything fails once the connection exists, the connection is closed
    with quit() and the exception is re-raised.
    """
    mailbox = poplib.POP3(server)
    try:
        mailbox.user(username)
        mailbox.pass_(password)
        mailbox.set_debuglevel(debuglevel)
        if VERBOSE_LEVEL > 1:
            count, size = mailbox.stat()
            print(" ".join(["Message count: ", str(count)]))
            print(" ".join(["Total bytes : ", str(size)]))
    except:
        # Deliberately catches everything: this handler only cleans up and
        # then lets the original exception continue.
        mailbox.quit()
        raise
    return mailbox
def _log_subject(mi, log):
encoded_subject = mi.msg.get('subject')
subject, encoding = Header.decode_header(encoded_subject)[0]
if encoding is None or encoding == 'iso-8859-1':
s = subject
else:
s = encoded_subject
log.info("%s Subject: %r" % (mi.i, s))
class Filters(list):
    """Ordered list of Filter objects applied to every message of a mailbox.

    For each message the filters are tried in order and the first one that
    fires decides what happens to it.
    """

    def add(self, test, action):
        """short-cut to make a Filter given the test and action"""
        rule = Filter(test, action)
        self.append(rule)

    def process_mailbox(self, mailbox):
        """Run every message in `mailbox` through the filters.

        Returns the Logger holding the per-test and per-action counts.
        """
        count, size = mailbox.stat()
        log = Logger()
        for msg_num in range(1, count+1):
            # Kevin's code used -1, but -1 doesn't work for one of
            # my POP accounts, while a million does.
            # Don't use retr because that may mark the message as
            # read (so says Kevin's code)
            body_lines = mailbox.top(msg_num, 1000000)[1]
            text = "\n".join(body_lines)
            msg = mboxutils.get_message(text)
            mi = MessageInfo(mailbox, msg_num, msg, text)
            _log_subject(mi, log)

            handled = False
            for rule in self:
                outcome = rule.process(mi, log)
                if outcome:
                    log.accept(outcome)
                    handled = True
                    break
            if not handled:
                # don't know what to do with this so just
                # keep it on the server
                log.pass_test("unknown")
                log.do_action(KEEP_IN_MAILBOX)
                log.accept("unknown")
        return log
def filter_server(account, filters):
    """Run `filters` over one POP3 account and return the resulting log.

    `account` is a (server, user, pwd) sequence.  It is taken as a single
    positional argument and unpacked here instead of in the signature:
    tuple parameters are a Python 2-only construct (removed by PEP 3113),
    and every existing call, filter_server((server, user, pwd), filters),
    keeps working unchanged.

    The mailbox connection is always closed with quit(), also when
    processing raises.
    """
    server, user, pwd = account
    if VERBOSE_LEVEL:
        print("=" * 78)
        print("Processing %s on %s" % (user, server))
    mailbox = open_mailbox(server, user, pwd)
    try:
        log = filters.process_mailbox(mailbox)
    finally:
        mailbox.quit()
    return log
##### User-specific
import time, sys, urllib
# A simple text interface.
def _unix_stop():
pass
def _ms_stop():
# ^C doesn't seem to work correctly in the DOS box
# so assume any keypress is a break
if msvcrt.kbhit():
raise SystemExit()
try:
import msvcrt
_check_for_stop = _ms_stop
except ImportError:
_check_for_stop = _unix_stop
def restart_network():
    """Ask the Linksys router to drop and re-establish its connection.

    Best effort: any failure other than KeyboardInterrupt is printed as a
    traceback and otherwise ignored.
    """
    # This is called after too many connection failures.
    # That usually means my ISP dropped my DHCP and I need to
    # bounce my Linksys firewall/DHCP/hub.
    print("Network appears to be down. Bringing Linksys down then up...")
    # Note that this example uses the default password.  YMMV.
    bounce_urls = (
        "http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=2",
        "http://:admin@192.168.1.1/Gozila.cgi?pppoeAct=1",
    )
    try:
        # Requested in this order; a failure on the first skips the second.
        for url in bounce_urls:
            urllib.urlopen(url)
    except KeyboardInterrupt:
        raise
    except:
        traceback.print_exc()
def wait(t, delta = 10):
    """Wait for 't' seconds

    Counts down from `t` in steps of `delta`, sleeping min(remaining, delta)
    seconds at each step and running the stop check after every sleep.  When
    VERBOSE_LEVEL is set the remaining seconds are shown on one line,
    separated by "..".  The line is always terminated at the end.
    """
    assert delta > 0, delta
    assert t >= 1
    after_first = False
    for remaining in range(t, -1, -delta):
        if VERBOSE_LEVEL:
            # Produces "180 .. 170 .. 160 ..." with single spaces between
            # the items.
            if after_first:
                sys.stdout.write(" .. ")
            sys.stdout.write(str(remaining))
            sys.stdout.flush()
        time.sleep(min(remaining, delta))
        _check_for_stop()
        after_first = True
    sys.stdout.write("\n")
def main():
    """Set up the filters and cull every configured POP3 account forever.

    Each cycle runs the filters over every account, prints running spam and
    virus statistics, and then waits three minutes.  A cycle in which at
    least one account failed is counted as an error; once more than 20 such
    cycles have accumulated the network is bounced and the counter reset.
    """
    # Imported here because it is only needed to classify failures below.
    import socket

    filters = Filters()

    # A list of everyone who has emailed me this year.
    # Keep their messages on the server.
    filters.add(WhiteListFrom("good_emails.txt"), KEEP)

    # My mailing lists.  Edited to make it slightly harder
    # for spammers to read this description and figure
    # out how to spam me.
    filters.add(WhiteListSubstrings("subject",
                                    ['[Twisted]', 'CompChem:', '[Bioperl]',
                                     '[BioPy]', '[SALSA CLUB]', '[Open-bio]',
                                     '[StarshipCrew]']), KEEP)

    # Get rid of anything which smells like an executable.
    filters.add(IsVirus, DELETE)

    # Use SpamBayes to identify spam.  Make a local copy then
    # delete from the server.
    h = hammie.open("cull.spambayes", "dbm", "r")
    filters.add(IsSpam(h, 0.90), AppendFile("spam.mbox"))

    # These are my POP3 accounts.  (or not ;)
    server_configs = [("mail.example.com",
                       "dalke", "password"),
                      ("mail2.spam.com", "dalke", "1234"), ]

    # The main culling loop.
    error_count = 0
    cumulative_log = {SPAM: 0, VIRUS: 0}
    initial_log = None
    start_time = None  # init'ed only after initial_log is created
    while 1:
        error_flag = False
        for server, user, pwd in server_configs:
            try:
                log = filter_server( (server, user, pwd), filters)
            except (KeyboardInterrupt, SystemExit):
                # Since Python 2.5 these no longer derive from StandardError,
                # so without this clause the catch-all below would swallow a
                # ^C or an exit request.
                raise
            except (socket.error, poplib.error_proto):
                # Connection and protocol trouble is what the retry and
                # network-restart logic exists for.  socket.error became an
                # IOError (hence a StandardError) in Python 2.6, so it has to
                # be intercepted before the re-raising clause below.
                error_flag = True
                traceback.print_exc()
                continue
            except StandardError:
                # Anything else standard is treated as a programming error.
                raise
            except:
                error_flag = True
                traceback.print_exc()
                continue

            if VERBOSE_LEVEL > 1 and log:
                print(" ** Summary **")
                for x in (log.tests, log.actions):
                    items = list(x.items())
                    if items:
                        items.sort()
                        for k, v in items:
                            print(" %s: %s" % (k, v))
                        print("")

            cumulative_log[SPAM] += log.tests.get(SPAM, 0)
            cumulative_log[VIRUS] += log.tests.get(VIRUS, 0)

        if initial_log is None:
            # First cycle: whatever was found now is the backlog, which is
            # excluded from the per-hour rates printed on later cycles.
            initial_log = cumulative_log.copy()
            start_time = time.time()
            if VERBOSE_LEVEL:
                print("Stats: %d spams, %d virus" % (
                    initial_log[SPAM], initial_log[VIRUS]))
        else:
            if VERBOSE_LEVEL:
                delta_t = time.time() - start_time
                delta_t = max(delta_t, 1)  # never divide by less than 1 second
                print("Stats: %d spams (%.2f/hr), %d virus (%.2f/hr)" % (
                    cumulative_log[SPAM],
                    (cumulative_log[SPAM] - initial_log[SPAM]) /
                    delta_t * 3600,
                    cumulative_log[VIRUS],
                    (cumulative_log[VIRUS] - initial_log[VIRUS]) /
                    delta_t * 3600))

        if error_flag:
            error_count += 1
        if error_count > 20:
            restart_network()
            error_count = 0

        wait(3*60)
# Run the culler only when executed as a script, not when imported.
if __name__ == "__main__":
    main()