Skip to content

Commit

Permalink
Merge pull request #179 from uncovertruth/feature/set_many_result2
Browse files Browse the repository at this point in the history
fix django2.0
  • Loading branch information
jogo authored Aug 15, 2018
2 parents f8a9850 + 8ce0737 commit 88e5bf5
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 64 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Change Log
==========
New in draft
--------------------
* Change set_many and set_multi api return value. see [pr](https://github.com/pinterest/pymemcache/pull/179)

New in version 1.4.4
--------------------
* pypy3 to travis test matrix
Expand Down
28 changes: 16 additions & 12 deletions pymemcache/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

RECV_SIZE = 4096
VALID_STORE_RESULTS = {
b'set': (b'STORED',),
b'set': (b'STORED', b'NOT_STORED'),
b'add': (b'STORED', b'NOT_STORED'),
b'replace': (b'STORED', b'NOT_STORED'),
b'append': (b'STORED', b'NOT_STORED'),
Expand Down Expand Up @@ -107,11 +107,11 @@ def _check_key(key, allow_unicode_keys, key_prefix=b''):
)
elif c == ord(b'\00'):
raise MemcacheIllegalInputError(
"Key contains null character: '%r'" % (key,)
"Key contains null character: '%r'" % (key,)
)
elif c == ord(b'\r'):
raise MemcacheIllegalInputError(
"Key contains carriage return: '%r'" % (key,)
"Key contains carriage return: '%r'" % (key,)
)
return key

Expand Down Expand Up @@ -309,17 +309,20 @@ def set_many(self, values, expire=0, noreply=None):
self.default_noreply).
Returns:
If no exception is raised, always returns True. Otherwise all, some
or none of the keys have been successfully set. If noreply is True
then a successful return does not guarantee that any keys were
successfully set (just that the keys were successfully sent).
Returns a list of keys that failed to be inserted.
If noreply is True, alwais returns empty list.
"""

# TODO: make this more performant by sending all the values first, then
# waiting for all the responses.
if noreply is None:
noreply = self.default_noreply

failed = []
for key, value in six.iteritems(values):
self.set(key, value, expire, noreply)
return True
result = self.set(key, value, expire, noreply)
if not result:
failed.append(key)
return failed

set_multi = set_many

Expand Down Expand Up @@ -656,7 +659,7 @@ def version(self):

if not result.startswith(b'VERSION '):
raise MemcacheUnknownError(
"Received unexpected response: %s" % (result, ))
"Received unexpected response: %s" % (result, ))

return result[8:]

Expand Down Expand Up @@ -930,7 +933,8 @@ def set(self, key, value, expire=0, noreply=None):

def set_many(self, values, expire=0, noreply=None):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.set_many(values, expire=expire, noreply=noreply)
failed = client.set_many(values, expire=expire, noreply=noreply)
return failed

set_multi = set_many

Expand Down
153 changes: 115 additions & 38 deletions pymemcache/client/hash.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import socket
import time
import logging
import six

from pymemcache.client.base import Client, PooledClient, _check_key
from pymemcache.client.rendezvous import RendezvousHash
Expand All @@ -13,6 +14,7 @@ class HashClient(object):
"""
A client for communicating with a cluster of memcached servers
"""

def __init__(
self,
servers,
Expand Down Expand Up @@ -171,34 +173,7 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs):
# Connecting to the server fail, we should enter
# retry mode
except socket.error:
# This client has never failed, lets mark it for failure
if (
client.server not in self._failed_clients and
self.retry_attempts > 0
):
self._failed_clients[client.server] = {
'failed_time': time.time(),
'attempts': 0,
}
# We aren't allowing any retries, we should mark the server as
# dead immediately
elif (
client.server not in self._failed_clients and
self.retry_attempts <= 0
):
self._failed_clients[client.server] = {
'failed_time': time.time(),
'attempts': 0,
}
logger.debug("marking server as dead %s", client.server)
self.remove_server(*client.server)
# This client has failed previously, we need to update the metadata
# to reflect that we have attempted it again
else:
failed_metadata = self._failed_clients[client.server]
failed_metadata['attempts'] += 1
failed_metadata['failed_time'] = time.time()
self._failed_clients[client.server] = failed_metadata
self._mark_failed_server(client.server)

# if we haven't enabled ignore_exc, don't move on gracefully, just
# raise the exception
Expand All @@ -214,6 +189,95 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs):

return default_val

def _safely_run_set_many(self, client, values, *args, **kwargs):
failed = []
succeeded = []
try:
if client.server in self._failed_clients:
# This server is currently failing, lets check if it is in
# retry or marked as dead
failed_metadata = self._failed_clients[client.server]

# we haven't tried our max amount yet, if it has been enough
# time lets just retry using it
if failed_metadata['attempts'] < self.retry_attempts:
failed_time = failed_metadata['failed_time']
if time.time() - failed_time > self.retry_timeout:
logger.debug(
'retrying failed server: %s', client.server
)
succeeded, failed, err = self._set_many(
client, values, *args, **kwargs)
if err is not None:
raise err
# we were successful, lets remove it from the failed
# clients
self._failed_clients.pop(client.server)
return failed
return values.keys()
else:
# We've reached our max retry attempts, we need to mark
# the sever as dead
logger.debug('marking server as dead: %s', client.server)
self.remove_server(*client.server)

succeeded, failed, err = self._set_many(
client, values, *args, **kwargs
)
if err is not None:
raise err

return failed

# Connecting to the server fail, we should enter
# retry mode
except socket.error:
self._mark_failed_server(client.server)

# if we haven't enabled ignore_exc, don't move on gracefully, just
# raise the exception
if not self.ignore_exc:
raise

return list(set(values.keys()) - set(succeeded))
except Exception:
# any exceptions that aren't socket.error we need to handle
# gracefully as well
if not self.ignore_exc:
raise

return list(set(values.keys()) - set(succeeded))

def _mark_failed_server(self, server):
# This client has never failed, lets mark it for failure
if (
server not in self._failed_clients and
self.retry_attempts > 0
):
self._failed_clients[server] = {
'failed_time': time.time(),
'attempts': 0,
}
# We aren't allowing any retries, we should mark the server as
# dead immediately
elif (
server not in self._failed_clients and
self.retry_attempts <= 0
):
self._failed_clients[server] = {
'failed_time': time.time(),
'attempts': 0,
}
logger.debug("marking server as dead %s", server)
self.remove_server(*server)
# This client has failed previously, we need to update the metadata
# to reflect that we have attempted it again
else:
failed_metadata = self._failed_clients[server]
failed_metadata['attempts'] += 1
failed_metadata['failed_time'] = time.time()
self._failed_clients[server] = failed_metadata

def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
client = self._get_client(key)

Expand All @@ -227,6 +291,22 @@ def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
client, func, default_val, *args, **kwargs
)

def _set_many(self, client, values, *args, **kwargs):
failed = []
succeeded = []

try:
for key, value in six.iteritems(values):
result = client.set(key, value, *args, **kwargs)
if result:
succeeded.append(key)
else:
failed.append(key)
except Exception as e:
return succeeded, failed, e

return succeeded, failed, None

def set(self, key, *args, **kwargs):
return self._run_cmd('set', key, False, *args, **kwargs)

Expand All @@ -241,13 +321,13 @@ def decr(self, key, *args, **kwargs):

def set_many(self, values, *args, **kwargs):
client_batches = {}
end = []
failed = []

for key, value in values.items():
for key, value in six.iteritems(values):
client = self._get_client(key)

if client is None:
end.append(False)
failed.append(key)
continue

if client.server not in client_batches:
Expand All @@ -257,15 +337,12 @@ def set_many(self, values, *args, **kwargs):

for server, values in client_batches.items():
client = self.clients['%s:%s' % server]
new_args = list(args)
new_args.insert(0, values)
result = self._safely_run_func(
client,
client.set_many, False, *new_args, **kwargs

failed += self._safely_run_set_many(
client, values, *args, **kwargs
)
end.append(result)

return all(end)
return failed

set_multi = set_many

Expand Down
39 changes: 27 additions & 12 deletions pymemcache/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ def test_set_noreply(self):
def test_set_many_success(self):
client = self.make_client([b'STORED\r\n'])
result = client.set_many({b'key': b'value'}, noreply=False)
assert result is True
assert result == []

def test_set_multi_success(self):
# Should just map to set_many
client = self.make_client([b'STORED\r\n'])
result = client.set_multi({b'key': b'value'}, noreply=False)
assert result is True
assert result == []

def test_add_stored(self):
client = self.make_client([b'STORED\r', b'\n'])
Expand Down Expand Up @@ -602,7 +602,7 @@ def _set():
def test_set_many_socket_handling(self):
client = self.make_client([b'STORED\r\n'])
result = client.set_many({b'key': b'value'}, noreply=False)
assert result is True
assert result == []
assert client.sock.closed is False
assert len(client.sock.send_bufs) == 1

Expand Down Expand Up @@ -738,18 +738,26 @@ def _default_noreply_true(self, cmd, args, response):
result = getattr(client, cmd)(*args)
assert result is True

def _default_noreply_true_and_empty_list(self, cmd, args, response):
client = self.make_client(response, default_noreply=True)
result = getattr(client, cmd)(*args)
assert result == []

def test_default_noreply_set(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
'set', (b'key', b'value'), [b'UNKNOWN\r\n'])
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
self._default_noreply_true(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])

def test_default_noreply_set_many(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])
self._default_noreply_true(
client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False)
result = client.set_many({b'key': b'value'})
assert result == [b'key']
self._default_noreply_true_and_empty_list(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])

def test_default_noreply_add(self):
Expand Down Expand Up @@ -854,18 +862,25 @@ def _default_noreply_true(self, cmd, args, response):
result = getattr(client, cmd)(*args)
assert result is True

def _default_noreply_true_and_empty_list(self, cmd, args, response):
client = self.make_client(response, default_noreply=True)
result = getattr(client, cmd)(*args)
assert result == []

def test_default_noreply_set(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
'set', (b'key', b'value'), [b'UNKNOWN\r\n'])
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
self._default_noreply_true(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])

def test_default_noreply_set_many(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])
self._default_noreply_true(
client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False)
client.set_many({b'key': b'value'})
self._default_noreply_true_and_empty_list(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])

def test_default_noreply_add(self):
Expand Down Expand Up @@ -1008,5 +1023,5 @@ def test_recv(self):
b'key1 0 6\r\nval',
socket.error(errno.EINTR, "Interrupted system call"),
b'ue1\r\nEND\r\n',
])
])
assert client[b'key1'] == b'value1'
Loading

0 comments on commit 88e5bf5

Please sign in to comment.