Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix django2.0 #179

Merged
merged 14 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ChangeLog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Change Log
==========
New in draft
--------------------
* Change set_many and set_multi api return value. see [pr](https://github.com/pinterest/pymemcache/pull/179)

New in version 1.4.4
--------------------
* pypy3 to travis test matrix
Expand Down
27 changes: 15 additions & 12 deletions pymemcache/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

RECV_SIZE = 4096
VALID_STORE_RESULTS = {
b'set': (b'STORED',),
b'set': (b'STORED', b'NOT_STORED'),
b'add': (b'STORED', b'NOT_STORED'),
b'replace': (b'STORED', b'NOT_STORED'),
b'append': (b'STORED', b'NOT_STORED'),
Expand Down Expand Up @@ -107,11 +107,11 @@ def _check_key(key, allow_unicode_keys, key_prefix=b''):
)
elif c == ord(b'\00'):
raise MemcacheIllegalInputError(
"Key contains null character: '%r'" % (key,)
"Key contains null character: '%r'" % (key,)
)
elif c == ord(b'\r'):
raise MemcacheIllegalInputError(
"Key contains carriage return: '%r'" % (key,)
"Key contains carriage return: '%r'" % (key,)
)
return key

Expand Down Expand Up @@ -309,17 +309,19 @@ def set_many(self, values, expire=0, noreply=None):
self.default_noreply).

Returns:
If no exception is raised, always returns True. Otherwise all, some
or none of the keys have been successfully set. If noreply is True
then a successful return does not guarantee that any keys were
successfully set (just that the keys were successfully sent).
Empty list
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this now return a list of failed sets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx! i fixed it.

"""

# TODO: make this more performant by sending all the values first, then
# waiting for all the responses.
if noreply is None:
noreply = self.default_noreply

failed = []
for key, value in six.iteritems(values):
self.set(key, value, expire, noreply)
return True
result = self.set(key, value, expire, noreply)
if not result:
failed.append(key)
return failed

set_multi = set_many

Expand Down Expand Up @@ -656,7 +658,7 @@ def version(self):

if not result.startswith(b'VERSION '):
raise MemcacheUnknownError(
"Received unexpected response: %s" % (result, ))
"Received unexpected response: %s" % (result, ))

return result[8:]

Expand Down Expand Up @@ -930,7 +932,8 @@ def set(self, key, value, expire=0, noreply=None):

def set_many(self, values, expire=0, noreply=None):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.set_many(values, expire=expire, noreply=noreply)
failed = client.set_many(values, expire=expire, noreply=noreply)
return failed

set_multi = set_many

Expand Down
153 changes: 115 additions & 38 deletions pymemcache/client/hash.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import socket
import time
import logging
import six

from pymemcache.client.base import Client, PooledClient, _check_key
from pymemcache.client.rendezvous import RendezvousHash
Expand All @@ -13,6 +14,7 @@ class HashClient(object):
"""
A client for communicating with a cluster of memcached servers
"""

def __init__(
self,
servers,
Expand Down Expand Up @@ -171,34 +173,7 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs):
# Connecting to the server fail, we should enter
# retry mode
except socket.error:
# This client has never failed, lets mark it for failure
if (
client.server not in self._failed_clients and
self.retry_attempts > 0
):
self._failed_clients[client.server] = {
'failed_time': time.time(),
'attempts': 0,
}
# We aren't allowing any retries, we should mark the server as
# dead immediately
elif (
client.server not in self._failed_clients and
self.retry_attempts <= 0
):
self._failed_clients[client.server] = {
'failed_time': time.time(),
'attempts': 0,
}
logger.debug("marking server as dead %s", client.server)
self.remove_server(*client.server)
# This client has failed previously, we need to update the metadata
# to reflect that we have attempted it again
else:
failed_metadata = self._failed_clients[client.server]
failed_metadata['attempts'] += 1
failed_metadata['failed_time'] = time.time()
self._failed_clients[client.server] = failed_metadata
self._mark_failed_server(client.server)

# if we haven't enabled ignore_exc, don't move on gracefully, just
# raise the exception
Expand All @@ -214,6 +189,95 @@ def _safely_run_func(self, client, func, default_val, *args, **kwargs):

return default_val

def _safely_run_set_many(self, client, values, *args, **kwargs):
failed = []
succeeded = []
try:
if client.server in self._failed_clients:
# This server is currently failing, lets check if it is in
# retry or marked as dead
failed_metadata = self._failed_clients[client.server]

# we haven't tried our max amount yet, if it has been enough
# time lets just retry using it
if failed_metadata['attempts'] < self.retry_attempts:
failed_time = failed_metadata['failed_time']
if time.time() - failed_time > self.retry_timeout:
logger.debug(
'retrying failed server: %s', client.server
)
succeeded, failed, err = self._set_many(
client, values, *args, **kwargs)
if err is not None:
raise err
# we were successful, lets remove it from the failed
# clients
self._failed_clients.pop(client.server)
return failed
return values.keys()
else:
# We've reached our max retry attempts, we need to mark
# the sever as dead
logger.debug('marking server as dead: %s', client.server)
self.remove_server(*client.server)

succeeded, failed, err = self._set_many(
client, values, *args, **kwargs
)
if err is not None:
raise err

return failed

# Connecting to the server fail, we should enter
# retry mode
except socket.error:
self._mark_failed_server(client.server)

# if we haven't enabled ignore_exc, don't move on gracefully, just
# raise the exception
if not self.ignore_exc:
raise

return list(set(values.keys()) - set(succeeded))
except Exception:
# any exceptions that aren't socket.error we need to handle
# gracefully as well
if not self.ignore_exc:
raise

return list(set(values.keys()) - set(succeeded))

def _mark_failed_server(self, server):
# This client has never failed, lets mark it for failure
if (
server not in self._failed_clients and
self.retry_attempts > 0
):
self._failed_clients[server] = {
'failed_time': time.time(),
'attempts': 0,
}
# We aren't allowing any retries, we should mark the server as
# dead immediately
elif (
server not in self._failed_clients and
self.retry_attempts <= 0
):
self._failed_clients[server] = {
'failed_time': time.time(),
'attempts': 0,
}
logger.debug("marking server as dead %s", server)
self.remove_server(*server)
# This client has failed previously, we need to update the metadata
# to reflect that we have attempted it again
else:
failed_metadata = self._failed_clients[server]
failed_metadata['attempts'] += 1
failed_metadata['failed_time'] = time.time()
self._failed_clients[server] = failed_metadata

def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
client = self._get_client(key)

Expand All @@ -227,6 +291,22 @@ def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
client, func, default_val, *args, **kwargs
)

def _set_many(self, client, values, *args, **kwargs):
failed = []
succeeded = []

try:
for key, value in six.iteritems(values):
result = client.set(key, value, *args, **kwargs)
if result:
succeeded.append(key)
else:
failed.append(key)
except Exception as e:
return succeeded, failed, e

return succeeded, failed, None

def set(self, key, *args, **kwargs):
return self._run_cmd('set', key, False, *args, **kwargs)

Expand All @@ -241,13 +321,13 @@ def decr(self, key, *args, **kwargs):

def set_many(self, values, *args, **kwargs):
client_batches = {}
end = []
failed = []

for key, value in values.items():
for key, value in six.iteritems(values):
client = self._get_client(key)

if client is None:
end.append(False)
failed.append(key)
continue

if client.server not in client_batches:
Expand All @@ -257,15 +337,12 @@ def set_many(self, values, *args, **kwargs):

for server, values in client_batches.items():
client = self.clients['%s:%s' % server]
new_args = list(args)
new_args.insert(0, values)
result = self._safely_run_func(
client,
client.set_many, False, *new_args, **kwargs

failed += self._safely_run_set_many(
client, values, *args, **kwargs
)
end.append(result)

return all(end)
return failed

set_multi = set_many

Expand Down
39 changes: 27 additions & 12 deletions pymemcache/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ def test_set_noreply(self):
def test_set_many_success(self):
client = self.make_client([b'STORED\r\n'])
result = client.set_many({b'key': b'value'}, noreply=False)
assert result is True
assert result == []

def test_set_multi_success(self):
# Should just map to set_many
client = self.make_client([b'STORED\r\n'])
result = client.set_multi({b'key': b'value'}, noreply=False)
assert result is True
assert result == []

def test_add_stored(self):
client = self.make_client([b'STORED\r', b'\n'])
Expand Down Expand Up @@ -602,7 +602,7 @@ def _set():
def test_set_many_socket_handling(self):
client = self.make_client([b'STORED\r\n'])
result = client.set_many({b'key': b'value'}, noreply=False)
assert result is True
assert result == []
assert client.sock.closed is False
assert len(client.sock.send_bufs) == 1

Expand Down Expand Up @@ -738,18 +738,26 @@ def _default_noreply_true(self, cmd, args, response):
result = getattr(client, cmd)(*args)
assert result is True

def _default_noreply_true_and_empty_list(self, cmd, args, response):
client = self.make_client(response, default_noreply=True)
result = getattr(client, cmd)(*args)
assert result == []

def test_default_noreply_set(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
'set', (b'key', b'value'), [b'UNKNOWN\r\n'])
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
self._default_noreply_true(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])

def test_default_noreply_set_many(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])
self._default_noreply_true(
client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False)
result = client.set_many({b'key': b'value'})
assert result == [b'key']
self._default_noreply_true_and_empty_list(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])

def test_default_noreply_add(self):
Expand Down Expand Up @@ -854,18 +862,25 @@ def _default_noreply_true(self, cmd, args, response):
result = getattr(client, cmd)(*args)
assert result is True

def _default_noreply_true_and_empty_list(self, cmd, args, response):
client = self.make_client(response, default_noreply=True)
result = getattr(client, cmd)(*args)
assert result == []

def test_default_noreply_set(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
'set', (b'key', b'value'), [b'UNKNOWN\r\n'])
self._default_noreply_false(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])
self._default_noreply_true(
'set', (b'key', b'value'), [b'NOT_STORED\r\n'])

def test_default_noreply_set_many(self):
with pytest.raises(MemcacheUnknownError):
self._default_noreply_false(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])
self._default_noreply_true(
client = self.make_client([b'UNKNOWN\r\n'], default_noreply=False)
client.set_many({b'key': b'value'})
self._default_noreply_true_and_empty_list(
'set_many', ({b'key': b'value'},), [b'NOT_STORED\r\n'])

def test_default_noreply_add(self):
Expand Down Expand Up @@ -1008,5 +1023,5 @@ def test_recv(self):
b'key1 0 6\r\nval',
socket.error(errno.EINTR, "Interrupted system call"),
b'ue1\r\nEND\r\n',
])
])
assert client[b'key1'] == b'value1'
Loading