handle http connections in dedicated threads, related to #31
HTTP connections are made using blocking calls. These calls can have rather long timeouts (10 minutes for the offline server, to ensure that it works on slow hardware and to handle hardware suspend to a limited extent), which could leave the application hanging on exit. By pushing HTTP connections into separate threads, the connections can be terminated together with the program through thread termination. All idle connections are closed properly.
rinigus committed Sep 5, 2018
1 parent 1e286b5 commit 443b37c
Showing 1 changed file with 98 additions and 9 deletions.
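The gist of the change can be illustrated with a minimal, self-contained sketch (the host, path, and timeout values below are illustrative only, not taken from the commit): the blocking request runs in a daemon thread, so a connection stuck in a long timeout cannot keep the interpreter alive or delay program exit.

import http.client
import threading

def fetch(host, path, results):
    """Blocking HTTP GET; with a long timeout this can hang for minutes."""
    try:
        connection = http.client.HTTPConnection(host, timeout=600)
        connection.request("GET", path)
        results.append(connection.getresponse().read())
        connection.close()
    except OSError:
        pass  # connection refused, timed out, etc.

results = []
# daemon=True is the key point: a worker stuck in a blocking call cannot
# keep the interpreter alive, so program exit is not delayed by the timeout.
worker = threading.Thread(target=fetch, args=("127.0.0.1", "/", results),
                          daemon=True)
worker.start()
worker.join(timeout=5)  # wait a bounded time for a reply, then move on
print("got response" if results else "no response yet; exiting anyway")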
107 changes: 98 additions & 9 deletions poor/http.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2014 Osmo Salomaa
# Copyright (C) 2014 Osmo Salomaa, 2018 Rinigus
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -17,6 +17,22 @@

"""Managed persistent HTTP connections."""

# IMPLEMENTATION COMMENTS
#
# Connections are collected in pools, with one queue of connections
# per host. All requests are handled by one of the threads in a
# dedicated thread pool. This allows us to use blocking HTTP requests
# for simplicity and, at the same time, to ignore ongoing blocked
# connections when it is time to stop the program.
#
# It turns out that a call to a connection's close method can block
# or be ignored, for example if the server has been suspended for one
# reason or another. As a result, a blocking call could block the
# exit of the whole program if run in the main thread. By pushing all
# connection handling into daemon threads, the program closes as
# expected.


import http.client
import json
import poor
@@ -25,6 +41,7 @@
import sys
import threading
import urllib.parse
from poor.attrdict import AttrDict

BROKEN_CONNECTION_ERRORS = [
BrokenPipeError,
@@ -39,6 +56,7 @@

RE_LOCALHOST = re.compile(r"://(127.0.0.1|localhost)\b")

NTHREADS = 4

class ConnectionPool:

@@ -47,7 +65,6 @@ class ConnectionPool:
def __init__(self, threads):
"""Initialize a :class:`ConnectionPool` instance."""
self._alive = True
self._all_connections = set()
self._lock = threading.Lock()
self._queue = {}
self._threads = threads
@@ -101,7 +118,6 @@ def _new(self, url):
# https://github.com/otsaloma/poor-maps/issues/23
timeout = (600 if RE_LOCALHOST.search(url) else 15)
connection = cls(components.netloc, timeout=timeout)
self._all_connections.add(connection)
return connection

def put(self, url, connection):
@@ -121,18 +137,87 @@ def reset(self, url):

@poor.util.locked_method
def terminate(self):
"""Close all connections and terminate."""
"""Mark pool as dead to stop all ongoing connections in threads. All inactive connections are closed."""
if not self._alive: return
for connection in self._all_connections:
with poor.util.silent(Exception):
connection.close()
# Mark as dead so that subsequent operations fail.
# Mark as dead so that subsequent operations fail and current
# connections will be dropped by the worker threads.
self._alive = False
# connection.close() can sometimes block when the connection is
# active in another thread. Since we are terminating, termination
# of the daemon threads will take care of all active connections;
# here, only idle connections are closed.
for key, q in self._queue.items():
while not q.empty():
with poor.util.silent(Exception):
connection = q.get_nowait()
if connection is not None:
print("Closing connection {}:{}".format(key, id(connection)))
connection.close()
q.task_done()
print("Connecttion {}:{} closed".format(key, id(connection)))


pool = ConnectionPool(10)
def _request_worker(task_queue, result_queue):
"""Worker for filling requests"""
while True:
task = task_queue.get()
if task is None: break
try:
result = _request_real(method=task['method'], url=task['url'],
body=task['body'], encoding=task['encoding'],
retry=task['retry'],
headers=task['headers'])
result_queue.put({'result': result})
except Exception as e:
result_queue.put({'exception': e})
task_queue.task_done()


class ThreadPool:
"""Pool of threads used to perform connections"""
def __init__(self, threads):
self._thread_queues = queue.LifoQueue()
for i in range(threads):
task = queue.Queue()
result = queue.Queue()
q = AttrDict(dict(task=task, result=result))
thread = threading.Thread( target=_request_worker,
kwargs=dict(task_queue=q.task,
result_queue=q.result),
daemon=True )
thread.start()
self._thread_queues.put(q)

def request(self, method, url, body=None, encoding=None, retry=1, headers=None):
task = dict(method=method, url=url, body=body,
encoding=encoding, retry=retry, headers=headers)
while True:
try:
q = self._thread_queues.get(timeout=1)
break
except queue.Empty:
if not pool.is_alive():
raise Exception("Connection pool closed")
if q is None: raise Exception("No thread queue found")
q.task.put(task)
while True:
try:
result = q.result.get(timeout=1)
q.result.task_done()
except queue.Empty:
if not pool.is_alive():
raise Exception("Connection pool closed")
else: continue
self._thread_queues.task_done()
self._thread_queues.put(q)
if 'exception' in result: raise result['exception']
return result['result']


# Keep at most as many connections per host as we have worker threads.
pool = ConnectionPool(NTHREADS)
thread_pool = ThreadPool(NTHREADS)

def get(url, encoding=None, retry=1, headers=None):
"""Make a HTTP GET request at `url` and return response."""
return _request("GET",
@@ -170,6 +255,10 @@ def post_json(url, body, encoding="utf_8", retry=1, headers=None):
headers=headers)

def _request(method, url, body=None, encoding=None, retry=1, headers=None):
return thread_pool.request(method=method, url=url, body=body,
encoding=encoding, retry=retry, headers=headers)

def _request_real(method, url, body, encoding, retry, headers):
"""
Make a HTTP request at `url` using `method`.
Expand Down
