From 42e5ba0b38b8894dad64833cd93198601582d2df Mon Sep 17 00:00:00 2001 From: zxJin-x Date: Thu, 13 Jan 2022 15:54:36 +0800 Subject: [PATCH] display pulsar name policies --- lib/api/pulsar/pulsar_namespace_api.dart | 141 +++++++++ lib/module/pulsar/pulsar_namespace.dart | 107 +++++++ .../widget/pulsar_namespace_policies.dart | 270 +++++++++++++++++- .../pulsar_namespace_policies_view_model.dart | 224 +++++++++++++++ 4 files changed, 741 insertions(+), 1 deletion(-) diff --git a/lib/api/pulsar/pulsar_namespace_api.dart b/lib/api/pulsar/pulsar_namespace_api.dart index 537e2d2..272923f 100644 --- a/lib/api/pulsar/pulsar_namespace_api.dart +++ b/lib/api/pulsar/pulsar_namespace_api.dart @@ -66,4 +66,145 @@ class PulsarNamespaceApi { throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); } } + + static Future getPolicy(String host, int port, String tenant, String namespace) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace'; + var response = + await http.get(Uri.parse(url), headers: {'Content-Type': 'application/json; charset=utf-8'}); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + Map jsonResponse = json.decode(response.body) as Map; + return PolicyResp.fromJson(jsonResponse); + } + + static Future setAutoTopicCreation(String host, int port, String tenant, String namespace, + bool? allowAutoTopicCreation, String? topicType, int? defaultNumPartitions) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/autoTopicCreation'; + TopicAutoCreateReq topicAutoCreateReq = + new TopicAutoCreateReq(allowAutoTopicCreation, topicType, defaultNumPartitions); + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: json.encode(topicAutoCreateReq)); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMessageTTLSecond( + String host, int port, String tenant, String namespace, int? messageTTLSecond) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/messageTTL'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: messageTTLSecond.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxProducersPerTopic( + String host, int port, String tenant, String namespace, int? maxProducersPerTopic) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxProducersPerTopic'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxProducersPerTopic.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxConsumersPerTopic( + String host, int port, String tenant, String namespace, int? maxConsumersPerTopic) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxConsumersPerTopic'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxConsumersPerTopic.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxConsumersPerSubscription( + String host, int port, String tenant, String namespace, int? maxConsumersPerSubscription) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxConsumersPerSubscription'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxConsumersPerSubscription.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxUnackedMessagesPerConsumer( + String host, int port, String tenant, String namespace, int? maxUnackedMessagesPerConsumer) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxUnackedMessagesPerConsumer'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxUnackedMessagesPerConsumer.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxUnackedMessagesPerSubscription( + String host, int port, String tenant, String namespace, int? maxUnackedMessagesPerSubscription) async { + String url = + 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxUnackedMessagesPerSubscription'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxUnackedMessagesPerSubscription.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxSubscriptionsPerTopic( + String host, int port, String tenant, String namespace, int? maxSubscriptionsPerTopic) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxSubscriptionsPerTopic'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxSubscriptionsPerTopic.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } + + static Future setMaxTopicsPerNamespace( + String host, int port, String tenant, String namespace, int? maxTopicsPerNamespace) async { + String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxTopicsPerNamespace'; + var response = await http.post(Uri.parse(url), + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + }, + body: maxTopicsPerNamespace.toString()); + if (HttpUtil.abnormal(response.statusCode)) { + log('ErrorCode is ${response.statusCode}, body is ${response.body}'); + throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}'); + } + } } diff --git a/lib/module/pulsar/pulsar_namespace.dart b/lib/module/pulsar/pulsar_namespace.dart index b3314f5..1ef6180 100644 --- a/lib/module/pulsar/pulsar_namespace.dart +++ b/lib/module/pulsar/pulsar_namespace.dart @@ -31,6 +31,36 @@ class BacklogQuotaReq { } } +class TopicAutoCreateReq { + final bool? allowAutoTopicCreation; + final String? topicType; + final int? defaultNumPartitions; + + TopicAutoCreateReq(this.allowAutoTopicCreation, this.topicType, this.defaultNumPartitions); + + Map toJson() { + Map map = new Map(); + map["allowAutoTopicCreation"] = this.allowAutoTopicCreation; + if (topicType != null) { + map["topicType"] = this.topicType!; + } + map["defaultNumPartitions"] = this.defaultNumPartitions; + return map; + } +} + +class MaxProducersPerTopicReq { + int? maxProducersPerTopic; + + MaxProducersPerTopicReq(this.maxProducersPerTopic); + + Map toJson() { + Map map = new Map(); + map["maxProducersPerTopic"] = this.maxProducersPerTopic; + return map; + } +} + class BacklogQuotaResp { final int? limitSize; final int? limitTime; @@ -42,3 +72,80 @@ class BacklogQuotaResp { return BacklogQuotaResp(map["limitSize"], map["limitTime"], map["policy"]); } } + +class PolicyResp { + final bool? isAllowAutoTopicCreation; + final String? topicType; + final int? defaultNumPartitions; + final List? boundaries; + final int numBundles; + final int? messageTTLInSeconds; + final int? maxProducersPerTopic; + final int? maxConsumersPerTopic; + final int? maxConsumersPerSubscription; + final int? maxUnackedMessagesPerConsumer; + final int? maxUnackedMessagesPerSubscription; + final int? maxSubscriptionsPerTopic; + final int? maxTopicsPerNamespace; + + PolicyResp( + this.isAllowAutoTopicCreation, + this.topicType, + this.defaultNumPartitions, + this.boundaries, + this.numBundles, + this.messageTTLInSeconds, + this.maxProducersPerTopic, + this.maxConsumersPerTopic, + this.maxConsumersPerSubscription, + this.maxUnackedMessagesPerConsumer, + this.maxUnackedMessagesPerSubscription, + this.maxSubscriptionsPerTopic, + this.maxTopicsPerNamespace); + + factory PolicyResp.fromJson(Map map) { + var autoTopicCreate = map["autoTopicCreationOverride"]; + var isAllowAutoTopicCreation; + var topicType; + var defaultNumPartitions; + if (autoTopicCreate != null) { + isAllowAutoTopicCreation = autoTopicCreate["allowAutoTopicCreation"]; + topicType = autoTopicCreate["topicType"]; + defaultNumPartitions = autoTopicCreate["defaultNumPartitions"]; + } + var bundleData = map["bundles"]; + var boundaries; + var numBundles; + if (bundleData != null) { + boundaries = bundleData["boundaries"]; + numBundles = bundleData["numBundles"]; + } + return PolicyResp( + isAllowAutoTopicCreation, + topicType, + defaultNumPartitions, + boundaries, + numBundles, + map["message_ttl_in_seconds"], + map["max_producers_per_topic"], + map["max_consumers_per_topic"], + map["max_consumers_per_subscription"], + map["max_unacked_messages_per_consumer"], + map["max_unacked_messages_per_subscription"], + map["max_subscriptions_per_topic"], + map["max_topics_per_namespace"]); + } +} + +class AutoTopicCreation { + bool? isAllowAutoTopicCreation; + String? topicType; + int? defaultNumPartitions; + + AutoTopicCreation(this.isAllowAutoTopicCreation, this.topicType, this.defaultNumPartitions); +} + +class BundlesData { + List? boundaries; + int? numBundles; +} diff --git a/lib/ui/pulsar/widget/pulsar_namespace_policies.dart b/lib/ui/pulsar/widget/pulsar_namespace_policies.dart index a5a1db3..4875eb5 100644 --- a/lib/ui/pulsar/widget/pulsar_namespace_policies.dart +++ b/lib/ui/pulsar/widget/pulsar_namespace_policies.dart @@ -19,6 +19,7 @@ class PulsarNamespacePoliciesWidgetState extends State(context, listen: false); + vm.fetchPolicy(); } @override @@ -35,7 +36,21 @@ class PulsarNamespacePoliciesWidgetState extends State[ Container( @@ -46,6 +61,259 @@ class PulsarNamespacePoliciesWidgetState extends State? boundaries; + int? numBundles; + int? messageTTLInSeconds; + int? maxProducersPerTopic; + int? maxConsumersPerTopic; + int? maxConsumersPerSubscription; + int? maxUnackedMessagesPerConsumer; + int? maxUnackedMessagesPerSubscription; + int? maxSubscriptionsPerTopic; + int? maxTopicsPerNamespace; PulsarNamespacePoliciesViewModel(this.pulsarInstancePo, this.tenantResp, this.namespaceResp); @@ -15,6 +29,26 @@ class PulsarNamespacePoliciesViewModel extends BaseLoadViewModel { pulsarInstancePo.deepCopy(), tenantResp.deepCopy(), namespaceResp.deepCopy()); } + Future fetchPolicy() async { + try { + final PolicyResp resp = await PulsarNamespaceApi.getPolicy(host, port, tenant, namespace); + this.isAllowAutoTopicCreation = resp.isAllowAutoTopicCreation; + this.messageTTLInSeconds = resp.messageTTLInSeconds; + this.maxProducersPerTopic = resp.maxProducersPerTopic; + this.maxConsumersPerTopic = resp.maxConsumersPerTopic; + this.maxConsumersPerSubscription = resp.maxConsumersPerSubscription; + this.maxUnackedMessagesPerConsumer = resp.maxUnackedMessagesPerConsumer; + this.maxUnackedMessagesPerSubscription = resp.maxUnackedMessagesPerSubscription; + this.maxSubscriptionsPerTopic = resp.maxSubscriptionsPerTopic; + this.maxTopicsPerNamespace = resp.maxTopicsPerNamespace; + loadSuccess(); + } on Exception catch (e) { + loadException = e; + loading = false; + } + notifyListeners(); + } + int get id { return this.pulsarInstancePo.id; } @@ -38,4 +72,194 @@ class PulsarNamespacePoliciesViewModel extends BaseLoadViewModel { String get namespace { return this.namespaceResp.namespace; } + + String get isAllowAutoTopicCreateDisplayStr { + if (this.isAllowAutoTopicCreation == null) { + return "unset"; + } + return this.isAllowAutoTopicCreation.toString(); + } + + String get messageTTLDisplayStr { + if (this.messageTTLInSeconds == null) { + return "unset"; + } + return this.messageTTLInSeconds.toString(); + } + + String get maxProducersPerTopicDisplayStr { + if (this.maxProducersPerTopic == null) { + return "unset"; + } + return this.maxProducersPerTopic.toString(); + } + + String get maxConsumersPerTopicDisplayStr { + if (this.maxConsumersPerTopic == null) { + return "unset"; + } + return this.maxConsumersPerTopic.toString(); + } + + String get maxConsumersPerSubscriptionDisplayStr { + if (this.maxConsumersPerSubscription == null) { + return "unset"; + } + return this.maxConsumersPerSubscription.toString(); + } + + String get maxUnackedMessagesPerConsumerDisplayStr { + if (this.maxUnackedMessagesPerConsumer == null) { + return "unset"; + } + return this.maxUnackedMessagesPerConsumer.toString(); + } + + String get maxUnackedMessagesPerSubscriptionDisplayStr { + if (this.maxUnackedMessagesPerSubscription == null) { + return "unset"; + } + return this.maxUnackedMessagesPerSubscription.toString(); + } + + String get maxSubscriptionsPerTopicDisplayStr { + if (this.maxSubscriptionsPerTopic == null) { + return "unset"; + } + return this.maxSubscriptionsPerTopic.toString(); + } + + String get maxTopicsPerNamespaceDisplayStr { + if (this.maxTopicsPerNamespace == null) { + return "unset"; + } + return this.maxTopicsPerNamespace.toString(); + } + + Future updateAutoTopicCreate() async { + try { + if (isAllowAutoTopicCreation == null) { + return; + } + if (topicType == null) { + return; + } + if (defaultNumPartitions == null) { + return; + } + await PulsarNamespaceApi.setAutoTopicCreation( + host, port, tenant, namespace, isAllowAutoTopicCreation, topicType, defaultNumPartitions); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMessageTTLSecond() async { + try { + if (messageTTLInSeconds == null) { + return; + } + await PulsarNamespaceApi.setMessageTTLSecond(host, port, tenant, namespace, messageTTLInSeconds); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxProducersPerTopic() async { + try { + if (maxProducersPerTopic == null) { + return; + } + await PulsarNamespaceApi.setMaxProducersPerTopic(host, port, tenant, namespace, maxProducersPerTopic); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxConsumersPerTopic() async { + try { + if (maxConsumersPerTopic == null) { + return; + } + await PulsarNamespaceApi.setMaxConsumersPerTopic(host, port, tenant, namespace, maxConsumersPerTopic); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxConsumersPerSubscription() async { + try { + if (maxConsumersPerSubscription == null) { + return; + } + await PulsarNamespaceApi.setMaxConsumersPerSubscription( + host, port, tenant, namespace, maxConsumersPerSubscription); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxUnackedMessagesPerConsumer() async { + try { + if (maxUnackedMessagesPerConsumer == null) { + return; + } + await PulsarNamespaceApi.setMaxUnackedMessagesPerConsumer( + host, port, tenant, namespace, maxUnackedMessagesPerConsumer); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxUnackedMessagesPerSubscription() async { + try { + if (maxUnackedMessagesPerSubscription == null) { + return; + } + await PulsarNamespaceApi.setMaxUnackedMessagesPerSubscription( + host, port, tenant, namespace, maxUnackedMessagesPerSubscription); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxSubscriptionsPerTopic() async { + try { + if (maxSubscriptionsPerTopic == null) { + return; + } + await PulsarNamespaceApi.setMaxSubscriptionsPerTopic(host, port, tenant, namespace, maxSubscriptionsPerTopic); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } + + Future setMaxTopicsPerNamespace() async { + try { + if (maxTopicsPerNamespace == null) { + return; + } + await PulsarNamespaceApi.setMaxTopicsPerNamespace(host, port, tenant, namespace, maxTopicsPerNamespace); + await fetchPolicy(); + } on Exception catch (e) { + opException = e; + notifyListeners(); + } + } }