From a18bb9c19b25e9873dad0a70f863e3ca5ab9d17b Mon Sep 17 00:00:00 2001 From: Yin Congmin Date: Tue, 3 Jan 2023 16:51:04 +0800 Subject: [PATCH] control/discovery: add discovery controller The discovery contrller implement the basic function. Use command "python3 -m control.discovery" to start discovery controller. Client can use command "nvme discover -t tcp -a ip -s port" to get log pages. The configuration is in ceph-nvmeof.conf [discovery] part. feature: https://github.com/ceph/ceph-nvmeof/issues/108 Signed-off-by: Yin Congmin --- ceph-nvmeof.conf | 5 + control/discovery.py | 957 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 962 insertions(+) create mode 100644 control/discovery.py diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index c40f065e..e32bcaff 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -17,6 +17,11 @@ enable_auth = False state_update_notify = True state_update_interval_sec = 5 +[discovery] + +addr = +port = 8009 + [ceph] pool = rbd diff --git a/control/discovery.py b/control/discovery.py new file mode 100644 index 00000000..2871156e --- /dev/null +++ b/control/discovery.py @@ -0,0 +1,957 @@ +# +# Copyright (c) 2021 International Business Machines +# All rights reserved. +# +# SPDX-License-Identifier: LGPL-3.0-or-later +# +# Authors: congmin.yin@intel.com +# + +import argparse +import grpc +import json +import logging +from .config import GatewayConfig + +import rados +from typing import Dict, Optional + +import socket +import threading +import time +import enum +import struct + +# NVMe tcp pdu type +class NVME_TCP_PDU(enum.IntFlag): + ICREQ = 0x0 + ICRESP = 0x1 + H2C_TERM = 0x2 + C2H_TERM = 0x3 + CMD = 0x4 + RSP = 0x5 + H2C_DATA = 0x6 + C2H_DATA = 0x7 + TCP_R2T = 0x9 + +# NVMe tcp opcode +class NVME_TCP_OPC(enum.IntFlag): + DELETE_SQ = 0x0 + CREATE_SQ = 0x1 + GET_LOG_PAGE = 0x2 + DELETE_CQ = 0x4 + CREATE_CQ = 0x5 + IDENTIFY = 0x6 + ABORT = 0x8 + SET_FEATURES = 0x9 + GET_FEATURES = 0xa + ASYNC_EVE_REQ = 0xc + NS_MGMT = 0xd + FW_COMMIT = 0x10 + FW_IMG_DOWNLOAD = 0x11 + NS_ATTACH = 0x15 + KEEP_ALIVE = 0x18 + FABRIC_TYPE = 0x7F + +# NVMe tcp fabric command type +class NVME_TCP_FCTYPE(enum.IntFlag): + PROP_SET = 0x0 + CONNECT = 0x1 + PROP_GET = 0x4 + AUTH_SEND = 0x5 + AUTH_RECV = 0x6 + DISCONNECT = 0x8 + +# NVMe controller register space offsets +class NVME_CTL(enum.IntFlag): + CAPABILITIES = 0x0 + VERSION = 0x08 + CONFIGURATION = 0x14 + STATUS = 0x1c + + +# NVM subsystem types +class NVMF_SUBTYPE(enum.IntFlag): + # Discovery type for NVM subsystem + DISCOVERY = 0x1 + # NVMe type for NVM subsystem + NVME = 0x2 + +# NVMe over Fabrics transport types +class NVMF_TRTYPE(enum.IntFlag): + RDMA = 0x1 + FC = 0x2 + TCP = 0x3 + INTRA_HOST = 0xfe + +# Address family types +class NVMF_ADRFAM(enum.IntFlag): + IPV4 = 0x1 + IPV6 = 0x2 + IB = 0x3 + FC = 0x4 + INTRA_HOST = 0xfe + +# Transport requirement, secure channel requirements +# Connections shall be made over a fabric secure channel +class NVMF_TREQ_SECURE_CHANNEL(enum.IntFlag): + NOT_SPECIFIED = 0x0 + REQUIRED = 0x1 + NOT_REQUIRED = 0x2 + +# NVMe tcp package length, refer: MTU = 1500 bytes +NVME_TCP_PDU_UNIT = 1024 + +# Max SQ head pointer +SQ_HEAD_MAX = 128 + +lock = threading.Lock() + +# Global controller id +GLOBAL_CNLID = 0x1 + +# Global generation counter +GLOBAL_GEN_CNT = 0x1 + +class Pdu: + def __init__(self): + # PDU type + self.type = bytearray(1) + # PDU specical flag + self.specical_flag = bytearray(1) + # PDU header length + self.header_length = bytearray(1) + # PDU data offset + self.data_offset = bytearray(1) + # packet length + self.packet_length = bytearray(4) + + def compose_reply(self): + return self.type + self.specical_flag + self.header_length + \ + self.data_offset + self.packet_length + +class ICResp: + def __init__(self): + # pdu version format + self.version_format = bytearray(2) + # controller Pdu data alignment + self.data_alignment = bytearray(1) + # digest types enabled + self.digest_types = bytearray(1) + # Maximum data capsules per r2t supported + self.maximum_data_capsules = bytearray(4) + + def compose_reply(self): + return self.version_format + self.data_alignment + \ + self.digest_types + self.maximum_data_capsules + +class CqeConnect: + def __init__(self): + # controller id + self.controller_id = bytearray(2) + # authentication required + self.authentication = bytearray(2) + self.reserved = bytearray(4) + # SQ head pointer + self.sq_head_ptr = bytearray(2) + # SQ identifier + self.sq_id = bytearray(2) + # command identifier + self.cmd_id = bytearray(2) + # status field: 0 = successful completion + self.status = bytearray(2) + + def compose_reply(self): + return self.controller_id + self.authentication + self.reserved + \ + self.sq_head_ptr + self.sq_id + self.cmd_id + self.status + +class CqePropertyGetSet: + def __init__(self): + # property data for property get, reserved for property set + self.property_data = bytearray(8) + # SQ head pointer + self.sq_head_ptr = bytearray(2) + # SQ identifier + self.sq_id = bytearray(2) + # command identifier + self.cmd_id = bytearray(2) + # status field + self.status = bytearray(2) + def compose_reply(self): + return self.property_data + self.sq_head_ptr + self.sq_id + \ + self.cmd_id + self.status + +class NVMeTcpDataPdu: + def __init__(self): + # command id + self.cmd_id = bytearray(2) + # transfer tag + self.transfer_tag = bytearray(2) + # data offset + self.data_offset = bytearray(4) + # data length + self.data_length = bytearray(4) + # reserved + self.reserved = bytearray(4) + + def compose_reply(self): + return self.cmd_id + self.transfer_tag + \ + self.data_offset + self.data_length + self.reserved + +class NVMeIdentify: + def __init__(self): + # skip some fields, include VID, SSVID, SN, MN + self.todo_fields1 = bytearray(64) + # firmware revision + self.firmware_revision = bytearray(8) + # RAB, IEEE, CMIC + self.todo_fields2 = bytearray(5) + # maximum data transfer size + self.mdts = bytearray(1) + # controller id + self.controller_id = bytearray(2) + # version + self.version = bytearray(4) + # RTD3R, RTD3E + self.todo_fields3 = bytearray(8) + # optional asynchronous events supported + self.oaes = bytearray(4) + # CTRATT, RRLS, CNTRLTYPE, FGUID, NVMe Management Interface, OACS, ACL + self.todo_fields4 = bytearray(163) + # asynchronous events request limit + self.aerl = bytearray(1) + # firmware updates + self.firmware_updates = bytearray(1) + # log page attributes + self.lpa = bytearray(1) + # error log page entries(ELPE) + self.elpe = bytearray(1) + # NPSS, AVSCC, APSTA, WCTEMP, CCTEMP, MTFA, HMPRE, HMIN, TNVMCAP... + self.todo_fields5 = bytearray(251) + # maximum outstanding commands + self.maxcmd = bytearray(2) + # number of namespace, optional NVM command support + self.todo_fields6 = bytearray(6) + # fused operation support + self.fused_operation = bytearray(2) + # FNA, VWC, AWUN, AWUPF, NVSCC, NWPC + self.todo_fields7 = bytearray(8) + # atomic compare & write unit + self.acwu = bytearray(2) + self.reserved1 = bytearray(2) + # SGL support + self.sgls = bytearray(4) + # maxinum number of allowed namespaces + self.mnan = bytearray(4) + self.reserved2 = bytearray(224) + # NVM subsystem NVMe qualified name + self.subnqn = bytearray(256) + self.reserved3 = bytearray(768) + # NVMeOF attributes + self.nvmeof_attributes = bytearray(256) + # power state attributes + self.power_state_attributes = bytearray(1024) + # vendor specific + self.vendor_specific = bytearray(1024) + + def compose_reply(self): + return self.todo_fields1 + self.firmware_revision + \ + self.todo_fields2 + self.mdts + self.controller_id + \ + self.version + self.todo_fields3 + self.oaes + \ + self.todo_fields4 + self.aerl + self.firmware_updates + \ + self.lpa + self.elpe + self.todo_fields5 + self.maxcmd + \ + self.todo_fields6 + self.fused_operation + self.todo_fields7 + \ + self.acwu + self.reserved1 + self.sgls + self.mnan +\ + self.reserved2 + self.subnqn + self.reserved3 + \ + self.nvmeof_attributes + self.power_state_attributes + \ + self.vendor_specific + + +class CqeSetFeature: + def __init__(self): + # DWORD0 + self.dword0 = bytearray(4) + # DWORD1 + self.dword1 = bytearray(4) + # SQ head pointer + self.sq_head_ptr = bytearray(2) + # SQ identifier + self.sq_id = bytearray(2) + # command identifier + self.cmd_id = bytearray(2) + # status field + self.status = bytearray(2) + + def compose_reply(self): + return self.dword0 + self.dword1 + self.sq_head_ptr + \ + self.sq_id + self.cmd_id + self.status + +class NVMeGetLogPage: + def __init__(self): + # generation counter + self.genctr = bytearray(8) + # number of records + self.numrec = bytearray(8) + + #record format + self.recfmt = bytearray(2) + self.reserved = bytearray(1006) + + def compose_short_reply(self): + return self.genctr + self.numrec + + def compose_data_reply(self): + return self.genctr + self.numrec + self.recfmt + self.reserved + +class DiscoveryLogEntry: + def __init__(self): + # transport type + self.trtype = bytearray(1) + # adress family + self.adrfam = bytearray(1) + # subsystem type + self.subtype = bytearray(1) + # transport requirement + self.treq = bytearray(1) + # port ID + self.port_id = bytearray(2) + # controller ID + self.controller_id = bytearray(2) + # admin max SQ size + self.asqsz = bytearray(2) + self.reserved1 = bytearray(22) + # transport service indentifier + self.trsvcid = bytearray(32) + self.reserved2 = bytearray(192) + # NVM subsystem qualified name + self.subnqn = bytearray(256) + # Transport address(TRADDR) + self.traddr = bytearray(256) + # Transport specific address subtype + self.tsas = bytearray(256) + + def compose_reply(self): + return self.trtype + self.adrfam + self.subtype + \ + self.treq + self.port_id + self.controller_id + \ + self.asqsz + self.reserved1 + self.trsvcid + \ + self.reserved2 + self.subnqn + self.traddr + self.tsas + +class DiscoveryService: + """Implements discovery controller. + + Response discover request from initiator. + + Instance attributes: + version: Discovery controller version + config: Basic gateway parameters + logger: Logger instance to track discovery controller events + omap_name: OMAP object name + ioctx: I/O context which allows OMAP access + discovery_addr: Discovery controller addr which allows initiator send command + discovery_port: Discovery controller's listening port + """ + + BDEV_PREFIX = "bdev_" + NAMESPACE_PREFIX = "namespace_" + SUBSYSTEM_PREFIX = "subsystem_" + HOST_PREFIX = "host_" + LISTENER_PREFIX = "listener_" + + def __init__(self, config): + self.logger = logging.getLogger(__name__) + self.logger.setLevel(level=logging.DEBUG) + + self.version = 1 + self.config = config + + gateway_group = self.config.get("gateway", "group") + self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" + self.logger.info(f"omap_name: {self.omap_name}") + + ceph_pool = self.config.get("ceph", "pool") + ceph_conf = self.config.get("ceph", "config_file") + conn = rados.Rados(conffile=ceph_conf) + conn.connect() + self.ioctx = conn.open_ioctx(ceph_pool) + + self.discovery_addr = self.config.get("discovery", "addr") + self.discovery_port = self.config.get("discovery", "port") + if self.discovery_addr == '' or self.discovery_port == '': + self.logger.error(f"discovery addr/port are empty.") + assert 0 + self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}") + + def _read_all(self) -> Dict[str, str]: + """Reads OMAP and returns dict of all keys and values.""" + + with rados.ReadOpCtx() as read_op: + iter, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) + self.ioctx.operate_read_op(read_op, self.omap_name) + omap_dict = dict(iter) + return omap_dict + + def _get_vals(self, omap_dict, prefix): + """Read bdevs/subsystems/namespaces/hosts/listeners from the OMAP dict.""" + + vals = [] + for (key, val) in omap_dict.items(): + if key.startswith(prefix): + val_text = val.decode('utf-8') + js = json.loads(val_text) + vals.append(js) + return vals + + def reply_initialize(self, sock): + """Reply initialize request.""" + + self.logger.debug("handle ICreq.") + + pdu_reply = Pdu() + pdu_reply.type = bytearray([NVME_TCP_PDU.ICRESP]) + pdu_reply.header_length = b'\x80' + pdu_reply.packet_length = b'\x80\x00\x00\x00' + + icresp_reply = ICResp() + # Maximum data capsules per r2t supported: 131072 + icresp_reply.maximum_data_capsules = b'\x00\x00\x02\x00' + + reply = pdu_reply.compose_reply() + icresp_reply.compose_reply() + bytearray(112) + + try: + sock.sendall(reply) + except BrokenPipeError: + self.logger.error("client disconnected unexpectedly.") + return -1 + self.logger.debug("reply initialize connection request.") + return 0 + + def reply_fc_cmd_connect(self, sock, data, self_cnlid, sq_head_ptr): + """Reply connect request.""" + + self.logger.debug("handle connect request.") + hf_nvmeof_cmd_connect_rsvd1 = struct.unpack_from('<19B', data, 13) + SIGL1 = struct.unpack_from('> 6) & 0x3 + + pdu_reply = Pdu() + pdu_reply.type = bytearray([NVME_TCP_PDU.RSP]) + pdu_reply.header_length = b'\x18' + pdu_reply.packet_length = b'\x18\x00\x00\x00' + + # Cqe for cmd property set + # property set only support controller configruration request + property_set = CqePropertyGetSet() + if NVME_CTL(nvmeof_prop_get_set_offset) == NVME_CTL.CONFIGURATION: + property_set.sq_head_ptr = struct.pack('> 8) & 0x1F + get_logpage_lsi = nvme_get_logpage_dword11 >> 16 + get_logpage_uid_idx = nvme_get_logpage_dword14 & 0x3F + + if get_logpage_lid != 0x70: + self.logger.error(f"request type error, not discovery request.") + return -1, 0, 0 + + # Prepare all log page data segments + # TODO: Filter log entries based on access permissions + if log_page_len == 0 and nvme_data_len > 16: + log_page_len = 1024 * (len(listeners) + 1) + log_page = bytearray(log_page_len) + + nvme_get_log_page_reply = NVMeGetLogPage() + nvme_get_log_page_reply.genctr = struct.pack(' 16 and nvme_data_len % 1024 == 0: + # class Pdu and NVMeTcpDataPdu + pdu_and_nvme_pdu_len = 8 + 16 + + pdu_reply = Pdu() + pdu_reply.type = bytearray([NVME_TCP_PDU.C2H_DATA]) + pdu_reply.specical_flag = b'\x0c' + pdu_reply.header_length = b'\x18' + pdu_reply.data_offset = b'\x18' + pdu_reply.packet_length = struct.pack(' SQ_HEAD_MAX: + sq_head_ptr = 1 + head = sock.recv(NVME_TCP_PDU_UNIT) + if not head: + time.sleep(0.01) + break + + PDU = struct.unpack_from(' NVME_TCP_PDU_UNIT: + i = package_len // NVME_TCP_PDU_UNIT + if package_len % NVME_TCP_PDU_UNIT == 0: + i -= 1 + while i > 0: + d = sock.recv(NVME_TCP_PDU_UNIT) + if d: + buffer.append(d) + i -= 1 + else: + break + data = b''.join(buffer) + + # ICreq + if NVME_TCP_PDU(pdu_type) == NVME_TCP_PDU.ICREQ: + err = self.reply_initialize(sock) + + # CMD + elif NVME_TCP_PDU(pdu_type) == NVME_TCP_PDU.CMD: + CMD1 = struct.unpack_from('