Skip to content

Commit

Permalink
Create a context manager for the shared stash.
Browse files Browse the repository at this point in the history
This ensures that the stash server is killed when it is no longer being used.
Also allow the use of non-ip sockets for the the manager which is likely
to be faster.
  • Loading branch information
jgraham committed Jul 13, 2015
1 parent a9fda3d commit 477b5a9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 28 deletions.
12 changes: 9 additions & 3 deletions wptserve/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ class Server(object):
config = None

def __init__(self, request):
host, port, authkey = stash.load_env_config()
address = (host, port)
self.stash = stash.Stash(request.url_parts.path, address, authkey)
self._stash = None
self._request = request

@property
def stash(self):
if self._stash is None:
address, authkey = stash.load_env_config()
self._stash = stash.Stash(self._request.url_parts.path, address, authkey)
return self._stash


class InputFile(object):
Expand Down
71 changes: 46 additions & 25 deletions wptserve/stash.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,57 @@
import base64
import json
import os
import uuid
from multiprocessing import Process
from multiprocessing.managers import SyncManager, DictProxy
import os
import json
from multiprocessing.managers import BaseManager, DictProxy

class ServerDictManager(BaseManager):
shared_data = {}

WPT_STASH_CONFIG = "WPT_STASH_CONFIG"
def _get_shared():
return ServerDictManager.shared_data

def load_env_config():
return json.loads(os.environ[WPT_STASH_CONFIG])
ServerDictManager.register("get_dict",
callable=_get_shared,
proxytype=DictProxy)

def store_env_config(config):
os.environ[WPT_STASH_CONFIG] = json.dumps(config)
class ClientDictManager(BaseManager):
pass

def start_server(address=None, authkey=None):
shared_data = {}
class DictManager(SyncManager):
pass
ClientDictManager.register("get_dict")

DictManager.register("get_dict",
callable=lambda:shared_data,
proxytype=DictProxy)
manager = DictManager(address, authkey)
server = manager.get_server()
server_process = Process(target=server.serve_forever)
server_process.start()
class StashServer(object):
def __init__(self, address=None, authkey=None):
self.address = address
self.authkey = authkey
self.manager = None

return (server_process, manager._address, manager._authkey)
def __enter__(self):
self.manager, self.address, self.authkey = start_server(self.address, self.authkey)
store_env_config(self.address, self.authkey)

def __exit__(self, *args, **kwargs):
if self.manager is not None:
self.manager.shutdown()

def load_env_config():
address, authkey = json.loads(os.environ["WPT_STASH_CONFIG"])
if isinstance(address, list):
address = tuple(address)
else:
address = str(address)
authkey = base64.decodestring(authkey)
return address, authkey

def store_env_config(address, authkey):
authkey = base64.encodestring(authkey)
os.environ["WPT_STASH_CONFIG"] = json.dumps((address, authkey))

def start_server(address=None, authkey=None):
manager = ServerDictManager(address, authkey)
manager.start()

return (manager, manager._address, manager._authkey)


#TODO: Consider expiring values after some fixed time for long-running
Expand Down Expand Up @@ -66,11 +91,7 @@ def _get_proxy(self, address=None, authkey=None):
Stash._proxy = {}

if Stash._proxy is None:
class DictManager(SyncManager):
pass

DictManager.register("get_dict")
manager = DictManager(address, authkey)
manager = ClientDictManager(address, authkey)
manager.connect()
Stash._proxy = manager.get_dict()

Expand Down

0 comments on commit 477b5a9

Please sign in to comment.