Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
fix #523
Browse files Browse the repository at this point in the history
  • Loading branch information
heyanlong committed Aug 30, 2022
1 parent a829574 commit eabc690
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion skywalking.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void php_skywalking_init_globals(zend_skywalking_globals *skywalking_glob

skywalking_globals->oap_version = "9.0.0";
skywalking_globals->oap_cross_process_protocol = "3.0";
skywalking_globals->oap_authentication = NULL;
skywalking_globals->oap_authentication = "";

// tls
skywalking_globals->grpc_address = NULL;
Expand Down
33 changes: 20 additions & 13 deletions src/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use tokio::signal::{self, unix::SignalKind};
use tonic::{
transport::{Channel, Endpoint},
Request,
metadata::{
MetadataMap, MetadataValue
}
};
use std::os::raw::c_char;
use std::thread;
Expand Down Expand Up @@ -114,7 +117,7 @@ pub struct Reporter {
pub service_instance: *const c_char,
}

pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String) -> anyhow::Result<()> {
pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String, authentication: String) -> anyhow::Result<()> {
let mut level = simplelog::LevelFilter::Debug;

if log_level == "disable" {
Expand Down Expand Up @@ -147,15 +150,15 @@ pub fn init(address: String, service: String, mut service_instance: String, log_
.worker_threads(4)
.enable_all()
.build()?;
rt.block_on(worker(address, service, service_instance));
rt.block_on(worker(address, service, service_instance, authentication));
Ok(())
}

pub async fn worker(address: String, service: String, service_instance: String) {
pub async fn worker(address: String, service: String, service_instance: String, authentication: String) {
let (segment_sender, segment_receiver) = counted_channel(5000);
spawn(do_connect(address.to_owned()));
spawn(login(service.to_owned(), service_instance.to_owned()));
spawn(keep_alive(service.to_owned(), service_instance.to_owned()));
spawn(login(service.to_owned(), service_instance.to_owned(), authentication.to_owned()));
spawn(keep_alive(service.to_owned(), service_instance.to_owned(), authentication.to_owned()));
spawn(sender(segment_receiver));
spawn(receive(segment_sender));

Expand Down Expand Up @@ -191,7 +194,7 @@ pub async fn do_connect(address: String) {
GRPC_CHANNEL.set(channel);
}

pub async fn login(service: String, service_instance: String) {
pub async fn login(service: String, service_instance: String, authentication: String) {
let props = vec![
KeyStringValuePair {
key: "os_name".to_owned(),
Expand Down Expand Up @@ -229,7 +232,7 @@ pub async fn login(service: String, service_instance: String) {
None => continue,
};
log::debug!("login instance {:?}", instance);
match do_login(channel.clone(), instance.clone()).await {
match do_login(channel.clone(), instance.clone(), authentication.clone()).await {
Ok(r) => {
REGISTER.set(true);
break;
Expand All @@ -242,13 +245,15 @@ pub async fn login(service: String, service_instance: String) {
}
}

pub async fn do_login(channel: Channel, instance: InstanceProperties) -> anyhow::Result<()> {
pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> {
let mut client = ManagementServiceClient::new(channel);
client.report_instance_properties(Request::new(instance)).await?;
let mut request = Request::new(instance);
request.metadata_mut().insert("Authentication", MetadataValue::from(authentication));
client.report_instance_properties(request).await?;
Ok(())
}

pub async fn keep_alive(service: String, service_instance: String) {
pub async fn keep_alive(service: String, service_instance: String, authentication: String) {
let instance = InstancePingPkg {
service: service.clone(),
service_instance: service_instance.clone(),
Expand All @@ -267,16 +272,18 @@ pub async fn keep_alive(service: String, service_instance: String) {
None => continue,
};
log::debug!("keep alive instance {:?}", instance);
match do_keep_alive(channel.clone(), instance.clone()).await {
match do_keep_alive(channel.clone(), instance.clone(), authentication.clone()).await {
Ok(r) => continue,
Err(e) => continue
};
}
}

pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg) -> anyhow::Result<()> {
pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> {
let mut client = ManagementServiceClient::new(channel);
client.keep_alive(Request::new(instance)).await?;
let mut request = Request::new(instance);
request.metadata_mut().insert("Authentication", MetadataValue::from(authentication));
client.keep_alive(request).await?;
Ok(())
}

Expand Down
9 changes: 8 additions & 1 deletion src/sky_core_report.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ bool sky_core_report_ipc_init(size_t max_length);

bool sky_core_report_ipc_send(char *data, size_t len);

bool sky_core_report_new(char *address, char *service, char *service_instance, char *log_level, char *log_path);
bool sky_core_report_new(
char *address,
char *service,
char *service_instance,
char *log_level,
char *log_path,
char *oap_authentication
);

char *sky_core_report_trace_id();

Expand Down
3 changes: 3 additions & 0 deletions src/sky_core_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ extern "C" fn sky_core_report_new(
service_instance: *const c_char,
log_level: *const c_char,
log_path: *const c_char,
oap_authentication: *const c_char
) -> bool {
let f = || unsafe {
let address = CStr::from_ptr(address).to_str()?;
let service = CStr::from_ptr(service).to_str()?;
let service_instance = CStr::from_ptr(service_instance).to_str()?;
let log_level = CStr::from_ptr(log_level).to_str()?;
let log_path = CStr::from_ptr(log_path).to_str()?;
let oap_authentication = CStr::from_ptr(oap_authentication).to_str()?;
grpc::init(
address.to_string(),
service.to_string(),
service_instance.to_string(),
log_level.to_string(),
log_path.to_string(),
oap_authentication.to_string(),
)?;
Ok::<_, anyhow::Error>(())
};
Expand Down

0 comments on commit eabc690

Please sign in to comment.