From f274bbfe689879c81c5a2414a82fd40360e632cc Mon Sep 17 00:00:00 2001 From: Yin Congmin Date: Tue, 3 Jan 2023 16:51:04 +0800 Subject: [PATCH] control/discovery: add discovery controller The discovery controller implements the basic function. Use command "python3 -m control.discovery" to start discovery controller. Client can use command "nvme discover -t tcp -a ip -s port" to get log pages. The configuration is in ceph-nvmeof.conf [discovery] part. feature: https://github.com/ceph/ceph-nvmeof/issues/108 Signed-off-by: Yin Congmin --- ceph-nvmeof.conf | 5 + control/discovery.py | 1110 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1115 insertions(+) create mode 100644 control/discovery.py diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index c40f065e..e32bcaff 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -17,6 +17,11 @@ enable_auth = False state_update_notify = True state_update_interval_sec = 5 +[discovery] + +addr = +port = 8009 + [ceph] pool = rbd diff --git a/control/discovery.py b/control/discovery.py new file mode 100644 index 00000000..55ecd64d --- /dev/null +++ b/control/discovery.py @@ -0,0 +1,1110 @@ +# +# Copyright (c) 2021 International Business Machines +# All rights reserved. 
+# +# SPDX-License-Identifier: LGPL-3.0-or-later +# +# Authors: congmin.yin@intel.com +# + +import argparse +import grpc +import json +import logging +from .config import GatewayConfig +from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler + +import rados +from typing import Dict, Optional + +import socket +import threading +import time +import enum +import struct +import selectors +from ctypes import Structure, c_bool, c_ubyte, c_uint8, c_uint16, c_uint32, c_uint64, c_float + +# NVMe tcp pdu type +class NVME_TCP_PDU(enum.IntFlag): + ICREQ = 0x0 + ICRESP = 0x1 + H2C_TERM = 0x2 + C2H_TERM = 0x3 + CMD = 0x4 + RSP = 0x5 + H2C_DATA = 0x6 + C2H_DATA = 0x7 + TCP_R2T = 0x9 + +# NVMe tcp opcode +class NVME_TCP_OPC(enum.IntFlag): + DELETE_SQ = 0x0 + CREATE_SQ = 0x1 + GET_LOG_PAGE = 0x2 + DELETE_CQ = 0x4 + CREATE_CQ = 0x5 + IDENTIFY = 0x6 + ABORT = 0x8 + SET_FEATURES = 0x9 + GET_FEATURES = 0xa + ASYNC_EVE_REQ = 0xc + NS_MGMT = 0xd + FW_COMMIT = 0x10 + FW_IMG_DOWNLOAD = 0x11 + NS_ATTACH = 0x15 + KEEP_ALIVE = 0x18 + FABRIC_TYPE = 0x7F + +# NVMe tcp fabric command type +class NVME_TCP_FCTYPE(enum.IntFlag): + PROP_SET = 0x0 + CONNECT = 0x1 + PROP_GET = 0x4 + AUTH_SEND = 0x5 + AUTH_RECV = 0x6 + DISCONNECT = 0x8 + +# NVMe controller register space offsets +class NVME_CTL(enum.IntFlag): + CAPABILITIES = 0x0 + VERSION = 0x08 + CONFIGURATION = 0x14 + STATUS = 0x1c + + +# NVM subsystem types +class NVMF_SUBTYPE(enum.IntFlag): + # Discovery type for NVM subsystem + DISCOVERY = 0x1 + # NVMe type for NVM subsystem + NVME = 0x2 + +# NVMe over Fabrics transport types +transport_types = { + "RDMA": 0x1, + "FC": 0x2, + "TCP": 0x3, + "INTRA_HOST": 0xfe +} + +# Address family types +adrfam_types = { + "ipv4": 0x1, + "ipv6": 0x2, + "ib": 0x3, + "fc": 0x4, + "intra_host": 0xfe +} + +# Transport requirement, secure channel requirements +# Connections shall be made over a fabric secure channel +class NVMF_TREQ_SECURE_CHANNEL(enum.IntFlag): + NOT_SPECIFIED = 0x0 
+ REQUIRED = 0x1 + NOT_REQUIRED = 0x2 + +# maximum number of connections +MAX_CONNECTION = 10240 + +# NVMe tcp package length, refer: MTU = 1500 bytes +NVME_TCP_PDU_UNIT = 1024 + +# Max SQ head pointer +SQ_HEAD_MAX = 128 + +lock = threading.Lock() + +# Global controller id +GLOBAL_CNLID = 0x1 + +# Global generation counter +GLOBAL_GEN_CNT = 0x1 + +class Connection(Structure): + """Data used multiple times in each connection.""" + + _fields_ = [ + ("nvmeof_connect_data_hostid", c_ubyte * 16), + ("nvmeof_connect_data_cntlid", c_uint16), + ("nvmeof_connect_data_subnqn", c_ubyte * 256), + ("nvmeof_connect_data_hostnqn", c_ubyte * 256), + ("sq_head_ptr", c_uint16), + ("unsent_log_page_len", c_uint32), + ("property_data", c_ubyte * 8), + ("property_set", c_bool), + ("shutdown_now", c_bool), + ("cnlid", c_uint16), + ("gen_cnt", c_uint16), + ("recv_async", c_bool), + ("async_cmd_id", c_uint16), + ("keep_alive_time", c_float), + ("keep_alive_timeout", c_uint32), + ] + + def __init__(self): + self.connection = 0 + self.allow_listeners = [] + self.log_page = bytearray() + self.recv_buffer = b'' + + self.unsent_log_page_len = 0 + self.property_set = False + self.shutdown_now = False + self.recv_async = False + +class Pdu(Structure): + _fields_ = [ + ("type", c_uint8), + ("specical_flag", c_uint8), + ("header_length", c_uint8), + ("data_offset", c_uint8), + ("packet_length", c_uint32), + ] + + def compose_reply(self): + return bytes(self) + +class ICResp(Structure): + _fields_ = [ + # pdu version format + ("version_format", c_uint16), + # controller Pdu data alignment + ("data_alignment", c_uint8), + # digest types enabled + ("digest_types", c_uint8), + # Maximum data capsules per r2t supported + ("maximum_data_capsules", c_uint32), + ] + + def compose_reply(self): + return bytes(self) + +class CqeConnect(Structure): + _fields_ = [ + ("controller_id", c_uint16), + ("authentication", c_uint16), + ("reserved", c_uint32), + ("sq_head_ptr", c_uint16), + ("sq_id", c_uint16), + 
("cmd_id", c_uint16), + ("status", c_uint16) + ] + + def compose_reply(self): + return bytes(self) + +class CqePropertyGetSet(Structure): + _fields_ = [ + # property data for property get, reserved for property set + ("property_data", c_ubyte * 8), + ("sq_head_ptr", c_uint16), + ("sq_id", c_uint16), + ("cmd_id", c_uint16), + ("status", c_uint16) + ] + + def compose_reply(self): + return bytes(self) + +class NVMeTcpDataPdu(Structure): + _fields_ = [ + ("cmd_id", c_uint16), + ("transfer_tag", c_uint16), + ("data_offset", c_uint32), + ("data_length", c_uint32), + ("reserved", c_uint32) + ] + + def compose_reply(self): + return bytes(self) + +class NVMeIdentify(Structure): + _fields_ = [ + # skip some fields, include VID, SSVID, SN, MN + ("todo_fields1", c_ubyte * 64), + ("firmware_revision", c_ubyte * 8), + # RAB, IEEE, CMIC + ("todo_fields2", c_ubyte * 5), + # maximum data transfer size + ("mdts", c_uint8), + ("controller_id", c_uint16), + ("version", c_uint8 * 4), + # RTD3R, RTD3E + ("todo_fields3", c_ubyte * 8), + # optional asynchronous events supported + ("oaes", c_ubyte * 4), + # CTRATT, RRLS, CNTRLTYPE, FGUID, NVMe Management Interface, OACS, ACL + ("todo_fields4", c_ubyte * 163), + # asynchronous events request limit + ("aerl", c_uint8), + ("firmware_updates", c_uint8), + # log page attributes + ("lpa", c_uint8), + # error log page entries(ELPE) + ("elpe", c_uint8), + # NPSS, AVSCC, APSTA, WCTEMP, CCTEMP, MTFA, HMPRE, HMIN, TNVMCAP... + # TODO: keep alive support - timer value(KAS)? 
+ ("todo_fields5", c_ubyte * 251), + # maximum outstanding commands + ("max_cmd", c_uint16), + # number of namespace, optional NVM command support + ("todo_fields6", c_uint8 * 6), + # fused operation support + ("fused_operation", c_uint16), + # FNA, VWC, AWUN, AWUPF, NVSCC, NWPC + ("todo_fields7", c_uint8 * 8), + # atomic compare & write unit + ("acwu", c_uint16), + ("reserved1", c_uint16), + # SGL support + ("sgls", c_uint8 * 4), + # maxinum number of allowed namespaces + ("mnan", c_uint32), + ("reserved2", c_ubyte * 224), + ("subnqn", c_ubyte * 256), + ("reserved3", c_ubyte * 768), + ("nvmeof_attributes", c_ubyte * 256), + ("power_state_attributes", c_ubyte * 1024), + ("vendor_specific", c_ubyte * 1024) + ] + + def compose_reply(self): + return bytes(self) + +# for set feature, keep alive and async +class CqeNVMe(Structure): + _fields_ = [ + ("dword0", c_uint32), + ("dword1", c_uint32), + ("sq_head_ptr", c_uint16), + ("sq_id", c_uint16), + ("cmd_id", c_uint16), + ("status", c_uint16) + ] + + def compose_reply(self): + return bytes(self) + +class NVMeGetLogPage(Structure): + _fields_ = [ + # generation counter + ("genctr", c_uint64), + # number of records + ("numrec", c_uint64), + #record format + ("recfmt", c_uint16), + ("reserved", c_ubyte * 1006) + ] + + def compose_short_reply(self): + return bytes(self)[:16] + + def compose_data_reply(self): + return bytes(self) + +class DiscoveryLogEntry(Structure): + _fields_ = [ + ("trtype", c_uint8), + ("adrfam", c_uint8), + ("subtype", c_uint8), + ("treq", c_uint8), + ("port_id", c_uint16), + ("controller_id", c_uint16), + # admin max SQ size + ("asqsz", c_uint16), + ("reserved1", c_ubyte * 22), + ("trsvcid", c_ubyte * 32), + ("reserved2", c_ubyte * 192), + ("subnqn", c_ubyte * 256), + ("traddr", c_ubyte * 256), + # Transport specific address subtype + ("tsas", c_ubyte * 256) + ] + + def compose_reply(self): + return bytes(self) + +class DiscoveryService: + """Implements discovery controller. 
+ + Response discover request from initiator. + + Instance attributes: + version: Discovery controller version + config: Basic gateway parameters + logger: Logger instance to track discovery controller events + omap_name: OMAP object name + ioctx: I/O context which allows OMAP access + discovery_addr: Discovery controller addr which allows initiator send command + discovery_port: Discovery controller's listening port + """ + + BDEV_PREFIX = "bdev_" + NAMESPACE_PREFIX = "namespace_" + SUBSYSTEM_PREFIX = "subsystem_" + HOST_PREFIX = "host_" + LISTENER_PREFIX = "listener_" + + def __init__(self, config): + self.logger = logging.getLogger(__name__) + self.logger.setLevel(level=logging.DEBUG) + + self.version = 1 + self.config = config + + gateway_group = self.config.get("gateway", "group") + self.omap_name = f"nvmeof.{gateway_group}.state" \ + if gateway_group else "nvmeof.state" + self.logger.info(f"log pages info from omap: {self.omap_name}") + + ceph_pool = self.config.get("ceph", "pool") + ceph_conf = self.config.get("ceph", "config_file") + conn = rados.Rados(conffile=ceph_conf) + conn.connect() + self.ioctx = conn.open_ioctx(ceph_pool) + + self.discovery_addr = self.config.get("discovery", "addr") + self.discovery_port = self.config.get("discovery", "port") + if self.discovery_addr == '' or self.discovery_port == '': + self.logger.error("discovery addr/port are empty.") + assert 0 + self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}") + + self.conn_vals = {} + self.selector = selectors.DefaultSelector() + + def _read_all(self) -> Dict[str, str]: + """Reads OMAP and returns dict of all keys and values.""" + + with rados.ReadOpCtx() as read_op: + iter, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) + self.ioctx.operate_read_op(read_op, self.omap_name) + omap_dict = dict(iter) + return omap_dict + + def _get_vals(self, omap_dict, prefix): + """Read values from the OMAP dict.""" + + vals = [] + for (key, val) in omap_dict.items(): 
+ if key.startswith(prefix): + val_text = val.decode('utf-8') + js = json.loads(val_text) + vals.append(js) + return vals + + def reply_initialize(self, conn): + """Reply initialize request.""" + + self.logger.debug("handle ICreq.") + pdu_reply = Pdu() + pdu_reply.type = NVME_TCP_PDU.ICRESP + pdu_reply.header_length = 128 + pdu_reply.packet_length = 128 + + icresp_reply = ICResp() + icresp_reply.maximum_data_capsules = 131072 + + reply = bytes(pdu_reply) + bytes(icresp_reply) + bytes(112) + + try: + conn.sendall(reply) + except BrokenPipeError: + self.logger.error("client disconnected unexpectedly.") + return -1 + self.logger.debug("reply initialize connection request.") + return 0 + + def reply_fc_cmd_connect(self, conn, data, cmd_id): + """Reply connect request.""" + + self.logger.debug("handle connect request.") + self_conn = self.conn_vals[conn.fileno()] + hf_nvmeof_cmd_connect_rsvd1 = struct.unpack_from('<19B', data, 13) + SIGL1 = struct.unpack_from('> 6) & 0x3 + if shutdown_notification == 0: + if self_conn.property_set is True: + # controller status: ready + property_get.property_data = (c_ubyte * 8)(0x01, 0x00, \ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) + else: + property_get.property_data = (c_ubyte * 8)(0x00, 0x00, \ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) + else: + # here shutdown_notification should be 0x1 + property_get.property_data = (c_ubyte * 8)(0x09, 0x00, \ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) + self_conn.shutdown_now = True + elif NVME_CTL(nvmeof_prop_get_set_offset) == NVME_CTL.VERSION: + # nvme version: 1.3 + property_get.property_data = (c_ubyte * 8)(0x00, 0x03, \ + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00) + else: + self.logger.error("unsupported type for property getting.") + property_get.sq_head_ptr = self_conn.sq_head_ptr + property_get.cmd_id = cmd_id + + reply = bytes(pdu_reply) + bytes(property_get) + try: + conn.sendall(reply) + except BrokenPipeError: + self.logger.error("client disconnected unexpectedly.") + return -1 + 
self.logger.debug("reply property get request.") + return 0 + + def reply_fc_cmd_prop_set(self, conn, data, cmd_id): + """Reply property set request.""" + + self.logger.debug("handle property set request.") + self_conn = self.conn_vals[conn.fileno()] + nvmeof_prop_get_set_rsvd0 = struct.unpack_from('<35B', data, 13) + nvmeof_prop_get_set_attrib = struct.unpack_from('<1B', data, 48)[0] + nvmeof_prop_get_set_rsvd1 = struct.unpack_from('<3B', data, 49) + nvmeof_prop_get_set_offset = struct.unpack_from('> 8) & 0x1F + get_logpage_lsi = nvme_get_logpage_dword11 >> 16 + get_logpage_uid_idx = nvme_get_logpage_dword14 & 0x3F + + if get_logpage_lid != 0x70: + self.logger.error("request type error, not discovery request.") + return -1 + + # Filter listeners based on host access permissions + allow_listeners = self_conn.allow_listeners + if len(allow_listeners) == 0: + for host in hosts: + a = host["host_nqn"] + if host["host_nqn"] == '*' or host["host_nqn"] == hostnqn: + for listener in listeners: + # TODO: It is better to change nqn in the "listener" + # to subsystem_nqn to avoid confusion + if host["subsystem_nqn"] == listener["nqn"]: + allow_listeners += [listener,] + self_conn.allow_listeners = allow_listeners + + # Prepare all log page data segments + if self_conn.unsent_log_page_len == 0 and nvme_data_len > 16: + self_conn.unsent_log_page_len = 1024 * (len(allow_listeners) + 1) + self_conn.log_page = bytearray(self_conn.unsent_log_page_len) + + nvme_get_log_page_reply = NVMeGetLogPage() + nvme_get_log_page_reply.genctr = self_conn.gen_cnt + nvme_get_log_page_reply.numrec = len(allow_listeners) + self_conn.log_page[0:1024] = bytes(nvme_get_log_page_reply) + + # log entries + log_entry_counter = 0 + while log_entry_counter < len(allow_listeners): + log_entry = DiscoveryLogEntry() + trtype = transport_types.get(allow_listeners[log_entry_counter]["trtype"]) + if trtype is None: + self.logger.error("unsupported transport type") + else: + log_entry.trtype = trtype + adrfam = 
adrfam_types.get(allow_listeners[log_entry_counter]["adrfam"]) + if adrfam is None: + self.logger.error("unsupported adress family") + else: + log_entry.adrfam = adrfam + log_entry.subtype = NVMF_SUBTYPE.NVME + log_entry.treq = NVMF_TREQ_SECURE_CHANNEL.NOT_REQUIRED + log_entry.port_id = log_entry_counter + log_entry.controller_id = 0xffff + log_entry.asqsz = 128 + # transport service indentifier + log_entry.trsvcid = (c_ubyte * 32)(*[c_ubyte(x) for x \ + in allow_listeners[log_entry_counter]["trsvcid"].encode()]) + log_entry.trsvcid[len(allow_listeners[log_entry_counter]["trsvcid"]):] = \ + [c_ubyte(0x20)] * (32 - len(allow_listeners[log_entry_counter]["trsvcid"])) + # NVM subsystem qualified name + log_entry.subnqn = (c_ubyte * 256)(*[c_ubyte(x) for x \ + in allow_listeners[log_entry_counter]["nqn"].encode()]) + log_entry.subnqn[len(allow_listeners[log_entry_counter]["nqn"]):] = \ + [c_ubyte(0x00)] * (256 - len(allow_listeners[log_entry_counter]["nqn"])) + # Transport address + log_entry.traddr = (c_ubyte * 256)(*[c_ubyte(x) for x \ + in allow_listeners[log_entry_counter]["traddr"].encode()]) + log_entry.traddr[len(allow_listeners[log_entry_counter]["traddr"]):] = \ + [c_ubyte(0x20)] * (256 - len(allow_listeners[log_entry_counter]["traddr"])) + + self_conn.log_page[1024*(log_entry_counter+1): \ + 1024*(log_entry_counter+2)] = bytes(log_entry) + log_entry_counter += 1 + else: + self.logger.debug("in the process of sending log pages...") + + reply = b'' + pdu_and_nvme_pdu_len = 8 + 16 + pdu_reply = Pdu() + pdu_reply.type = NVME_TCP_PDU.C2H_DATA + pdu_reply.specical_flag = 0x0c + pdu_reply.header_length = pdu_and_nvme_pdu_len + pdu_reply.data_offset = pdu_and_nvme_pdu_len + pdu_reply.packet_length = pdu_and_nvme_pdu_len + nvme_data_len + + nvme_tcp_data_pdu = NVMeTcpDataPdu() + nvme_tcp_data_pdu.cmd_id = cmd_id + nvme_tcp_data_pdu.data_length = nvme_data_len + + # reply based on the received get log page request packet(length) + if nvme_data_len == 16: + # nvme cli 
version: 1.x + nvme_get_log_page_reply = NVMeGetLogPage() + nvme_get_log_page_reply.genctr = self_conn.gen_cnt + nvme_get_log_page_reply.numrec = len(listeners) + + reply = bytes(pdu_reply) + bytes(nvme_tcp_data_pdu) + \ + bytes(nvme_get_log_page_reply)[:16] + elif nvme_data_len == 1024 and nvme_logpage_offset == 0: + # nvme cli version: 2.x + nvme_get_log_page_reply = NVMeGetLogPage() + nvme_get_log_page_reply.genctr = self_conn.gen_cnt + nvme_get_log_page_reply.numrec = len(listeners) + + reply = bytes(pdu_reply) + bytes(nvme_tcp_data_pdu) + \ + bytes(nvme_get_log_page_reply) + elif nvme_data_len % 1024 == 0: + # reply log pages + reply = bytes(pdu_reply) + bytes(nvme_tcp_data_pdu) + \ + self_conn.log_page[nvme_logpage_offset:nvme_logpage_offset+nvme_data_len] + self_conn.unsent_log_page_len -= nvme_data_len + if self_conn.unsent_log_page_len == 0: + self_conn.log_page = b'' + self_conn.property_set = False + self_conn.allow_listeners = [] + else: + self.logger.error("lenghth error. It need to be 16 or n*1024") + return -1 + try: + conn.sendall(reply) + except BrokenPipeError: + self.logger.error("client disconnected unexpectedly.") + return -1 + self.logger.debug("reply get log page request.") + return 0 + + def reply_keep_alive(self, conn, data, cmd_id): + """Reply keep alive request.""" + + self.logger.debug("handle keep alive request.") + self_conn = self.conn_vals[conn.fileno()] + self_conn.keep_alive_time = time.time() + nvme_sgl = struct.unpack_from('<16B', data, 32) + nvme_sgl_desc_type = nvme_sgl[15] & 0xF0 + nvme_sgl_desc_sub_type = nvme_sgl[15] & 0x0F + nvme_keep_alive_dword10 = struct.unpack_from('= \ + self.conn_vals[key].keep_alive_timeout / 1000: + self.logger.debug(f"discover request from {self.conn_vals[key].connection} timeout.") + self.selector.unregister(self.conn_vals[key].connection) + self.conn_vals[key].connection.close() + del self.conn_vals[key] + time.sleep(0.1) + + def reply_fabric_request(self, conn, data, cmd_id): + """Reply fabric 
request.""" + + fabric_type = struct.unpack_from(' SQ_HEAD_MAX: + self_conn.sq_head_ptr = 1 + + if NVME_TCP_PDU(pdu_type) == NVME_TCP_PDU.ICREQ: + err = self.reply_initialize(conn) + + elif NVME_TCP_PDU(pdu_type) == NVME_TCP_PDU.CMD: + CMD1 = struct.unpack_from('