Skip to content

Commit

Permalink
Merge pull request #61 from w3c/jgraham/stash
Browse files Browse the repository at this point in the history
Create a context manager for the shared stash.
  • Loading branch information
jgraham committed Jul 13, 2015
2 parents a9fda3d + 477b5a9 commit b4d51fb
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 28 deletions.
12 changes: 9 additions & 3 deletions wptserve/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ class Server(object):
config = None

def __init__(self, request):
host, port, authkey = stash.load_env_config()
address = (host, port)
self.stash = stash.Stash(request.url_parts.path, address, authkey)
self._stash = None
self._request = request

@property
def stash(self):
if self._stash is None:
address, authkey = stash.load_env_config()
self._stash = stash.Stash(self._request.url_parts.path, address, authkey)
return self._stash


class InputFile(object):
Expand Down
71 changes: 46 additions & 25 deletions wptserve/stash.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,57 @@
import base64
import json
import os
import uuid
from multiprocessing import Process
from multiprocessing.managers import SyncManager, DictProxy
import os
import json
from multiprocessing.managers import BaseManager, DictProxy

class ServerDictManager(BaseManager):
shared_data = {}

WPT_STASH_CONFIG = "WPT_STASH_CONFIG"
def _get_shared():
return ServerDictManager.shared_data

def load_env_config():
return json.loads(os.environ[WPT_STASH_CONFIG])
ServerDictManager.register("get_dict",
callable=_get_shared,
proxytype=DictProxy)

def store_env_config(config):
os.environ[WPT_STASH_CONFIG] = json.dumps(config)
class ClientDictManager(BaseManager):
pass

def start_server(address=None, authkey=None):
shared_data = {}
class DictManager(SyncManager):
pass
ClientDictManager.register("get_dict")

DictManager.register("get_dict",
callable=lambda:shared_data,
proxytype=DictProxy)
manager = DictManager(address, authkey)
server = manager.get_server()
server_process = Process(target=server.serve_forever)
server_process.start()
class StashServer(object):
def __init__(self, address=None, authkey=None):
self.address = address
self.authkey = authkey
self.manager = None

return (server_process, manager._address, manager._authkey)
def __enter__(self):
self.manager, self.address, self.authkey = start_server(self.address, self.authkey)
store_env_config(self.address, self.authkey)

def __exit__(self, *args, **kwargs):
if self.manager is not None:
self.manager.shutdown()

def load_env_config():
address, authkey = json.loads(os.environ["WPT_STASH_CONFIG"])
if isinstance(address, list):
address = tuple(address)
else:
address = str(address)
authkey = base64.decodestring(authkey)
return address, authkey

def store_env_config(address, authkey):
authkey = base64.encodestring(authkey)
os.environ["WPT_STASH_CONFIG"] = json.dumps((address, authkey))

def start_server(address=None, authkey=None):
manager = ServerDictManager(address, authkey)
manager.start()

return (manager, manager._address, manager._authkey)


#TODO: Consider expiring values after some fixed time for long-running
Expand Down Expand Up @@ -66,11 +91,7 @@ def _get_proxy(self, address=None, authkey=None):
Stash._proxy = {}

if Stash._proxy is None:
class DictManager(SyncManager):
pass

DictManager.register("get_dict")
manager = DictManager(address, authkey)
manager = ClientDictManager(address, authkey)
manager.connect()
Stash._proxy = manager.get_dict()

Expand Down

0 comments on commit b4d51fb

Please sign in to comment.