Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support produce message to partition #50

Merged
merged 3 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/api/pulsar/pulsar_topic_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import 'dart:developer';
import 'package:http/http.dart' as http;
import 'package:paas_dashboard_flutter/api/http_util.dart';
import 'package:paas_dashboard_flutter/api/pulsar/pulsar_stat_api.dart';
import 'package:paas_dashboard_flutter/module/pulsar/const.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_consume.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_produce.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_producer.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_subscription.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_topic.dart';
Expand Down Expand Up @@ -236,4 +238,23 @@ class PulsarTopicApi {
return new PulsarTopicBaseResp(
topicName, partitionNum, msgRateIn, msgRateOut, msgInCounter, msgOutCounter, storageSize);
}

static Future<String> sendMsg(
String host, int port, String tenant, String namespace, String topic, String partition, key, value) async {
ProducerMessage producerMessage = new ProducerMessage(key, value);
List<ProducerMessage> messageList = new List.empty(growable: true);
messageList.add(producerMessage);
PublishMessagesReq messagesReq = new PublishMessagesReq(PulsarConst.defaultProducerName, messageList);
var url = 'http://$host:${port.toString()}/topics/persistent/$tenant/$namespace/$topic/partitions/$partition';
var response = await http.post(Uri.parse(url),
headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
},
body: json.encode(messagesReq));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
return "send msg failed, " + response.body;
}
return "send msg success";
}
}
10 changes: 9 additions & 1 deletion lib/ui/pulsar/screen/pulsar_topic.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import 'package:paas_dashboard_flutter/generated/l10n.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_basic.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_consume.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_consumer.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_produce.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_producer.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/widget/pulsar_topic_subscription.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_basic_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_consume_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_consumer_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_produce_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_producer_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_subscription_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_view_model.dart';
Expand All @@ -29,7 +31,7 @@ class _PulsarTopicState extends State<PulsarTopic> {
Widget build(BuildContext context) {
final vm = Provider.of<PulsarTopicViewModel>(context);
return DefaultTabController(
length: 5,
length: 6,
child: Scaffold(
appBar: AppBar(
title: Text(
Expand All @@ -41,6 +43,7 @@ class _PulsarTopicState extends State<PulsarTopic> {
Tab(text: S.of(context).consumer),
Tab(text: S.of(context).producer),
Tab(text: S.of(context).consume),
Tab(text: S.of(context).produce),
],
),
),
Expand Down Expand Up @@ -71,6 +74,11 @@ class _PulsarTopicState extends State<PulsarTopic> {
PulsarTopicConsumeViewModel(vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp),
child: PulsarTopicConsumeWidget(),
).build(context),
ChangeNotifierProvider(
create: (context) =>
PulsarTopicProduceViewModel(vm.pulsarInstancePo, vm.tenantResp, vm.namespaceResp, vm.topicResp),
child: PulsarTopicProduceWidget(),
).build(context),
],
),
),
Expand Down
59 changes: 59 additions & 0 deletions lib/ui/pulsar/widget/pulsar_topic_produce.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import 'package:flutter/material.dart';
import 'package:paas_dashboard_flutter/ui/util/exception_util.dart';
import 'package:paas_dashboard_flutter/ui/util/form_util.dart';
import 'package:paas_dashboard_flutter/ui/util/spinner_util.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_produce_view_model.dart';
import 'package:provider/provider.dart';

class PulsarTopicProduceWidget extends StatefulWidget {
PulsarTopicProduceWidget();

@override
State<StatefulWidget> createState() {
return new PulsarTopicProduceWidgetState();
}
}

class PulsarTopicProduceWidgetState extends State<PulsarTopicProduceWidget> {
@override
void initState() {
super.initState();
}

@override
Widget build(BuildContext context) {
final vm = Provider.of<PulsarTopicProduceViewModel>(context);
if (vm.loading) {
WidgetsBinding.instance!.addPostFrameCallback((timeStamp) {
SpinnerUtil.create();
});
}
ExceptionUtil.processLoadException(vm, context);
ExceptionUtil.processOpException(vm, context);
var produceMsgButton = createInstanceButton(context);
var body = ListView(
children: [
Container(
height: 50,
child: ListView(
scrollDirection: Axis.horizontal,
shrinkWrap: true,
children: [produceMsgButton],
),
),
],
);
return Scaffold(body: body);
}

ButtonStyleButton createInstanceButton(BuildContext context) {
final vm = Provider.of<PulsarTopicProduceViewModel>(context, listen: false);
var list = [
FormFieldDef('message key'),
FormFieldDef('message value'),
];
return FormUtil.createButton2NoText("Send Message To Pulsar", list, context, (key, value) {
vm.sendMsg(key, value);
});
}
}
59 changes: 59 additions & 0 deletions lib/vm/pulsar/pulsar_topic_produce_view_model.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import 'package:paas_dashboard_flutter/api/pulsar/pulsar_topic_api.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_namespace.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_tenant.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_topic.dart';
import 'package:paas_dashboard_flutter/persistent/po/pulsar_instance_po.dart';
import 'package:paas_dashboard_flutter/vm/base_load_list_view_model.dart';

class PulsarTopicProduceViewModel extends BaseLoadListViewModel {
final PulsarInstancePo pulsarInstancePo;
final TenantResp tenantResp;
final NamespaceResp namespaceResp;
final TopicResp topicResp;

PulsarTopicProduceViewModel(this.pulsarInstancePo, this.tenantResp, this.namespaceResp, this.topicResp);

int get id {
return this.pulsarInstancePo.id;
}

String get name {
return this.pulsarInstancePo.name;
}

String get host {
return this.pulsarInstancePo.host;
}

int get port {
return this.pulsarInstancePo.port;
}

String get tenant {
return this.tenantResp.tenant;
}

String get namespace {
return this.namespaceResp.namespace;
}

String get topic {
return this.topicResp.topicName;
}

String get message {
return this.topicResp.topicName;
}

Future<String> sendMsg(key, value) {
return PulsarTopicApi.sendMsg(host, port, tenant, namespace, getTopic(), getPartition(), key, value);
}

String getTopic() {
return topic.split("-partition-")[0];
}

String getPartition() {
return topic.split("-partition-")[1];
}
}