-
Notifications
You must be signed in to change notification settings - Fork 11
/
imapserver.py
72 lines (54 loc) · 2.01 KB
/
imapserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from twisted.mail import imap4, maildir
from twisted.internet import reactor, defer, protocol
from twisted.cred import portal, checkers, credentials
from twisted.cred import error as credError
from twisted.python import filepath
from zope.interface import implements
import time, os, random, pickle
from twittermail import TwitterUserAccount, TwitterImapMailbox, TwitterCredentialsChecker, ObjCache
import email
class MailUserRealm(object):
implements(portal.IRealm)
avatarInterfaces = {
imap4.IAccount: TwitterUserAccount,
}
def __init__(self, cache):
self.cache = cache
def requestAvatar(self, avatarId, mind, *interfaces):
for requestedInterface in interfaces:
if self.avatarInterfaces.has_key(requestedInterface):
# return an instance of the correct class
avatarClass = self.avatarInterfaces[requestedInterface]
avatar = avatarClass(self.cache)
# null logout function: take no arguments and do nothing
logout = lambda: None
return defer.succeed((requestedInterface, avatar, logout))
# none of the requested interfaces was supported
raise KeyError("None of the requested interfaces is supported")
class IMAPServerProtocol(imap4.IMAP4Server):
"Subclass of imap4.IMAP4Server that adds debugging."
debug = True
def lineReceived(self, line):
if self.debug:
print "CLIENT:", line
imap4.IMAP4Server.lineReceived(self, line)
def sendLine(self, line):
imap4.IMAP4Server.sendLine(self, line)
if self.debug:
print "SERVER:", line
class IMAPFactory(protocol.Factory):
protocol = IMAPServerProtocol
portal = None # placeholder
def buildProtocol(self, address):
p = self.protocol()
p.portal = self.portal
p.factory = self
return p
if __name__ == "__main__":
cache = ObjCache()
portal = portal.Portal(MailUserRealm(cache))
portal.registerChecker(TwitterCredentialsChecker(cache))
factory = IMAPFactory()
factory.portal = portal
reactor.listenTCP(1143, factory)
reactor.run()