-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalpn_responder.py
executable file
·61 lines (49 loc) · 2.34 KB
/
alpn_responder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
import ssl
import socketserver
import re
import os
ALPNDIR=os.environ.get("ALPNDIR","/etc/dehydrated/alpn-certs")
PROXY_PROTOCOL=os.environ.get("PROXY_PROTOCOL",False)
FALLBACK_CERTIFICATE=os.environ.get("FALLBACK_CERTIFICATE","/etc/ssl/certs/ssl-cert-snakeoil.pem")
FALLBACK_KEY=os.environ.get("FALLBACK_KEY","/etc/ssl/private/ssl-cert-snakeoil.key")
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def create_context(self, certfile, keyfile, first=False):
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.set_ciphers('ECDHE+AESGCM')
ssl_context.set_alpn_protocols(["acme-tls/1"])
ssl_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
if first:
ssl_context.set_servername_callback(self.load_certificate)
ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
return ssl_context
def load_certificate(self, sslsocket, sni_name, sslcontext):
print("Got request for %s" % sni_name)
if not re.match(r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|([a-zA-Z0-9][-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$', sni_name):
return
certfile = os.path.join(ALPNDIR, "%s.crt.pem" % sni_name)
keyfile = os.path.join(ALPNDIR, "%s.key.pem" % sni_name)
if not os.path.exists(certfile) or not os.path.exists(keyfile):
return
sslsocket.context = self.create_context(certfile, keyfile)
def handle(self):
if PROXY_PROTOCOL:
buf = b""
while b"\r\n" not in buf:
buf += self.request.recv(1)
ssl_context = self.create_context(FALLBACK_CERTIFICATE, FALLBACK_KEY, True)
newsock = ssl_context.wrap_socket(self.request, server_side=True)
if __name__ == "__main__":
HOST, PORT = "0.0.0.0", 443
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler, bind_and_activate=False)
server.allow_reuse_address = True
print(f"ALPN Server bind on {HOST}:{PORT}")
try:
server.server_bind()
server.server_activate()
server.serve_forever()
except Exception as e:
print(e)
server.shutdown()