diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 93bef9c..862e715 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -37,3 +37,4 @@ repos:
   hooks:
   - id: mypy
     files: 'src/.*\.py$'
+    additional_dependencies: [types-requests]
diff --git a/HISTORY.rst b/HISTORY.rst
index fc4a375..6c42a16 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -5,13 +5,15 @@ History
 Unreleased
 ----------
 * Add command line tools for handling dead-letter messages
-* ``zocalo.dlq_check`` checks dead-letter queues for messages
-* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory
-  specified in the Zocalo configuration
-* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and
+* ``zocalo.dlq_check`` checks dead-letter queues for messages
+* ``zocalo.dlq_purge`` removes messages from specified DLQs and dumps them to a directory
+  specified in the Zocalo configuration
+* ``zocalo.dlq_reinject`` takes a serialised message produced by ``zocalo.dlq_purge`` and
   places it back on a queue
-* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
+* Use ``argparse`` for all command line tools and make use of ``workflows`` transport
   argument injection. Minimum ``workflows`` version is now 2.14
+* New ``zocalo.util.rabbitmq.RabbitMQAPI()`` providing a thin wrapper around the
+  RabbitMQ HTTP API
 
 0.10.0 (2021-10-04)
 -------------------
diff --git a/requirements_dev.txt b/requirements_dev.txt
index adfea44..eb0c2ec 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -1,8 +1,11 @@
 PyYAML==6.0
 graypy==2.1.0
 marshmallow==3.14.0
+pydantic
 pytest-cov==3.0.0
 pytest-mock
 pytest==6.2.5
+requests
+requests_mock
 setuptools==58.3.0
 workflows==2.14
diff --git a/setup.cfg b/setup.cfg
index 9f4e124..c5f092d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -30,6 +30,8 @@ install_requires =
     PyYAML
     graypy>=1.0
     marshmallow
+    requests
+    pydantic
     setuptools
     workflows>=2.14
 packages = find:
diff --git a/src/zocalo/cli/dlq_check.py b/src/zocalo/cli/dlq_check.py
index 127b9b0..194af82 100644
--- a/src/zocalo/cli/dlq_check.py
+++ b/src/zocalo/cli/dlq_check.py
@@ -1,12 +1,10 @@
 import argparse
-import json
-import urllib
 
 import workflows.transport
 
 import zocalo.configuration
 from zocalo.util.jmxstats import JMXAPI
-from zocalo.util.rabbitmq import http_api_request
+from zocalo.util.rabbitmq import RabbitMQAPI
 
 #
 # zocalo.dlq_check
@@ -50,16 +48,14 @@ def extract_queue_name(namestring):
 def check_dlq_rabbitmq(
     zc: zocalo.configuration.Configuration, namespace: str = None
 ) -> dict:
-    _api_request = http_api_request(zc, "/queues")
-    with urllib.request.urlopen(_api_request) as response:
-        reply = response.read()
-    queue_info = json.loads(reply)
-    dlq_info = {}
-    for q in queue_info:
-        if q["name"].startswith("dlq."):
-            if (namespace is None or q["vhost"] == namespace) and int(q["messages"]):
-                dlq_info[q["name"]] = int(q["messages"])
-    return dlq_info
+    rmq = RabbitMQAPI.from_zocalo_configuration(zc)
+    return {
+        q.name: q.messages
+        for q in rmq.queues()
+        if q.name.startswith("dlq.")
+        and (namespace is None or q.vhost == namespace)
+        and q.messages
+    }
 
 
 def run() -> None:
diff --git a/src/zocalo/cli/dlq_reinject.py b/src/zocalo/cli/dlq_reinject.py
index ddcde0f..4cf5c32 100644
--- a/src/zocalo/cli/dlq_reinject.py
+++ b/src/zocalo/cli/dlq_reinject.py
@@ -15,7 +15,7 @@
 import workflows.transport
 
 import zocalo.configuration
-from zocalo.util.rabbitmq import http_api_request
+from zocalo.util.rabbitmq import RabbitMQAPI
 
 
 def run() -> None:
@@ -130,12 +130,8 @@ def run() -> None:
         header = dlqmsg["header"]
         exchange = header.get("headers", {}).get("x-death", {})[0].get("exchange")
         if exchange:
-            import urllib
-
-            _api_request = http_api_request(zc, "/queues")
-            with urllib.request.urlopen(_api_request) as response:
-                reply = response.read()
-            exchange_info = json.loads(reply)
+            rmqapi = RabbitMQAPI.from_zocalo_configuration(zc)
+            exchange_info = rmqapi.get("queues").json()
             for exch in exchange_info:
                 if exch["name"] == exchange:
                     if exch["type"] == "fanout":
diff --git a/src/zocalo/util/rabbitmq.py b/src/zocalo/util/rabbitmq.py
index 1b22dda..d132aac 100644
--- a/src/zocalo/util/rabbitmq.py
+++ b/src/zocalo/util/rabbitmq.py
@@ -1,7 +1,548 @@
+import datetime
+import enum
+import logging
+import pathlib
+import urllib
 import urllib.request
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import requests
+from pydantic import BaseModel, Field
+from workflows.transport import pika_transport
 
 import zocalo.configuration
 
+logger = logging.getLogger("workflows.transport.pika_transport")
+
+
+class MessageStats(BaseModel):
+    publish: Optional[int] = Field(None, description="Count of messages published.")
+
+    publish_in: Optional[int] = Field(
+        None,
+        description='Count of messages published "in" to an exchange, i.e. not taking account of routing.',
+    )
+    publish_out: Optional[int] = Field(
+        None,
+        description='Count of messages published "out" of an exchange, i.e. taking account of routing.',
+    )
+    confirm: Optional[int] = Field(None, description="Count of messages confirmed.")
+    deliver: Optional[int] = Field(
+        None,
+        description="Count of messages delivered in acknowledgement mode to consumers.",
+    )
+    deliver_no_ack: Optional[int] = Field(
+        None,
+        description="Count of messages delivered in no-acknowledgement mode to consumers.",
+    )
+    get: Optional[int] = Field(
+        None,
+        description="Count of messages delivered in acknowledgement mode in response to basic.get.",
+    )
+    get_no_ack: Optional[int] = Field(
+        None,
+        description="Count of messages delivered in no-acknowledgement mode in response to basic.get.",
+    )
+    deliver_get: Optional[int] = Field(
+        None, description="Sum of all four of the above."
+    )
+    redeliver: Optional[int] = Field(
+        None,
+        description="Count of subset of messages in deliver_get which had the redelivered flag set.",
+    )
+    drop_unroutable: Optional[int] = Field(
+        None, description="Count of messages dropped as unroutable."
+    )
+    return_unroutable: Optional[int] = Field(
+        None, description="Count of messages returned to the publisher as unroutable."
+    )
+
+
+class ConnectionState(enum.Enum):
+    starting = "starting"
+    tuning = "tuning"
+    opening = "opening"
+    running = "running"
+    flow = "flow"
+    blocking = "blocking"
+    blocked = "blocked"
+    closing = "closing"
+    closed = "closed"
+
+
+class ConnectionInfo(BaseModel):
+    """TCP/IP connection statistics."""
+
+    pid: Optional[int] = Field(
+        None, description="Id of the Erlang process associated with the connection."
+    )
+    name: str = Field(..., description="Readable name for the connection.")
+    port: int = Field(..., description="Server port.")
+    host: str = Field(
+        ...,
+        description="Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.",
+    )
+    peer_port: int = Field(..., description="Peer port.")
+    peer_host: str = Field(
+        ...,
+        description="Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.",
+    )
+    ssl: bool = Field(
+        ...,
+        description="Boolean indicating whether the connection is secured with SSL.",
+    )
+    ssl_protocol: Optional[str] = Field(
+        None, description='SSL protocol (e.g. "tlsv1").'
+    )
+    ssl_key_exchange: Optional[str] = Field(
+        None, description='SSL key exchange algorithm (e.g. "rsa").'
+    )
+    ssl_cipher: Optional[str] = Field(
+        None, description='SSL cipher algorithm (e.g. "aes_256_cbc").'
+    )
+    ssl_hash: Optional[str] = Field(None, description='SSL hash function (e.g. "sha").')
+    peer_cert_subject: Optional[str] = Field(
+        None, description="The subject of the peer's SSL certificate, in RFC4514 form."
+    )
+    peer_cert_issuer: Optional[str] = Field(
+        None, description="The issuer of the peer's SSL certificate, in RFC4514 form."
+    )
+    peer_cert_validity: Optional[str] = Field(
+        None, description="The period for which the peer's SSL certificate is valid."
+    )
+    state: ConnectionState
+    channels: int = Field(..., description="Number of channels using the connection.")
+    protocol: str = Field(
+        ...,
+        description="Version of the AMQP protocol in use; currently one of: {0,9,1} {0,8,0}",
+    )
+    auth_mechanism: str = Field(
+        ..., description='SASL authentication mechanism used, such as "PLAIN".'
+    )
+    user: str = Field(..., description="Username associated with the connection.")
+    vhost: str = Field(
+        ..., description="Virtual host name with non-ASCII characters escaped as in C."
+    )
+    timeout: Optional[int] = Field(
+        None,
+        description="Connection timeout / negotiated heartbeat interval, in seconds.",
+    )
+    frame_max: int = Field(..., description="Maximum frame size (bytes).")
+    channel_max: Optional[int] = Field(
+        None, description="Maximum number of channels on this connection."
+    )
+    # client_properties
+    #     Informational properties transmitted by the client during connection establishment.
+    # recv_oct
+    #     Octets received.
+    # recv_cnt
+    #     Packets received.
+    # send_oct
+    #     Octets sent.
+    # send_cnt
+    #     Packets sent.
+    # send_pend
+    #     Send queue size.
+    connected_at: datetime.datetime = Field(
+        ..., description="Date and time this connection was established, as timestamp."
+    )
+
+
+class NodeType(enum.Enum):
+    disc = "disc"
+    ram = "ram"
+
+
+class NodeInfo(BaseModel):
+    # applications      List of all Erlang applications running on the node.
+    # auth_mechanisms   List of all SASL authentication mechanisms installed on the node.
+    # cluster_links     A list of the other nodes in the cluster. For each node, there are
+    #                   details of the TCP connection used to connect to it and statistics
+    #                   on data that has been transferred.
+    config_files: Optional[List[pathlib.Path]] = Field(
+        None, description="List of config files read by the node."
+    )
+    # contexts          List of all HTTP listeners on the node.
+    db_dir: Optional[pathlib.Path] = Field(
+        None, description="Location of the persistent storage used by the node."
+    )
+    disk_free: int = Field(..., description="Disk free space in bytes.")
+    disk_free_alarm: bool = Field(
+        ..., description="Whether the disk alarm has gone off."
+    )
+    disk_free_limit: Optional[int] = Field(
+        None, description="Point at which the disk alarm will go off."
+    )
+    enabled_plugins: Optional[List[str]] = Field(
+        None,
+        description="List of plugins which are both explicitly enabled and running.",
+    )
+    # exchange_types    Exchange types available on the node.
+    fd_total: int = Field(..., description="File descriptors available.")
+    fd_used: int = Field(..., description="Used file descriptors.")
+    io_read_avg_time: Optional[int] = Field(
+        None,
+        ge=0,
+        description="Average wall time (milliseconds) for each disk read operation in the last statistics interval.",
+    )
+    io_read_bytes: Optional[int] = Field(
+        None, description="Total number of bytes read from disk by the persister."
+    )
+    io_read_count: Optional[int] = Field(
+        None, description="Total number of read operations by the persister."
+    )
+    io_reopen_count: Optional[int] = Field(
+        None,
+        description="Total number of times the persister has needed to recycle file handles between queues. In an ideal world this number will be zero; if the number is large, performance might be improved by increasing the number of file handles available to RabbitMQ.",
+    )
+    io_seek_avg_time: Optional[int] = Field(
+        None,
+        description="Average wall time (milliseconds) for each seek operation in the last statistics interval.",
+    )
+    io_seek_count: Optional[int] = Field(
+        None, description="Total number of seek operations by the persister."
+    )
+    io_sync_avg_time: Optional[int] = Field(
+        None,
+        description="Average wall time (milliseconds) for each fsync() operation in the last statistics interval.",
+    )
+    io_sync_count: Optional[int] = Field(
+        None, description="Total number of fsync() operations by the persister."
+    )
+    io_write_avg_time: Optional[int] = Field(
+        None,
+        description="Average wall time (milliseconds) for each disk write operation in the last statistics interval.",
+    )
+    io_write_bytes: Optional[int] = Field(
+        None, description="Total number of bytes written to disk by the persister."
+    )
+    io_write_count: Optional[int] = Field(
+        None, description="Total number of write operations by the persister."
+    )
+    log_files: Optional[List[pathlib.Path]] = Field(
+        None,
+        description='List of log files used by the node. If the node also sends messages to stdout, "<stdout>" is also reported in the list.',
+    )
+    mem_used: int = Field(..., description="Memory used in bytes.")
+    mem_alarm: bool = Field(..., description="Whether the memory alarm has gone off.")
+    mem_limit: Optional[int] = Field(
+        None, description="Point at which the memory alarm will go off."
+    )
+    mnesia_disk_tx_count: Optional[int] = Field(
+        None,
+        description="Number of Mnesia transactions which have been performed that required writes to disk (e.g. creating a durable queue). Only transactions which originated on this node are included.",
+    )
+    mnesia_ram_tx_count: Optional[int] = Field(
+        None,
+        description="Number of Mnesia transactions which have been performed that did not require writes to disk (e.g. creating a transient queue). Only transactions which originated on this node are included.",
+    )
+    msg_store_read_count: Optional[int] = Field(
+        None,
+        description="Number of messages which have been read from the message store.",
+    )
+    msg_store_write_count: Optional[int] = Field(
+        None,
+        description="Number of messages which have been written to the message store.",
+    )
+    name: str = Field(..., description="Node name.")
+    net_ticktime: Optional[int] = Field(
+        None, description="Current kernel net_ticktime setting for the node."
+    )
+    os_pid: Optional[int] = Field(
+        None,
+        description="Process identifier for the Operating System under which this node is running.",
+    )
+    # partitions        List of network partitions this node is seeing.
+    proc_total: int = Field(..., description="Maximum number of Erlang processes.")
+    proc_used: int = Field(..., description="Number of Erlang processes in use.")
+    processors: Optional[int] = Field(
+        None, description="Number of cores detected and usable by Erlang."
+    )
+    queue_index_journal_write_count: Optional[int] = Field(
+        None,
+        description="Number of records written to the queue index journal. Each record represents a message being published to a queue, being delivered from a queue, and being acknowledged in a queue.",
+    )
+    queue_index_read_count: Optional[int] = Field(
+        None, description="Number of records read from the queue index."
+    )
+    queue_index_write_count: Optional[int] = Field(
+        None, description="Number of records written to the queue index."
+    )
+    # rates_mode        'none', 'basic' or 'detailed'.
+    run_queue: float = Field(
+        ..., description="Average number of Erlang processes waiting to run."
+    )
+    running: bool = Field(
+        ...,
+        description="Boolean for whether this node is up. Obviously if this is false, most other stats will be missing.",
+    )
+    # sasl_log_file     Location of sasl log file.
+    sockets_total: Optional[int] = Field(
+        None, description="File descriptors available for use as sockets."
+    )
+    sockets_used: int = Field(..., description="File descriptors used as sockets.")
+    type: Optional[NodeType] = None
+    uptime: Optional[int] = Field(
+        None, description="Time since the Erlang VM started, in milliseconds."
+    )
+    # memory    Detailed memory use statistics. Only appears if ?memory=true is appended
+    #           to the URL.
+    # binary    Detailed breakdown of the owners of binary memory. Only appears if
+    #           ?binary=true is appended to the URL. Note that this can be an expensive
+    #           query if there are many small binaries in the system.
+
+
+class ExchangeType(enum.Enum):
+    direct = "direct"
+    topic = "topic"
+    headers = "headers"
+    fanout = "fanout"
+
+
+class ExchangeSpec(BaseModel):
+    name: str = Field(
+        ...,
+        description="The name of the exchange with non-ASCII characters escaped as in C.",
+    )
+    type: ExchangeType = Field(..., description="The exchange type")
+    durable: Optional[bool] = Field(
+        False, description="Whether or not the exchange survives server restarts."
+    )
+    auto_delete: Optional[bool] = Field(
+        False,
+        description="Whether the exchange will be deleted automatically when no longer used.",
+    )
+    internal: Optional[bool] = Field(
+        False,
+        description="Whether the exchange is internal, i.e. cannot be directly published to by a client.",
+    )
+    arguments: Optional[Dict[str, Any]] = Field(None, description="Exchange arguments.")
+    vhost: str = Field(
+        ..., description="Virtual host name with non-ASCII characters escaped as in C."
+    )
+
+    class Config:
+        use_enum_values = True
+
+
+class ExchangeInfo(ExchangeSpec):
+    policy: Optional[str] = Field(
+        None, description="Policy name for applying to the exchange."
+    )
+    message_stats: Optional[MessageStats] = None
+    incoming: Optional[Dict] = Field(
+        None,
+        description="Detailed message stats (see section above) for publishes from channels into this exchange.",
+    )
+    outgoing: Optional[Dict] = Field(
+        None,
+        description="Detailed message stats for publishes from this exchange into queues.",
+    )
+
+
+class PolicyApplyTo(enum.Enum):
+    """Which types of object this policy should apply to."""
+
+    queues = "queues"
+    exchanges = "exchanges"
+    all = "all"
+
+
+class PolicySpec(BaseModel):
+    """Sets a policy."""
+
+    name: str = Field(..., description="The name of the policy.")
+    pattern: str = Field(
+        ...,
+        description="The regular expression which, when it matches a given resource, causes the policy to apply.",
+    )
+    definition: Dict[str, Any] = Field(
+        ...,
+        description="A set of key/value pairs (think a JSON document) that will be injected into the map of optional arguments of the matching queues and exchanges.",
+    )
+    priority: int = Field(
+        0,
+        description="The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.",
+    )
+    apply_to: PolicyApplyTo = Field(
+        default=PolicyApplyTo.all,
+        alias="apply-to",
+        description="Which types of object this policy should apply to.",
+    )
+    vhost: str = Field(
+        ..., description="Virtual host name with non-ASCII characters escaped as in C."
+    )
+
+    class Config:
+        use_enum_values = True
+        validate_all = True
+        allow_population_by_field_name = True
+
+
+class PolicyInfo(PolicySpec):
+    pass
+
+
+class QueueState(str, enum.Enum):
+    'The state of the queue. Normally "running", but may be "{syncing, message_count}" if the queue is synchronising.'
+
+    running = "running"
+    syncing = "syncing"
+    message_count = "message_count"
+
+
+class QueueSpec(BaseModel):
+    name: str = Field(
+        ...,
+        description="The name of the queue with non-ASCII characters escaped as in C.",
+    )
+    durable: Optional[bool] = Field(
+        False, description="Whether or not the queue survives server restarts."
+    )
+    auto_delete: Optional[bool] = Field(
+        False,
+        description="Whether the queue will be deleted automatically when no longer used.",
+    )
+    arguments: Optional[Dict[str, Any]] = Field(None, description="Queue arguments.")
+    vhost: str = Field(
+        ..., description="Virtual host name with non-ASCII characters escaped as in C."
+    )
+
+
+class QueueInfo(QueueSpec):
+    policy: Optional[str] = Field(
+        None, description="Effective policy name for the queue."
+    )
+    pid: Optional[int] = Field(
+        None, description="Erlang process identifier of the queue."
+    )
+    owner_pid: Optional[int] = Field(
+        None,
+        description="Id of the Erlang process of the connection which is the exclusive owner of the queue. Empty if the queue is non-exclusive.",
+    )
+    exclusive: bool = Field(
+        ...,
+        description="True if queue is exclusive (i.e. has owner_pid), false otherwise.",
+    )
+    exclusive_consumer_pid: Optional[int] = Field(
+        None,
+        description="Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.",
+    )
+    exclusive_consumer_tag: Optional[str] = Field(
+        None,
+        description="Consumer tag of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.",
+    )
+    messages_ready: Optional[int] = Field(
+        None, description="Number of messages ready to be delivered to clients."
+    )
+    messages_unacknowledged: Optional[int] = Field(
+        None,
+        description="Number of messages delivered to clients but not yet acknowledged.",
+    )
+    messages: Optional[int] = Field(
+        None, description="Sum of ready and unacknowledged messages (queue depth)."
+    )
+    messages_ready_ram: Optional[int] = Field(
+        None,
+        description="Number of messages from messages_ready which are resident in ram.",
+    )
+    messages_unacknowledged_ram: Optional[int] = Field(
+        None,
+        description="Number of messages from messages_unacknowledged which are resident in ram.",
+    )
+    messages_ram: Optional[int] = Field(
+        None, description="Total number of messages which are resident in ram."
+    )
+    messages_persistent: Optional[int] = Field(
+        None,
+        description="Total number of persistent messages in the queue (will always be 0 for transient queues).",
+    )
+    message_bytes: Optional[int] = Field(
+        None,
+        description="Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.",
+    )
+    message_bytes_ready: Optional[int] = Field(
+        None,
+        description="Like message_bytes but counting only those messages ready to be delivered to clients.",
+    )
+    message_bytes_unacknowledged: Optional[int] = Field(
+        None,
+        description="Like message_bytes but counting only those messages delivered to clients but not yet acknowledged.",
+    )
+    message_bytes_ram: Optional[int] = Field(
+        None,
+        description="Like message_bytes but counting only those messages which are currently held in RAM.",
+    )
+    message_bytes_persistent: Optional[int] = Field(
+        None,
+        description="Like message_bytes but counting only those messages which are persistent.",
+    )
+    head_message_timestamp: Optional[datetime.datetime] = Field(
+        None,
+        description="The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.",
+    )
+    disk_reads: Optional[int] = Field(
+        None,
+        description="Total number of times messages have been read from disk by this queue since it started.",
+    )
+    disk_writes: Optional[int] = Field(
+        None,
+        description="Total number of times messages have been written to disk by this queue since it started.",
+    )
+    consumers: Optional[int] = Field(None, description="Number of consumers.")
+    consumer_utilisation: Optional[float] = Field(
+        None,
+        ge=0,
+        le=1,
+        description="Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.",
+    )
+    memory: Optional[int] = Field(
+        None,
+        description="Bytes of memory allocated by the runtime for the queue, including stack, heap and internal structures.",
+    )
+    state: Optional[QueueState] = None
+    message_stats: Optional[MessageStats] = None
+    incoming: Optional[dict] = Field(
+        None,
+        description="Detailed message stats (see section above) for publishes from exchanges into this queue.",
+    )
+    deliveries: Optional[dict] = Field(
+        None,
+        description="Detailed message stats for deliveries from this queue into channels.",
+    )
+    consumer_details: Optional[List[Any]] = Field(
+        None,
+        description="List of consumers on this channel, with some details on each.",
+    )
+
+
+class HashingAlgorithm(enum.Enum):
+    rabbit_password_hashing_sha256 = "rabbit_password_hashing_sha256"
+    rabbit_password_hashing_sha512 = "rabbit_password_hashing_sha512"
+    rabbit_password_hashing_md5 = "rabbit_password_hashing_md5"
+
+
+class UserSpec(BaseModel):
+    """
+    The tags key is mandatory. Either password or password_hash can be set. If
+    neither is set the user will not be able to log in with a password, but other
+    mechanisms like client certificates may be used. Setting password_hash to ""
+    will ensure the user cannot use a password to log in. tags is a
+    comma-separated list of tags for the user. Currently recognised tags are
+    administrator, monitoring and management. password_hash must be generated
+    using the algorithm described in the RabbitMQ documentation. You may also
+    specify the hash function being used by adding the hashing_algorithm key to
+    the body. Currently recognised algorithms are rabbit_password_hashing_sha256,
+    rabbit_password_hashing_sha512, and rabbit_password_hashing_md5.
+    """
+
+    name: str = Field(..., description="Username")
+    password_hash: str = Field(..., description="Hash of the user password.")
+    hashing_algorithm: HashingAlgorithm
+    tags: str
+
+    class Config:
+        use_enum_values = True
+
+
+class UserInfo(UserSpec):
+    pass
+
 
 def http_api_request(
     zc: zocalo.configuration.Configuration,
@@ -33,3 +574,176 @@ def http_api_request(
     opener = urllib.request.build_opener(handler)
     urllib.request.install_opener(opener)
     return urllib.request.Request(f"{zc.rabbitmqapi['base_url']}{api_path}")
+
+
+class RabbitMQAPI:
+    def __init__(self, url: str, user: str, password: str):
+        self._auth = (user, password)
+        self._url = url
+
+    @classmethod
+    def from_zocalo_configuration(cls, zc: zocalo.configuration.Configuration):
+        return cls(
+            url=zc.rabbitmqapi["base_url"],
+            user=zc.rabbitmqapi["username"],
+            password=zc.rabbitmqapi["password"],
+        )
+
+    @property
+    def health_checks(self) -> Tuple[Dict[str, Any], Dict[str, str]]:
+        # https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.9.7/deps/rabbitmq_management/priv/www/api/index.html
+        HEALTH_CHECKS = {
+            "health/checks/alarms",
+            "health/checks/local-alarms",
+            "health/checks/certificate-expiration/1/months",
+            f"health/checks/port-listener/{pika_transport.PikaTransport.defaults['--rabbit-port']}",
+            # f"health/checks/port-listener/1234",
+            "health/checks/protocol-listener/amqp",
+            "health/checks/virtual-hosts",
+            "health/checks/node-is-mirror-sync-critical",
+            "health/checks/node-is-quorum-critical",
+        }
+
+        success = {}
+        failure = {}
+        for health_check in HEALTH_CHECKS:
+            response = self.get(health_check)
+            if response.status_code == requests.codes.ok:
+                success[health_check] = response.json()
+            else:
+                failure[health_check] = response.json()
+        return success, failure
+
+    def get(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response:
+        return requests.get(f"{self._url}/{endpoint}", auth=self._auth, params=params)
+
+    def put(
+        self, endpoint: str, params: Dict[str, Any] = None, json: Dict[str, Any] = None
+    ) -> requests.Response:
+        return requests.put(
+            f"{self._url}/{endpoint}", auth=self._auth, params=params, json=json
+        )
+
+    def delete(self, endpoint: str, params: Dict[str, Any] = None) -> requests.Response:
+        return requests.delete(
+            f"{self._url}/{endpoint}", auth=self._auth, params=params
+        )
+
+    def connections(
+        self, name: Optional[str] = None
+    ) -> Union[List[ConnectionInfo], ConnectionInfo]:
+        endpoint = "connections"
+        if name is not None:
+            endpoint = f"{endpoint}/{name}/"
+            response = self.get(endpoint)
+            return ConnectionInfo(**response.json())
+        response = self.get(endpoint)
+        return [ConnectionInfo(**qi) for qi in response.json()]
+
+    def nodes(self, name: Optional[str] = None) -> Union[List[NodeInfo], NodeInfo]:
+        # https://www.rabbitmq.com/monitoring.html#node-metrics
+        endpoint = "nodes"
+        if name is not None:
+            endpoint = f"{endpoint}/{name}"
+            response = self.get(endpoint)
+            return NodeInfo(**response.json())
+        response = self.get(endpoint)
+        return [NodeInfo(**qi) for qi in response.json()]
+
+    def exchanges(
+        self, vhost: Optional[str] = None, name: Optional[str] = None
+    ) -> Union[List[ExchangeInfo], ExchangeInfo]:
+        endpoint = "exchanges"
+        if vhost is not None and name is not None:
+            endpoint = f"{endpoint}/{vhost}/{name}/"
+            response = self.get(endpoint)
+            return ExchangeInfo(**response.json())
+        elif vhost is not None:
+            endpoint = f"{endpoint}/{vhost}/"
+        elif name is not None:
+            raise ValueError("name can not be set without vhost")
+        response = self.get(endpoint)
+        return [ExchangeInfo(**qi) for qi in response.json()]
+
+    def exchange_declare(self, exchange: ExchangeSpec):
+        endpoint = f"exchanges/{exchange.vhost}/{exchange.name}/"
+        self.put(
+            endpoint,
+            json=exchange.dict(exclude_defaults=True, exclude={"name", "vhost"}),
+        )
+
+    def exchange_delete(self, vhost: str, name: str, if_unused: bool = False):
+        endpoint = f"exchanges/{vhost}/{name}"
+        self.delete(endpoint, params={"if_unused": if_unused})
+
+    def policies(
+        self, vhost: Optional[str] = None, name: Optional[str] = None
+    ) -> Union[List[PolicyInfo], PolicyInfo]:
+        endpoint = "policies"
+        if vhost is not None and name is not None:
+            endpoint = f"{endpoint}/{vhost}/{name}/"
+            response = self.get(endpoint)
+            return PolicyInfo(**response.json())
+        elif vhost is not None:
+            endpoint = f"{endpoint}/{vhost}/"
+        elif name is not None:
+            raise ValueError("name can not be set without vhost")
+        response = self.get(endpoint)
+        return [PolicyInfo(**p) for p in response.json()]
+
+    def set_policy(self, policy: PolicySpec):
+        endpoint = f"policies/{policy.vhost}/{policy.name}/"
+        self.put(
+            endpoint,
+            json=policy.dict(
+                exclude_defaults=True, exclude={"name", "vhost"}, by_alias=True
+            ),
+        )
+
+    def clear_policy(self, vhost: str, name: str):
+        endpoint = f"policies/{vhost}/{name}/"
+        self.delete(endpoint)
+
+    def queues(
+        self, vhost: Optional[str] = None, name: Optional[str] = None
+    ) -> Union[List[QueueInfo], QueueInfo]:
+        endpoint = "queues"
+        if vhost is not None and name is not None:
+            endpoint = f"{endpoint}/{vhost}/{name}"
+            response = self.get(endpoint)
+            return QueueInfo(**response.json())
+        elif vhost is not None:
+            endpoint = f"{endpoint}/{vhost}"
+        elif name is not None:
+            raise ValueError("name can not be set without vhost")
+        response = self.get(endpoint)
+        return [QueueInfo(**qi) for qi in response.json()]
+
+    def queue_declare(self, queue: QueueSpec):
+        endpoint = f"queues/{queue.vhost}/{queue.name}"
+        self.put(
+            endpoint, json=queue.dict(exclude_defaults=True, exclude={"name", "vhost"})
+        )
+
+    def queue_delete(
+        self, vhost: str, name: str, if_unused: bool = False, if_empty: bool = False
+    ):
+        endpoint = f"queues/{vhost}/{name}"
+        self.delete(endpoint, params={"if_unused": if_unused, "if_empty": if_empty})
+
+    def users(self, name: str = None) -> Union[List[UserInfo], UserInfo]:
+        endpoint = "users"
+        if name:
+            endpoint = f"{endpoint}/{name}/"
+            response = self.get(endpoint)
+            return UserInfo(**response.json())
+        response = self.get(endpoint)
+        return [UserInfo(**u) for u in response.json()]
+
+    def add_user(self, user: UserSpec):
+        endpoint = f"users/{user.name}/"
+        self.put(endpoint, json=user.dict(exclude_defaults=True, exclude={"name"}))
+
+    def delete_user(self, name: str):
+        endpoint = f"users/{name}/"
+        self.delete(endpoint)
diff --git a/tests/cli/test_dlq_check.py b/tests/cli/test_dlq_check.py
index f162889..615b840 100644
--- a/tests/cli/test_dlq_check.py
+++ b/tests/cli/test_dlq_check.py
@@ -1,4 +1,3 @@
-import json
 from unittest import mock
 
 import zocalo.cli.dlq_check
@@ -21,20 +20,31 @@ def test_activemq_dlq_check(mock_jmx):
     assert checked == {"images": 2, "transient": 5}
 
 
-@mock.patch("zocalo.cli.dlq_check.urllib.request.urlopen")
-@mock.patch("zocalo.cli.dlq_check.http_api_request")
-def test_activemq_dlq_rabbitmq_check(mock_api, mock_url):
-    cfg = Configuration({})
-    _mock = mock.MagicMock()
-    mock_api.return_value = ""
-    mock_url.return_value = _mock
-    mock_url.return_value.__enter__.return_value.read.return_value = json.dumps(
-        [
-            {"name": "images", "vhost": "zocalo", "messages": 10},
-            {"name": "dlq.images", "vhost": "zocalo", "messages": 2},
-            {"name": "dlq.transient", "vhost": "zocalo", "messages": 5},
-        ]
+def test_activemq_dlq_rabbitmq_check(requests_mock):
+    zc = mock.Mock()
+    zc.rabbitmqapi = {
+        "base_url": "http://fake.com/api",
+        "username": "guest",
+        "password": "guest",
+    }
+    requests_mock.get(
+        "/api/queues",
+        json=[
+            {"name": "images", "vhost": "zocalo", "messages": 10, "exclusive": False},
+            {
+                "name": "dlq.images",
+                "vhost": "zocalo",
+                "messages": 2,
+                "exclusive": False,
+            },
+            {
+                "name": "dlq.transient",
+                "vhost": "zocalo",
+                "messages": 5,
+                "exclusive": False,
+            },
+        ],
     )
-    checked = zocalo.cli.dlq_check.check_dlq_rabbitmq(cfg, "zocalo")
+    checked = zocalo.cli.dlq_check.check_dlq_rabbitmq(zc, "zocalo")
     assert checked == {"dlq.images": 2, "dlq.transient": 5}
diff --git a/tests/util/test_rabbitmq.py b/tests/util/test_rabbitmq.py
index 6a3718b..cfb23fd 100644
--- a/tests/util/test_rabbitmq.py
+++ b/tests/util/test_rabbitmq.py
@@ -1,13 +1,371 @@
+import re
+
+import pytest
+
 import zocalo.configuration
-from zocalo.util.rabbitmq import http_api_request
+import zocalo.util.rabbitmq as rabbitmq
 
 
-def test_http_api_request(mocker):
+@pytest.fixture
+def zocalo_configuration(mocker):
     zc = mocker.MagicMock(zocalo.configuration.Configuration)
     zc.rabbitmqapi = {
         "base_url": "http://rabbitmq.burrow.com:12345/api",
         "username": "carrots",
         "password": "carrots",
     }
-    request = http_api_request(zc, api_path="/queues")
+    return zc
+
+
+@pytest.fixture
+def rmqapi(zocalo_configuration):
+    return rabbitmq.RabbitMQAPI.from_zocalo_configuration(zocalo_configuration)
+
+
+def test_http_api_request(zocalo_configuration):
+    request = rabbitmq.http_api_request(zocalo_configuration, api_path="/queues")
     assert request.get_full_url() == "http://rabbitmq.burrow.com:12345/api/queues"
+
+
+def test_api_health_checks(requests_mock, rmqapi):
+    requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"})
+    success, failures = rmqapi.health_checks
+    assert not failures
+    assert success
+    for k, v in success.items():
+        assert k.startswith("health/checks/")
+        assert v == {"status": "ok"}
+
+
+def test_api_health_checks_failures(requests_mock, rmqapi):
+    expected_json = {
+        "status": "failed",
+        "reason": "No active listener",
+        "missing": 1234,
+        "ports": [25672, 15672, 1883, 15692, 61613, 5672],
+    }
+    requests_mock.get(re.compile("/health/checks/"), json={"status": "ok"})
+    requests_mock.get(
+        re.compile("/health/checks/port-listener"),
+        status_code=503,
+        reason="No active listener",
+        json=expected_json,
+    )
+    success, failures = rmqapi.health_checks
+    assert failures
+    assert success
+    assert len(failures) == 1
+    for k, v in success.items():
+        assert k.startswith("health/checks/")
+        assert v == {"status": "ok"}
+    for k, v in failures.items():
+        assert k.startswith("health/checks/port-listener/")
+        assert v == expected_json
+
+
+def test_api_queues(requests_mock, rmqapi):
+    queue = {
+        "consumers": 0,
+        "exclusive": False,
+        "memory": 110112,
+        "message_stats": {
+            "deliver_get": 33,
+            "deliver_get_details": {"rate": 0},
+            "publish": 22,
+            "publish_details": {"rate": 0},
+        },
+        "messages": 0,
+        "messages_ready": 0,
+        "messages_unacknowledged": 0,
+        "name": "foo",
+        "vhost": "zocalo",
+    }
+
+    # First call rmq.queues() with defaults
+    requests_mock.get("/api/queues", json=[queue])
+    assert rmqapi.queues() == [rabbitmq.QueueInfo(**queue)]
+
+    # Now call with vhost=...
+    requests_mock.get("/api/queues/zocalo", json=[queue])
+    assert rmqapi.queues(vhost="zocalo") == [rabbitmq.QueueInfo(**queue)]
+
+    # Now call with vhost=..., name=...
+    requests_mock.get(f"/api/queues/zocalo/{queue['name']}", json=queue)
+    assert rmqapi.queues(vhost="zocalo", name=queue["name"]) == rabbitmq.QueueInfo(
+        **queue
+    )
+
+
+@pytest.fixture
+def queue_spec():
+    return rabbitmq.QueueSpec(
+        name="foo",
+        auto_delete=True,
+        vhost="zocalo",
+        arguments={"x-single-active-consumer": True},
+    )
+
+
+def test_api_queue_declare(requests_mock, rmqapi, queue_spec):
+    requests_mock.put("/api/queues/zocalo/foo")
+    rmqapi.queue_declare(queue=queue_spec)
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith("/api/queues/zocalo/foo")
+    assert history.json() == {
+        "auto_delete": True,
+        "arguments": queue_spec.arguments,
+    }
+
+
+def test_api_queue_delete(requests_mock, rmqapi, queue_spec):
+    requests_mock.delete("/api/queues/zocalo/foo")
+    rmqapi.queue_delete(vhost="zocalo", name="foo", if_unused=True, if_empty=True)
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/queues/zocalo/foo?if_unused=True&if_empty=True")
+
+
+def test_api_nodes(requests_mock, rmqapi):
+    node = {
+        "name": "rabbit@pooter123",
+        "mem_limit": 80861855744,
+        "mem_alarm": False,
+        "mem_used": 143544320,
+        "disk_free_limit": 50000000,
+        "disk_free_alarm": False,
+        "disk_free": 875837644800,
+        "fd_total": 32768,
+        "fd_used": 56,
+        "io_file_handle_open_attempt_count": 647,
+        "sockets_total": 29401,
+        "sockets_used": 0,
+        "gc_num": 153378077,
+        "gc_bytes_reclaimed": 7998215046336,
+        "proc_total": 1048576,
+        "proc_used": 590,
+        "run_queue": 1,
+        "running": True,
+        "type": "disc",
+    }
+
+    # First call rmq.nodes() with defaults
+    requests_mock.get("/api/nodes", json=[node])
+    assert rmqapi.nodes() == [rabbitmq.NodeInfo(**node)]
+
+    # Now call with name=...
+    requests_mock.get(f"/api/nodes/{node['name']}", json=node)
+    assert rmqapi.nodes(name=node["name"]) == rabbitmq.NodeInfo(**node)
+
+
+@pytest.mark.parametrize("name", ["", "foo"])
+def test_api_exchanges(name, requests_mock, rmqapi):
+    exchange = {
+        "arguments": {},
+        "auto_delete": False,
+        "durable": True,
+        "internal": False,
+        "message_stats": {
+            "publish_in": 156447,
+            "publish_in_details": {"rate": 0.4},
+            "publish_out": 156445,
+            "publish_out_details": {"rate": 0.4},
+        },
+        "name": name,
+        "type": "direct",
+        "user_who_performed_action": "rmq-internal",
+        "vhost": "foo",
+    }
+
+    # First call rmq.exchanges() with defaults
+    requests_mock.get("/api/exchanges", json=[exchange])
+    assert rmqapi.exchanges() == [rabbitmq.ExchangeInfo(**exchange)]
+
+    # Now call with vhost=...
+    requests_mock.get("/api/exchanges/zocalo/", json=[exchange])
+    assert rmqapi.exchanges(vhost="zocalo") == [rabbitmq.ExchangeInfo(**exchange)]
+
+    # Now call with vhost=..., name=...
+    requests_mock.get(
+        f"/api/exchanges/{exchange['vhost']}/{exchange['name']}/", json=exchange
+    )
+    assert rmqapi.exchanges(
+        vhost=exchange["vhost"], name=exchange["name"]
+    ) == rabbitmq.ExchangeInfo(**exchange)
+
+
+def exchange_spec(name):
+    return rabbitmq.ExchangeSpec(
+        name=name,
+        type="fanout",
+        durable=True,
+        auto_delete=True,
+        internal=False,
+        vhost="zocalo",
+    )
+
+
+@pytest.mark.parametrize("name", ["", "foo"])
+def test_api_exchange_declare(name, requests_mock, rmqapi):
+    requests_mock.put(f"/api/exchanges/zocalo/{name}/")
+    rmqapi.exchange_declare(exchange=exchange_spec(name))
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/exchanges/zocalo/{name}/")
+    assert history.json() == {
+        "type": "fanout",
+        "auto_delete": True,
+        "durable": True,
+    }
+
+
+def test_api_exchange_delete(requests_mock, rmqapi):
+    requests_mock.delete("/api/exchanges/zocalo/foo")
+    rmqapi.exchange_delete(vhost="zocalo", name="foo", if_unused=True)
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/exchanges/zocalo/foo?if_unused=True")
+
+
+def test_api_connections(requests_mock, rmqapi):
+    connection = {
+        "auth_mechanism": "PLAIN",
+        "connected_at": 1634716019864,
+        "frame_max": 131072,
+        "host": "123.24.5.67",
+        "name": "123.24.5.67:12345 -> 123.24.5.67:54321",
+        "node": "rabbit@cs05r-sc-serv-26",
+        "peer_host": "123.24.5.67",
+        "peer_port": 12345,
+        "port": 54321,
+        "protocol": "AMQP 0-9-1",
+        "ssl": False,
+        "state": "running",
+        "timeout": 60,
+        "user": "foo",
+        "vhost": "bar",
+        "channels": 1,
+    }
+
+    # First call rmq.connections() with defaults
+    requests_mock.get("/api/connections", json=[connection])
+    assert rmqapi.connections() == [rabbitmq.ConnectionInfo(**connection)]
+
+    # Now call with name=...
+    requests_mock.get(f"/api/connections/{connection['name']}/", json=connection)
+    assert rmqapi.connections(name=connection["name"]) == rabbitmq.ConnectionInfo(
+        **connection
+    )
+
+
+def test_api_users(requests_mock, rmqapi):
+    user = {
+        "name": "guest",
+        "password_hash": "guest",
+        "hashing_algorithm": "rabbit_password_hashing_sha256",
+        "tags": "administrator",
+    }
+
+    # First call rmq.users() with defaults
+    requests_mock.get("/api/users", json=[user])
+    assert rmqapi.users() == [rabbitmq.UserInfo(**user)]
+
+    # Now call with name=...
+    requests_mock.get(f"/api/users/{user['name']}/", json=user)
+    assert rmqapi.users(name=user["name"]) == rabbitmq.UserInfo(**user)
+
+
+@pytest.fixture
+def user_spec():
+    return rabbitmq.UserSpec(
+        name="guest",
+        password_hash="guest",
+        hashing_algorithm="rabbit_password_hashing_sha256",
+        tags="administrator",
+    )
+
+
+def test_api_add_user(requests_mock, rmqapi, user_spec):
+    requests_mock.put(f"/api/users/{user_spec.name}/")
+    rmqapi.add_user(user=user_spec)
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/users/{user_spec.name}/")
+    assert history.json() == {
+        "password_hash": "guest",
+        "hashing_algorithm": "rabbit_password_hashing_sha256",
+        "tags": "administrator",
+    }
+
+
+def test_api_delete_user(requests_mock, rmqapi, user_spec):
+    requests_mock.delete("/api/users/guest/")
+    rmqapi.delete_user(name="guest")
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/users/guest/")
+
+
+def test_api_policies(requests_mock, rmqapi):
+    policy = {
+        "vhost": "foo",
+        "name": "redelivery",
+        "pattern": "^amq.",
+        "apply-to": "queues",
+        "definition": {"delivery-limit": 5},
+        "priority": 0,
+    }
+
+    # First call rmq.policies() with defaults
+    requests_mock.get("/api/policies", json=[policy])
+    assert rmqapi.policies() == [rabbitmq.PolicyInfo(**policy)]
+
+    # Now call with vhost=...
+    requests_mock.get(f"/api/policies/{policy['vhost']}/", json=[policy])
+    assert rmqapi.policies(vhost=policy["vhost"]) == [rabbitmq.PolicyInfo(**policy)]
+
+    # Now call with vhost=..., name=...
+    requests_mock.get(f"/api/policies/{policy['vhost']}/{policy['name']}/", json=policy)
+    assert rmqapi.policies(
+        vhost=policy["vhost"], name=policy["name"]
+    ) == rabbitmq.PolicyInfo(**policy)
+
+
+@pytest.fixture
+def policy_spec():
+    return rabbitmq.PolicySpec(
+        name="bar",
+        pattern="^amq.",
+        apply_to=rabbitmq.PolicyApplyTo.queues,
+        definition={"delivery-limit": 5},
+        vhost="foo",
+    )
+
+
+def test_api_set_policy(requests_mock, rmqapi, policy_spec):
+    requests_mock.put(f"/api/policies/foo/{policy_spec.name}/")
+    rmqapi.set_policy(policy=policy_spec)
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "PUT"
+    assert history.url.endswith(f"/api/policies/foo/{policy_spec.name}/")
+    assert history.json() == {
+        "pattern": "^amq.",
+        "apply-to": "queues",
+        "definition": {"delivery-limit": 5},
+    }
+
+
+def test_api_clear_policy(requests_mock, rmqapi, policy_spec):
+    requests_mock.delete("/api/policies/foo/bar/")
+    rmqapi.clear_policy(vhost="foo", name="bar")
+    assert requests_mock.call_count == 1
+    history = requests_mock.request_history[0]
+    assert history.method == "DELETE"
+    assert history.url.endswith("/api/policies/foo/bar/")