Skip to content

Commit

Permalink
display pulsar name policies
Browse files Browse the repository at this point in the history
  • Loading branch information
tyluffy committed Jan 13, 2022
1 parent acdaba0 commit 8848d61
Show file tree
Hide file tree
Showing 6 changed files with 718 additions and 1 deletion.
132 changes: 132 additions & 0 deletions lib/api/pulsar/pulsar_namespace_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,138 @@ class PulsarNamespaceApi {
return BacklogQuotaResp.fromJson(destinationStorageResp);
}

static Future<PolicyResp> getPolicy(String host, int port, String tenant, String namespace) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace';
var response =
await http.get(Uri.parse(url), headers: <String, String>{'Content-Type': 'application/json; charset=utf-8'});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
Map jsonResponse = json.decode(response.body) as Map;
return PolicyResp.fromJson(jsonResponse);
}

static Future<void> setAutoTopicCreation(String host, int port, String tenant, String namespace,
bool? allowAutoTopicCreation, String? topicType, int? defaultNumPartitions) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/autoTopicCreation';
TopicAutoCreateReq topicAutoCreateReq =
new TopicAutoCreateReq(allowAutoTopicCreation, topicType, defaultNumPartitions);
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: json.encode(topicAutoCreateReq));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMessageTTLSecond(String host, int port, String tenant, String namespace, int? messageTTLSecond) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/messageTTL';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: messageTTLSecond.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxProducersPerTopic(String host, int port, String tenant, String namespace, int? maxProducersPerTopic) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxProducersPerTopic';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxProducersPerTopic.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxConsumersPerTopic(String host, int port, String tenant, String namespace, int? maxConsumersPerTopic) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxConsumersPerTopic';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxConsumersPerTopic.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxConsumersPerSubscription(String host, int port, String tenant, String namespace, int? maxConsumersPerSubscription) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxConsumersPerSubscription';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxConsumersPerSubscription.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxUnackedMessagesPerConsumer(String host, int port, String tenant, String namespace, int? maxUnackedMessagesPerConsumer) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxUnackedMessagesPerConsumer';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxUnackedMessagesPerConsumer.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxUnackedMessagesPerSubscription(String host, int port, String tenant, String namespace, int? maxUnackedMessagesPerSubscription) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxUnackedMessagesPerSubscription';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxUnackedMessagesPerSubscription.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxSubscriptionsPerTopic(String host, int port, String tenant, String namespace, int? maxSubscriptionsPerTopic) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxSubscriptionsPerTopic';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxSubscriptionsPerTopic.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> setMaxTopicsPerNamespace(String host, int port, String tenant, String namespace, int? maxTopicsPerNamespace) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/maxTopicsPerNamespace';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: maxTopicsPerNamespace.toString());
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<void> updateBacklogQuota(
String host, int port, String tenant, String namespace, int limit, int? limitTime, String policy) async {
String url = 'http://$host:${port.toString()}/admin/v2/namespaces/$tenant/$namespace/backlogQuota';
Expand Down
86 changes: 86 additions & 0 deletions lib/module/pulsar/pulsar_namespace.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

class NamespaceResp {
final String namespace;

Expand Down Expand Up @@ -31,6 +32,36 @@ class BacklogQuotaReq {
}
}

class TopicAutoCreateReq {
final bool? allowAutoTopicCreation;
final String? topicType;
final int? defaultNumPartitions;

TopicAutoCreateReq(this.allowAutoTopicCreation, this.topicType, this.defaultNumPartitions);

Map toJson() {
Map map = new Map();
map["allowAutoTopicCreation"] = this.allowAutoTopicCreation;
if (topicType != null) {
map["topicType"] = this.topicType!;
}
map["defaultNumPartitions"] = this.defaultNumPartitions;
return map;
}
}

class MaxProducersPerTopicReq {
int? maxProducersPerTopic;

MaxProducersPerTopicReq(this.maxProducersPerTopic);

Map toJson() {
Map map = new Map();
map["maxProducersPerTopic"] = this.maxProducersPerTopic;
return map;
}
}

class BacklogQuotaResp {
final int? limitSize;
final int? limitTime;
Expand All @@ -42,3 +73,58 @@ class BacklogQuotaResp {
return BacklogQuotaResp(map["limitSize"], map["limitTime"], map["policy"]);
}
}

class PolicyResp {
final bool? isAllowAutoTopicCreation;
final String? topicType;
final int? defaultNumPartitions;
final List<dynamic>? boundaries;
final int numBundles;
final int? messageTTLInSeconds;
final int? maxProducersPerTopic;
final int? maxConsumersPerTopic;
final int? maxConsumersPerSubscription;
final int? maxUnackedMessagesPerConsumer;
final int? maxUnackedMessagesPerSubscription;
final int? maxSubscriptionsPerTopic;
final int? maxTopicsPerNamespace;

PolicyResp(this.isAllowAutoTopicCreation, this.topicType, this.defaultNumPartitions, this.boundaries, this.numBundles, this.messageTTLInSeconds, this.maxProducersPerTopic,
this.maxConsumersPerTopic, this.maxConsumersPerSubscription, this.maxUnackedMessagesPerConsumer, this.maxUnackedMessagesPerSubscription,
this.maxSubscriptionsPerTopic, this.maxTopicsPerNamespace);

factory PolicyResp.fromJson(Map map) {
var autoTopicCreate = map["autoTopicCreationOverride"];
var isAllowAutoTopicCreation;
var topicType;
var defaultNumPartitions;
if (autoTopicCreate != null) {
isAllowAutoTopicCreation = autoTopicCreate["allowAutoTopicCreation"];
topicType = autoTopicCreate["topicType"];
defaultNumPartitions = autoTopicCreate["defaultNumPartitions"];
}
var bundleData = map["bundles"];
var boundaries;
var numBundles;
if (bundleData != null) {
boundaries = bundleData["boundaries"];
numBundles = bundleData["numBundles"];
}
return PolicyResp(isAllowAutoTopicCreation, topicType, defaultNumPartitions, boundaries, numBundles, map["message_ttl_in_seconds"], map["max_producers_per_topic"],
map["max_consumers_per_topic"], map["max_consumers_per_subscription"], map["max_unacked_messages_per_consumer"],
map["max_unacked_messages_per_subscription"], map["max_subscriptions_per_topic"], map["max_topics_per_namespace"]);
}
}

class AutoTopicCreation {
bool? isAllowAutoTopicCreation;
String? topicType;
int? defaultNumPartitions;

AutoTopicCreation(this.isAllowAutoTopicCreation, this.topicType, this.defaultNumPartitions);
}

class BundlesData {
List<String>? boundaries;
int? numBundles;
}
Loading

0 comments on commit 8848d61

Please sign in to comment.