From b3805a2f3ce9426dd01ec7303bf5c66be2acf784 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Mon, 26 Jul 1999 20:49:04 +0000 Subject: [PATCH] just getting started --- src/ZEO/ClientStorage.py | 252 +++++++++++++++++++++++++++++++++++++++ src/ZEO/StorageServer.py | 177 +++++++++++++++++++++++++++ src/ZEO/__init__.py | 0 src/ZEO/smac.py | 64 ++++++++++ src/ZEO/zrpc.py | 112 +++++++++++++++++ src/ZEO/zrpc/smac.py | 64 ++++++++++ 6 files changed, 669 insertions(+) create mode 100644 src/ZEO/ClientStorage.py create mode 100644 src/ZEO/StorageServer.py create mode 100644 src/ZEO/__init__.py create mode 100644 src/ZEO/smac.py create mode 100644 src/ZEO/zrpc.py create mode 100644 src/ZEO/zrpc/smac.py diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py new file mode 100644 index 000000000..40a0b9dfa --- /dev/null +++ b/src/ZEO/ClientStorage.py @@ -0,0 +1,252 @@ +############################################################################## +# +# Copyright (c) 1996-1998, Digital Creations, Fredericksburg, VA, USA. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# o Redistributions of source code must retain the above copyright +# notice, this list of conditions, and the disclaimer that follows. +# +# o Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions, and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# o Neither the name of Digital Creations nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# +# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS AND CONTRIBUTORS *AS IS* +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL +# CREATIONS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. +# +# +# If you have questions regarding this software, contact: +# +# Digital Creations, L.C. +# 910 Princess Ann Street +# Fredericksburge, Virginia 22401 +# +# info@digicool.com +# +# (540) 371-6909 +# +############################################################################## +"""Network ZODB storage client +""" +__version__='$Revision: 1.1 $'[11:-2] + +import struct, time, os, socket, cPickle, string, Sync, zrpc +now=time.time +from struct import pack, unpack +from ZODB import POSException, BaseStorage + +TupleType=type(()) + +class UnrecognizedResult(POSException.StorageError): + """A server call returned an unrecognized result + """ + +class ClientStorage(BaseStorage.BaseStorage): + + def __init__(self, connection, async=0): + + if async: self._call=zrpc.async(connection) + else: self._call=zrpc.sync(connection) + + info=self._call('get_info') + self._len=info.get('length',0) + self._size=info.get('size',0) + self.__name__=info.get('name', str(connection)) + self._supportsUndo=info.get('supportsUndo',0) + self._supportsVersions=info.get('supportsVersions',0) + + BaseStorage.BaseStorage.__init__(self, + info.get('name', str(connection)), + ) + + def registerDB(self, db, limit): + + def invalidate(code, args, + invalidate=db.invalidate, + limit=limit, + release=self._commit_lock_release, + ): + if code == 'I': + for oid, serial, version in args: + invalidate(oid, version=version) + elif code == 'U': + release() + + self._call.setOutOfBand(invalidate) + + + def __len__(self): return self._len + + def abortVersion(self, src, transaction): + if transaction is not self._transaction: + raise POSException.StorageTransactionError(self, transaction) + self._lock_acquire() + try: return self._call('abortVersion', src, transaction.id) + finally: self._lock_release() + + def close(self): + self._lock_acquire() + try: self._call.close() + finally: self._lock_release() + + def commitVersion(self, src, dest, transaction): + if transaction is not self._transaction: + raise POSException.StorageTransactionError(self, transaction) + self._lock_acquire() + try: return self._call('commitVersion', src, dest, transaction.id) + finally: self._lock_release() + + def getName(self): return self.__name__ + + def getSize(self): return self._size + + def history(self, oid, version, length=1): + self._lock_acquire() + try: return self._call('history', oid, version, length) + finally: self._lock_release() + + def load(self, oid, version, _stuff=None): + self._lock_acquire() + try: return self._call('load', oid, version) + finally: self._lock_release() + + def modifiedInVersion(self, oid): + self._lock_acquire() + try: return self._call('modifiedInVersion', oid) + finally: self._lock_release() + + def new_oid(self, last=None): + self._lock_acquire() + try: return self._call('new_oid') + finally: self._lock_release() + + def pack(self, t, rf): + # Note that we ignore the rf argument. The server + # will provide it's own implementation. + self._lock_acquire() + try: return self._call('pack', t) + finally: self._lock_release() + + def store(self, oid, serial, data, version, transaction): + if transaction is not self._transaction: + raise POSException.StorageTransactionError(self, transaction) + self._lock_acquire() + try: return self._call('store', oid, serial, + data, version, transaction.id) + finally: self._lock_release() + + def supportsUndo(self): return self._supportsUndo + def supportsVersions(self): return self._supportsVersions + + def tpc_abort(self, transaction): + self._lock_acquire() + try: + if transaction is not self._transaction: return + self._call('tpc_abort', id) + self._transaction=None + self._commit_lock_release() + finally: self._lock_release() + + def tpc_begin(self, transaction): + self._lock_acquire() + try: + if self._transaction is transaction: return + + while 1: + self._lock_release() + self._commit_lock_acquire() + self._lock_acquire() + if self._call('tpc_begin', id, user, desc, ext) is None: + break + + self._transaction=transaction + self._clear_temp() + + user=transaction.user + desc=transaction.description + ext=transaction._extension + if ext: ext=dumps(ext,1) + else: ext="" + self._ude=user, desc, ext + + t=time.time() + t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,))) + self._ts=t=t.laterThan(self._ts) + self._serial=`t` + + self._begin(self._serial, user, desc, ext) + + finally: self._lock_release() + + + def tpc_finish(self, transaction, f=None): + self._lock_acquire() + try: + if transaction is not self._transaction: return + if f is not None: f() + + u,d,e=self._ude + self._finish(self._serial, u, d, e) + + self._clear_temp() + self._ude=None + self._transaction=None + self._commit_lock_release() + finally: self._lock_release() + + def _finish(self, tid, u, d, e): + pass + + + + def _finish(self, id, user, desc, ext): + return self._call('tpc_finish', id, user, desc, ext) + + + def undo(self, transaction_id): + return self._call('undo', transaction_id) + finally: self._lock_release() + + def undoLog(self, version, first, last, filter=None): + # Waaaa, we really need to get the filter through + # but how can we send it over the wire? + + # I suppose we could try to run the filter in a restricted execution + # env. + + # Maybe .... we are really going to want to pass lambdas, hm. + + self._lock_acquire() + try: return self._call('undoLog', version, first, last) # Eek! + finally: self._lock_release() + + def versionEmpty(self, version): + self._lock_acquire() + try: return self._call('versionEmpty', version) + finally: self._lock_release() + + def versions(self, max=None): + self._lock_acquire() + try: return self._call('versionEmpty', max) + finally: self._lock_release() + + diff --git a/src/ZEO/StorageServer.py b/src/ZEO/StorageServer.py new file mode 100644 index 000000000..19f631927 --- /dev/null +++ b/src/ZEO/StorageServer.py @@ -0,0 +1,177 @@ + +import asyncore, socket, string, sys, cPickle +from smac import smac +from ZODB import POSException +from ZODB.Transaction import Transaction +import traceback + +class StorageServerError(POSException.ServerError): pass + + +class Server(asyncore.dispatcher): + + def __init__(self, connection, storages): + + self.host, self.port = connection + self.__storages=storages + + self.__connections={} + self.__get_connections=self.__connections.get + + + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + + self.bind((self.host, self.port)) + + self.listen(5) + + def register_connection(self, connection, storage_id): + storage=self.__storages.get(storage_id, None) + if storage is None: + connection.close() + return None, None + + connections=self.__get_connections(storage_id, None) + if connections is None: + self.__connections[storage_id]=connections=[] + connections.append(connection) + + def invalidate(self, connection, storage_id, invalidated, + dumps=cPickle.dumps): + for c in self.__connections[storage_id]: + if c is connection: continue + c.message_output('I'+dumps(invalidated)) + + def writable(self): return 0 + + def handle_read(self): pass + + def readable(self): return 1 + + def handle_connect (self): pass + + def handle_accept(self): + try: + sock, addr = self.accept() + except socket.error: + sys.stderr.write('warning: accept failed\n') + + Connection(self, sock, addr) + +storage_methods={} +for n in ('get_info', 'abortVersion', 'commitVersion', 'history', + 'load', 'modifiedInVersion', 'new_oid', 'pack', 'store', + 'tpc_abort', 'tpc_begin', 'tpc_finish', 'undo', 'undoLog', + 'versionEmpty'): + storage_methods[n]=1 +storage_method=storage_methods.has_key + + +class Connection(smac): + + _transaction=None + __storage=__storage_id=None + + def __init__(self, server, sock, addr): + smac.__init__(self, sock, addr) + self.__server=server + self.__storage=server.storage + self.__invalidated=[] + + def close(self): + self.__server.unregister_connection(self, self.__storage_id) + smac.close(self) + + def message_input(self, message): + if self.__storage is None: + self.__storage, self.__storage_id = ( + self.__server.register_connection(self, message)) + return + + rt='R' + try: + args=cPickle.loads(message) + name, args = args[0], args[1:] + if not storage_method(name): + raise 'Invalid Method Name', name + if hasattr(self, name): + r=apply(getattr(self, name), args) + else: + r=apply(getattr(self.__storage, name), args) + except: + traceback.print_exc() + t, r = sys.exc_info()[:2] + if type(r) is not type(self): r=t,r + rt='E' + + r=cPickle.dumps(r,1) + self.message_output(rt+r) + + def get_info(self): + storage=self.__storage + return { + 'length': len(storage), + 'size': storage.getSize(), + 'name': storage.getName(), + 'supportsUndo': storage.supportsUndo(), + 'supportsVersions': storage.supportsVersions(), + } + + def store(self, oid, serial, data, version, id): + t=self._transaction + if t is None or id != t.id: + raise POSException.StorageTransactionError(self, id) + newserial=self.__storage.store(oid, data, serial, version, t) + if serial != '\0\0\0\0\0\0\0\0': + self.__invalidated.append(oid, serial, version) + return newserial + + def unlock(self): + self.message_output('UN') + + def tpc_abort(self, id): + t=self._transaction + if t is None or id != t.id: return + r=self.__storage.tpc_abort(t) + for c in self.__storage.__waiting: c.unlock() + self._transaction=None + self.__invalidated=[] + + + def tpc_begin(self, id, user, description, ext): + t=self._transaction + if t is not None and id == t.id: return + storage=self.__storage + if storage._transaction is not None: + storage.__waiting.append(self) + return 1 + + self._transaction=t=Transaction() + t.id=id + t.user=user + t.description=description + storage.tpc_begin(t) + storage.__waiting=[] + self.__invalidated=[] + + + def tpc_finish(self, id, user, description, ext): + t=self._transaction + if id != t.id: return + t.user=user + t.description=description + r=self.__storage.tpc_finish(t) + for c in self.__storage.__waiting: c.unlock() + self._transaction=None + self.__server.invalidate(self, self.__storage_id, self.__invalidated) + self.__invalidated=[] + + +if __name__=='__main__': + import ZODB.FileStorage + name, port = sys.argv[1:3] + try: port='',string.atoi(port) + except: pass + Server(port, ZODB.FileStorage.FileStorage(name)) + asyncore.loop() diff --git a/src/ZEO/__init__.py b/src/ZEO/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/ZEO/smac.py b/src/ZEO/smac.py new file mode 100644 index 000000000..a4ad3df11 --- /dev/null +++ b/src/ZEO/smac.py @@ -0,0 +1,64 @@ +"""Sized message async connections +""" + +import asyncore, string, struct + +class smac(asyncore.dispatcher): + + def __init__(self, sock, addr): + asyncore.dispatcher.__init__(self, sock) + self.addr=addr + self.__state=None + self.__inp=None + self.__l=4 + self.__output=output=[] + self.__append=output.append + self.__pop=output.pop + + def handle_read(self, + join=string.join, StringType=type('')): + l=self.__l + d=self.recv(l) + inp=self.__inp + if inp is None: + inp=d + elif type(inp) is StringType: + inp=[inp,d] + else: + inp.append(d) + + l=l-len(d) + if l <= 0: + if type(inp) is not StringType: inp=join(inp,'') + if self.__state is None: + # waiting for message + self.__l=struct.unpack(">i",inp)[0] + self.__state=1 + self.__inp=None + else: + self.__inp=None + self.__l=4 + self.__state=None + self.message_input(inp) + else: + self.__l=l + self.__inp=inp + + def readable(self): return 1 + def writable(self): return not not self.__output + + def handle_write(self): + output=self.__output + if output: + v=output[0] + n=self.send(v) + if n < len(v): + output[0]=v[n:] + else: + del output[0] + + def handle_close(self): self.close() + + def message_output(self, message, + pack=struct.pack, len=len): + self.__append(pack(">i",len(message))+message) diff --git a/src/ZEO/zrpc.py b/src/ZEO/zrpc.py new file mode 100644 index 000000000..ad02a05fa --- /dev/null +++ b/src/ZEO/zrpc.py @@ -0,0 +1,112 @@ +"""But simple rpc mechanisms +""" + +from cPickle import dumps, loads +from ThreadLock import allocate_lock +import socket, smac, string, struct +TupleType=type(()) + +class sync: + """Synchronous rpc""" + + _outOfBand=None + + def __init__(self, connection, outOfBand=None): + host, port = connection + s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(host, port) + self.__s=s + self._outOfBand=outOfBand + + def setOutOfBand(self, f): self._outOfBand=f + + def __call__(self, *args): + args=dumps(args,1) + self._write(args) + while 1: + r=self._read() + c=r[:1] + if c=='R': + return loads(r[1:]) + if c=='E': + r=loads(r[1:]) + if type(r) is TupleType: raise r[0], r[1] + raise r + oob=self._outOfBand + if oob is not None: + oob(c, loads(r[1:])) + else: + raise UnrecognizedResult, r + + def _write(self, data, pack=struct.pack): + send=self.__s.send + h=pack(">i", len(data)) + l=len(h) + while l > 0: + sent=send(h) + h=h[sent:] + l=l-sent + l=len(data) + while l > 0: + sent=send(data) + data=data[sent:] + l=l-sent + + def _read(self, _st=type(''), join=string.join, unpack=struct.unpack): + recv=self.__s.recv + + l=4 + + data=None + while l > 0: + d=recv(l) + if data is None: data=d + elif type(data) is _st: data=[data, d] + else: data.append(d) + l=l-len(d) + if type(data) is not _st: data=join(data,'') + + l,=unpack(">i", data) + + data=None + while l > 0: + d=recv(l) + if data is None: data=d + elif type(data) is st: data=[data, d] + else: data.append(d) + l=l-len(d) + if type(data) is not _st: data=join(data,'') + + return data + +class async(smac.smac, sync): + + def __init__(self, connection, outOfBand=None): + host, port = connection + s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(host, port) + self._outOfBand=outOfBand + l=allocate_lock() + self.__la=l.acquire + self.__lr=l.release + self.__r=None + l.acquire() + smac.__init__(self, s, None) + + def _write(self, data): self.message_output(data) + + def message_input(self, m): + c=m[:1] + if c in 'RE': + self.__r=m + self.__lr() + else: + oob=self._outOfBand + if oob is not None: oob(c, loads(m[1:])) + + def _read(self): + self.__la() + return self.__r + + + diff --git a/src/ZEO/zrpc/smac.py b/src/ZEO/zrpc/smac.py new file mode 100644 index 000000000..a4ad3df11 --- /dev/null +++ b/src/ZEO/zrpc/smac.py @@ -0,0 +1,64 @@ +"""Sized message async connections +""" + +import asyncore, string, struct + +class smac(asyncore.dispatcher): + + def __init__(self, sock, addr): + asyncore.dispatcher.__init__(self, sock) + self.addr=addr + self.__state=None + self.__inp=None + self.__l=4 + self.__output=output=[] + self.__append=output.append + self.__pop=output.pop + + def handle_read(self, + join=string.join, StringType=type('')): + l=self.__l + d=self.recv(l) + inp=self.__inp + if inp is None: + inp=d + elif type(inp) is StringType: + inp=[inp,d] + else: + inp.append(d) + + l=l-len(d) + if l <= 0: + if type(inp) is not StringType: inp=join(inp,'') + if self.__state is None: + # waiting for message + self.__l=struct.unpack(">i",inp)[0] + self.__state=1 + self.__inp=None + else: + self.__inp=None + self.__l=4 + self.__state=None + self.message_input(inp) + else: + self.__l=l + self.__inp=inp + + def readable(self): return 1 + def writable(self): return not not self.__output + + def handle_write(self): + output=self.__output + if output: + v=output[0] + n=self.send(v) + if n < len(v): + output[0]=v[n:] + else: + del output[0] + + def handle_close(self): self.close() + + def message_output(self, message, + pack=struct.pack, len=len): + self.__append(pack(">i",len(message))+message)