-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core][Metrics] Add Seatunnel Metrics module #2888
Conversation
Please solved ci problems. Thanks |
if(!config.hasPath(ConfigKeyName.Metric_Class)){ | ||
return StreamExecutionEnvironment.getExecutionEnvironment(); | ||
} | ||
//构建flink-metrics参数 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use english only
@@ -322,4 +326,61 @@ private void setCheckpoint() { | |||
} | |||
} | |||
|
|||
public StreamExecutionEnvironment creatMetricStreamEEnvironment() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public StreamExecutionEnvironment creatMetricStreamEEnvironment() { | |
public StreamExecutionEnvironment creatStreamEnvironment() { |
Not only have metrics config will run this method.
@@ -45,4 +45,9 @@ private ConfigKeyName() { | |||
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention"; | |||
public static final String STATE_BACKEND = "execution.state.backend"; | |||
public static final String PLANNER = "execution.planner"; | |||
public static final String Metric_Interval = "execution.metrics.interval"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use unified name in spark/flink. There config name is for SeaTunnel
, not only for Flink
or Spark
.
.stringType() | ||
.noDefaultValue(); | ||
|
||
Configuration seatunnel_reporter = new Configuration().set(REPORTERS_LIST, "seatunnel_reporter").set(REPORTER_CLASS, "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use lower camel case
for field and parameter. Change seatunnel_reporter
to seatunnelReporter
return StreamExecutionEnvironment.getExecutionEnvironment(); | ||
} | ||
//构建flink-metrics参数 | ||
ConfigOption<String> REPORTERS_LIST = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can create an new method called initMetricConfig(Configuration config)
to cover this logic.
|
||
<dependencies> | ||
<dependency> | ||
<groupId>io.prometheus</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Core can't contains prometheus, your should create new module call seatunnel-metrics-prometheus
, then implement your metrics push to prometheus logic in there. The core should only have interface for metrics and common code.
/** | ||
* A reporter which outputs measurements to PrometheusPushGateway | ||
*/ | ||
public class PrometheusPushGatewayReporter implements MetricReporter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to seatunnel-metrics-prometheus
builder.append("helpString: ") | ||
.append(this.helpString) | ||
.append(lineSeparator); | ||
for(int i=0;i<this.dimensionKeys.size();i++){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code need be formated.
jobName = config.getString("jobName","flinkJob"); | ||
//config. | ||
//String string = metricConfig.getString("name", "de"); | ||
//log.info("StreamMetricReporter init:{}", string); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove useless code.
* exports Flink metrics to Seatunnel | ||
*/ | ||
public class SeatunnelMetricReporter extends AbstractSeatunnelReporter implements Scheduled { | ||
private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class); | |
private static final Logger LOGGER = LoggerFactory.getLogger(SeatunnelMetricReporter.class); |
…sink (apache#2866) * parameter verification * update * update
* [Improve][DOC] Perfect the connector v2 doc * Update seatunnel-connectors-v2/README.zh.md Co-authored-by: Hisoka <fanjiaeminem@qq.com> * [Improve][DOC] A little tinkering * [Improve][DOC] A little tinkering * [Doc][connector] add Console sink doc close apache#2794 * [Doc][connector] add Console sink doc close apache#2794 * fix some problem * fix some problem * fine tuning Co-authored-by: Hisoka <fanjiaeminem@qq.com>
…a types (apache#2860) * [Improve][Connector-V2] Improve orc write strategy to support all data types Co-authored-by: tyrantlucifer <tyrantlucifer@gmail.com>
…(row) (apache#2839) * [Improve][Connector-v2]Supports direct definition of data values(row)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, I have a different idea about Metrics.
For various engines, they already have metrics and metric reporters, and engine users have also managed metrics based on their metrics reporters.
Seatunnel just submits an engine job, and it is unreasonable for users to do additional metrics management.
My opinion is that seatunnel provides unified metrics for the connector-v2 api, and then adds the metrics of seatunnel to the existing reporter of the engine.
For older versions of the connector, the engine's own metrics are used directly.
The metrics reporter of seatunnel may only be used for the engine of seatunnel itself.
This is a feature will do next.
There alway can use engine own metrics both old version connector and connector-v2
At now, our reporter can support flink/spark/seatunnel with a little work. It is not conflict with their own reporter. |
histogramsIndex.put(new SimpleHistogram(key.getCount(), key.getStatistics().getMin(), key.getStatistics().getMax(), key.getStatistics().getStdDev(), key.getStatistics().getMean(), quantile), metric.getValue()); | ||
} | ||
//todo handle user config | ||
reporter = new PrometheusPushGatewayReporter(jobName, host, port); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved outside the SeatunnelMetricReporter#report()
method?
@Override | ||
public PrometheusPushGatewayReporter open() { | ||
//todo Handle user config | ||
return new PrometheusPushGatewayReporter("flink_prometheus_job", "localhost", DEFAULT_PORT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why create PrometheusPushGatewayReporter
object again?
|
||
/** Reporters are used to export seatunnel {@link Metric Metrics} to an external backend. */ | ||
public interface MetricReporter { | ||
MetricReporter open(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why return MetricReporter
object? this is object method
}), | ||
newMetricInfo(metricName, dimensionKeys, dimensionValues)) | ||
} | ||
val reporter = new PrometheusPushGatewayReporter(pollJobName, pollHost, pollPort) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
check file license header |
# Conflicts: # seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf
2. Use reflection to automate assembly 3. Modify the flink/spark startup function 4. Try packaging configuration (todo)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
We merge it into |
#2735
#2592
Purpose of this pull request
Complete the code of seatuunel-metrics according to the framework of the issue, and add module seatuunel-metrics, including seatuunel-core, seatuunel-flink, seatuunel-spark three modules.
seatuunel-core is responsible for the basic definition of the seatunnel indicator system, including the definition of indicators and external interfaces
seatuunel-flink is responsible for the connection between seatunel and flink metrics, and accepts flink metrics
seatuunel-spark is responsible for the connection between seatuunel and spark metrics, and accepts spark metrics
Check list
New License Guide