
chore(deps): Upgrade to Rust 1.50.0 #6428

Merged · 17 commits · Feb 18, 2021
Changes from 7 commits
3 changes: 3 additions & 0 deletions lib/remap-lang/src/expression/assignment.rs
@@ -122,6 +122,9 @@ impl Expression for Assignment {
fn execute(&self, state: &mut state::Program, object: &mut dyn Object) -> Result<Value> {
let value = self.value.execute(state, object);

// ignoring the unnecessary wrap as this whole parser is going away momentarily and this
// matches up better with the other *_assignment methods anyway
#[allow(clippy::unnecessary_wraps)]
fn var_assignment<'a>(
state: &mut state::Program,
var: &Variable,
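For context on the allow added above (an illustration, not part of the diff): clippy::unnecessary_wraps, which ships with the Rust 1.50 toolchain's clippy, fires when a function declares a Result or Option return type but every return path wraps with Ok or Some, so the wrapper carries no information. A minimal sketch of the pattern it flags, using hypothetical names:

use anything below only as an example, not as code from this PR:

// Hypothetical example of what clippy::unnecessary_wraps flags.
fn parse_port(input: &str) -> Result<u16, String> {
    // Every path returns Ok, so the Result wrapper is unnecessary and clippy
    // suggests returning u16 directly (or propagating a real error).
    Ok(input.parse().unwrap_or(80))
}

Here the #[allow] simply keeps the existing signature because, as the comment above notes, this parser is going away shortly anyway.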
3 changes: 3 additions & 0 deletions lib/remap-lang/src/parser.rs
@@ -379,6 +379,9 @@ impl<'a> Parser<'a> {
Ok((start..end, nodes).into())
}

// ignoring the unnecessary wrap as this whole parser is going away momentarily and this
// matches up better with the other *_from_* methods anyway
#[allow(clippy::unnecessary_wraps)]
fn pairs_from_str<'b>(&mut self, rule: R, source: &'b str) -> IResult<Pairs<'b, R>> {
use pest::Parser;

2 changes: 1 addition & 1 deletion rust-toolchain
@@ -1 +1 @@
1.49.0
1.50.0
2 changes: 1 addition & 1 deletion src/api/schema/components/sink.rs
@@ -109,7 +109,7 @@ impl Sink {

/// Sink metrics
pub async fn metrics(&self) -> metrics::SinkMetrics {
metrics::by_component_name(self.get_name()).to_sink_metrics(self.get_component_type())
metrics::by_component_name(self.get_name()).into_sink_metrics(self.get_component_type())
}
}

2 changes: 1 addition & 1 deletion src/api/schema/components/source.rs
@@ -107,7 +107,7 @@ impl Source {

/// Source metrics
pub async fn metrics(&self) -> metrics::SourceMetrics {
metrics::by_component_name(&self.get_name()).to_source_metrics(&self.get_component_type())
metrics::by_component_name(&self.get_name()).into_source_metrics(&self.get_component_type())
}
}

2 changes: 1 addition & 1 deletion src/api/schema/components/transform.rs
@@ -88,7 +88,7 @@ impl Transform {

/// Transform metrics
pub async fn metrics(&self) -> metrics::TransformMetrics {
metrics::by_component_name(&self.0.name).to_transform_metrics(&self.get_component_type())
metrics::by_component_name(&self.0.name).into_transform_metrics(&self.get_component_type())
}
}

4 changes: 2 additions & 2 deletions src/api/schema/metrics/sink/mod.rs
@@ -14,11 +14,11 @@ pub enum SinkMetrics {
}

pub trait IntoSinkMetrics {
fn to_sink_metrics(self, component_type: &str) -> SinkMetrics;
fn into_sink_metrics(self, component_type: &str) -> SinkMetrics;
}

impl IntoSinkMetrics for Vec<Metric> {
fn to_sink_metrics(self, _component_type: &str) -> SinkMetrics {
fn into_sink_metrics(self, _component_type: &str) -> SinkMetrics {
SinkMetrics::GenericSinkMetrics(generic::GenericSinkMetrics::new(self))
}
}
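A note on the to_* to into_* renames in this and the following metrics modules (an inference from the diff, not a rationale stated in the PR): these methods take self by value, and Rust's conversion-naming convention, as in the API guidelines and checked by clippy's wrong_self_convention lint, reserves to_* for borrowing conversions and into_* for conversions that consume self. A small sketch of the convention with hypothetical types:

// Hypothetical types illustrating the to_* / into_* naming convention.
struct Payload(Vec<u8>);

impl Payload {
    // Borrows self and leaves it usable afterwards: to_* prefix.
    fn to_hex(&self) -> String {
        self.0.iter().map(|b| format!("{:02x}", b)).collect()
    }

    // Consumes self, like into_sink_metrics(self, ...) above: into_* prefix.
    fn into_bytes(self) -> Vec<u8> {
        self.0
    }
}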
4 changes: 2 additions & 2 deletions src/api/schema/metrics/source/mod.rs
@@ -16,11 +16,11 @@ pub enum SourceMetrics {
}

pub trait IntoSourceMetrics {
fn to_source_metrics(self, component_type: &str) -> SourceMetrics;
fn into_source_metrics(self, component_type: &str) -> SourceMetrics;
}

impl IntoSourceMetrics for Vec<Metric> {
fn to_source_metrics(self, component_type: &str) -> SourceMetrics {
fn into_source_metrics(self, component_type: &str) -> SourceMetrics {
match component_type {
"file" => SourceMetrics::FileSourceMetrics(file::FileSourceMetrics::new(self)),
_ => SourceMetrics::GenericSourceMetrics(generic::GenericSourceMetrics::new(self)),
4 changes: 2 additions & 2 deletions src/api/schema/metrics/transform/mod.rs
@@ -14,11 +14,11 @@ pub enum TransformMetrics {
}

pub trait IntoTransformMetrics {
fn to_transform_metrics(self, component_type: &str) -> TransformMetrics;
fn into_transform_metrics(self, component_type: &str) -> TransformMetrics;
}

impl IntoTransformMetrics for Vec<Metric> {
fn to_transform_metrics(self, _component_type: &str) -> TransformMetrics {
fn into_transform_metrics(self, _component_type: &str) -> TransformMetrics {
TransformMetrics::GenericTransformMetrics(generic::GenericTransformMetrics::new(self))
}
}
10 changes: 5 additions & 5 deletions src/internal_events/open.rs
@@ -69,13 +69,13 @@ fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
emitter(new_value);
// Try to update gauge to new value and releasing writes to gauge metric in the process.
// Otherwise acquire new writes to gauge metric.
let latest = gauge.compare_and_swap(value, new_value, Ordering::AcqRel);
if value == latest {
value = match gauge.compare_exchange(value, new_value, Ordering::AcqRel, Ordering::Acquire)
jszwedko (Member, Author) commented:
@bruceg I'd appreciate your 👀 here. compare_and_swap was deprecated. I've swapped it for the equivalent compare_exchange call, but there is also a compare_exchange_weak call that can purportedly generate more efficient assembly. The difference is:

> Unlike AtomicUsize::compare_exchange, this function is allowed to spuriously fail even when the comparison succeeds, which can result in more efficient code on some platforms. The return value is a result indicating whether the new value was written and containing the previous value.

The nuance being that it might sometimes fail even if it could have succeeded.
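A minimal sketch of the mechanical translation being described (illustrative names, not the PR's code): compare_and_swap returned the previous value, which the caller compared against the expected value itself, while compare_exchange returns a Result and takes a separate failure ordering (which, as of Rust 1.50, must be no stronger than the success ordering and must not be Release or AcqRel):

use std::sync::atomic::{AtomicUsize, Ordering};

fn try_set(gauge: &AtomicUsize, current: usize, new: usize) -> bool {
    // Deprecated in 1.50:
    //     gauge.compare_and_swap(current, new, Ordering::AcqRel) == current
    // Equivalent call; success keeps AcqRel, failure uses the Acquire half:
    gauge
        .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}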

blt (Contributor) replied:

compare_exchange_weak will play much better on ARM and we should aim to use it. compare_exchange emits instructions that require strong ordering of memory. A strong ordering means atomic operations are implicit acquire/release and probably Total Store Order (every CPU agrees about the order memory operations took place), where weak ordering allows CPUs to disagree, which is cool because some of them can shut off if need be or do other stuff. (ARM is weakly ordered but data-dependent, which means every CPU eventually agrees on the results of operations and an A->B dereference guarantees that A is at least as fresh as B. When systems don't support data-dependency, things get bonkers.)

For x86, with its implicit strong ordering, we'd never see additional fail loops if we were an omniscient observer of the system. On ARM it's possible that we will because the CPU our instructions run on is lagging getting updates from other CPUs but it's an acceptable trade-off except in specialized circumstances.

I also suggest using ordering semantics AcqRel for success here, like you've done, and Relaxed for failure. On ARM this'll mean more potential spins but overall more efficient results.
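A minimal sketch of that suggestion (hypothetical function, not this PR's code): a retry loop using compare_exchange_weak with AcqRel on success and Relaxed on failure, where a spurious failure only costs one extra trip around the loop:

use std::sync::atomic::{AtomicUsize, Ordering};

fn add_to_gauge(gauge: &AtomicUsize, add: usize) {
    let mut current = gauge.load(Ordering::Relaxed);
    loop {
        let next = current + add;
        match gauge.compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Relaxed) {
            Ok(_) => break,
            // Err carries the freshly observed value; this arm also absorbs
            // the spurious failures the weak variant is allowed to report.
            Err(observed) => current = observed,
        }
    }
}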

blt (Contributor) added:
Went ahead and made myself #6477 to tackle this.

jszwedko (Member, Author) replied on Feb 17, 2021:
Thanks for the detailed response! I think what you said makes sense and agree with the AcqRel for success and Relaxed for failure. I'll leave that to your PR since this one maintains the current state.

> On ARM it's possible that we will because the CPU our instructions run on is lagging getting updates from other CPUs but it's an acceptable trade-off except in specialized circumstances.

To deepen my understanding, under what circumstances would it be unacceptable?

I did note that the Rust PR for deprecating compare_and_swap only introduced one compare_exchange_weak:

rust-lang/rust#79261

The PR comment seems to indicate that they were mostly just going for satisfying the deprecation rather than with an eye to improving performance though.

blt (Contributor) replied on Feb 17, 2021:

> Thanks for the detailed response! I think what you said makes sense and agree with the AcqRel for success and Relaxed for failure. I'll leave that to your PR since this one maintains the current state.

Cool. I'm good with that.

> On ARM it's possible that we will because the CPU our instructions run on is lagging getting updates from other CPUs but it's an acceptable trade-off except in specialized circumstances.

> To deepen my understanding, under what circumstances would it be unacceptable?

Solid question. Consider a situation where you execute some expensive function per loop, maybe it has a non-trivial side-effect or is expensive in CPU terms but you can't migrate it out of the loop. You might be better off paying the higher synchronization cost here, depending on what your goals are.
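A hypothetical illustration of that trade-off (made-up function, not from this thread): when the work recomputed on each retry is expensive, the extra spurious retries the weak variant is allowed to make stop being free, so the strong compare_exchange can be the better deal:

use std::sync::atomic::{AtomicU64, Ordering};

fn update_with_costly_step(state: &AtomicU64, costly: impl Fn(u64) -> u64) {
    let mut current = state.load(Ordering::Relaxed);
    loop {
        // Re-run on every retry, so each avoidable retry has a real cost.
        let next = costly(current);
        match state.compare_exchange(current, next, Ordering::AcqRel, Ordering::Relaxed) {
            Ok(_) => break,
            Err(observed) => current = observed,
        }
    }
}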

> The PR comment seems to indicate that they were mostly just going for satisfying the deprecation rather than with an eye to improving performance though.

Quite right. Huh. Well, I was going off my own background understanding; ARM'll get better instructions emitted if we use compare_exchange_weak. If you take this program

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

pub fn mul(val: &Arc<AtomicU64>, factor: u64) {
    let mut old = val.load(Ordering::Relaxed);
    loop {
        let new = old * factor;
        match val.compare_exchange_weak(old, new, Ordering::AcqRel, Ordering::Relaxed) {
            Ok(_) => break,
            Err(x) => old = x,
        }
    }
}

pub fn main() {
    let val = Arc::new(AtomicU64::new(4));
    mul(&val, 8);
    println!("{:?}", val);
}

and generate x86 you'll see a lock cmpxchg instruction pair whether you use compare_exchange_weak or compare_exchange. On ARM, when I compile for arm-unknown-linux-gnueabihf at opt-level 3, the _weak variant of the program clobbers fewer registers, has fewer exclusive loads and does less conditional work. I actually find opt-level 3 hard to read -- I think the compiler noticed all my values are powers of 2 and did clever things from that -- and fortunately when I compile at opt-level 0 it embeds the assembly of compare_exchange_weak and compare_exchange. The strong version has more callee-save registers than I understand and calls further into helper functions that aren't inlined. The compiler source calls out to intrinsics, which makes sense, but without digging in further and brushing up on ARM -- which I need to do eventually -- I think the best I can offer is that the strong variant emits more instructions to force coordination with the other CPUs. An ARM expert could pinpoint exactly where in the assembly for the above this happens, and if someone out there reading this knows, I'd love to get taught. :)

jszwedko (Member, Author) replied:

> Solid question. Consider a situation where you execute some expensive function per loop, maybe it has a non-trivial side-effect or is expensive in CPU terms but you can't migrate it out of the loop. You might be better off paying the higher synchronization cost here, depending on what your goals are.

👍 Yeah that makes sense as a trade-off. Thanks again for the thorough explanation!

{
// Success
break;
Ok(_) => break,
// Try again with new value
Err(v) => v,
}
// Try again with new value
value = latest;
}

// In the worst case scenario we will emit `n^2 / 2` times when there are `n` parallel
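Since the hunk above interleaves the removed and added lines, here is a sketch of roughly what the updated loop looks like after the change (reconstructed from the visible lines; the exact surrounding code is an assumption):

use std::sync::atomic::{AtomicUsize, Ordering};

fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        let new_value = (value as isize + add) as usize;
        emitter(new_value);
        // Try to update gauge to the new value, releasing writes to the gauge
        // metric in the process; otherwise acquire new writes to it.
        value = match gauge.compare_exchange(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            // Success
            Ok(_) => break,
            // Try again with the value another thread just wrote
            Err(v) => v,
        };
    }
}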
6 changes: 2 additions & 4 deletions src/mapping/query/function/split.rs
@@ -23,6 +23,7 @@ impl SplitFn {
}

impl Function for SplitFn {
#[allow(clippy::collapsible_match)] // I expect this file to be going away shortly
jszwedko (Member, Author) commented:
@JeanMertz @FungusHumungus is this accurate? It seems like src/mapping is largely vestigial at this point. Do you think we'd drop it as part of #6353?

A contributor replied:
Yeah, feel free to ignore this. I'll drop it once #6353 makes it into master (later today, as the CI is expected to be green within the hour).

fn execute(&self, ctx: &Event) -> Result<QueryValue> {
let string = {
let bytes = required_value!(ctx, self.path, Value::Bytes(v) => v);
@@ -71,10 +72,7 @@ impl Function for SplitFn {
},
Parameter {
keyword: "pattern",
accepts: |v| {
matches!(v, QueryValue::Value(Value::Bytes(_))
| QueryValue::Regex(_))
},
accepts: |v| matches!(v, QueryValue::Value(Value::Bytes(_)) | QueryValue::Regex(_)),
required: true,
},
Parameter {
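For context on the #[allow(clippy::collapsible_match)] added in this file (an illustration, not part of the diff): the lint, new in the clippy that ships with Rust 1.50, flags a match arm whose binding is immediately re-matched and suggests merging the two patterns. A sketch with made-up types:

// What clippy::collapsible_match flags (hypothetical example):
fn flagged(opt: Option<Result<u64, String>>) -> u64 {
    match opt {
        Some(inner) => match inner {
            Ok(n) => n,
            _ => 0,
        },
        None => 0,
    }
}

// The collapsed form clippy suggests:
fn collapsed(opt: Option<Result<u64, String>>) -> u64 {
    match opt {
        Some(Ok(n)) => n,
        _ => 0,
    }
}

As with the unnecessary_wraps allows earlier, the suppression is justified by the file being slated for removal.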
10 changes: 5 additions & 5 deletions src/sinks/aws_kinesis_firehose.rs
@@ -162,7 +162,7 @@ impl KinesisFirehoseService {
cx.acker(),
)
.sink_map_err(|error| error!(message = "Fatal kinesis firehose sink error.", %error))
.with_flat_map(move |e| stream::iter(encode_event(e, &encoding)).map(Ok));
.with_flat_map(move |e| stream::iter(Some(encode_event(e, &encoding))).map(Ok));

Ok(sink)
}
@@ -240,7 +240,7 @@ enum HealthcheckError {
StreamNamesMismatch { name: String, stream_name: String },
}

fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Option<Record> {
fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Record {
encoding.apply_rules(&mut event);
let log = event.into_log();
let data = match encoding.codec() {
@@ -254,7 +254,7 @@ fn encode_event(mut event: Event, encoding: &EncodingConfig<Encoding>) -> Option

let data = Bytes::from(data);

Some(Record { data })
Record { data }
}

#[cfg(test)]
@@ -270,7 +270,7 @@ mod tests {
#[test]
fn firehose_encode_event_text() {
let message = "hello world".to_string();
let event = encode_event(message.clone().into(), &Encoding::Text.into()).unwrap();
let event = encode_event(message.clone().into(), &Encoding::Text.into());

assert_eq!(&event.data[..], message.as_bytes());
}
@@ -280,7 +280,7 @@ mod tests {
let message = "hello world".to_string();
let mut event = Event::from(message.clone());
event.as_mut_log().insert("key", "value");
let event = encode_event(event, &Encoding::Json.into()).unwrap();
let event = encode_event(event, &Encoding::Json.into());

let map: BTreeMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();

10 changes: 4 additions & 6 deletions src/sinks/nats.rs
@@ -85,18 +85,16 @@ impl SinkConfig for NatsSinkConfig {
}

impl NatsSinkConfig {
fn to_nats_options(&self) -> crate::Result<nats::Options> {
fn to_nats_options(&self) -> nats::Options {
// Set reconnect_buffer_size on the nats client to 0 bytes so that the
// client doesn't buffer internally (to avoid message loss).
let options = nats::Options::new()
nats::Options::new()
.with_name(&self.name)
.reconnect_buffer_size(0);

Ok(options)
.reconnect_buffer_size(0)
}

async fn connect(&self) -> crate::Result<nats::asynk::Connection> {
self.to_nats_options()?
self.to_nats_options()
.connect_async(&self.url)
.map_err(|e| e.into())
.await
9 changes: 4 additions & 5 deletions src/sinks/papertrail.rs
@@ -65,7 +65,7 @@ impl SinkConfig for PapertrailConfig {

let sink_config = TcpSinkConfig::new(address, self.keepalive, tls, self.send_buffer_bytes);

sink_config.build(cx, move |event| encode_event(event, pid, &encoding))
sink_config.build(cx, move |event| Some(encode_event(event, pid, &encoding)))
}

fn input_type(&self) -> DataType {
@@ -77,7 +77,7 @@ impl SinkConfig for PapertrailConfig {
}
}

fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig<Encoding>) -> Option<Bytes> {
fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig<Encoding>) -> Bytes {
let host = if let Some(host) = event.as_mut_log().remove(log_schema().host_key()) {
Some(host.to_string_lossy())
} else {
@@ -110,7 +110,7 @@ fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig<Encoding>)

s.push(b'\n');

Some(Bytes::from(s))
Bytes::from(s)
}

#[cfg(test)]
@@ -137,8 +137,7 @@ mod tests {
except_fields: Some(vec!["magic".into()]),
timestamp_format: None,
},
)
.unwrap();
);

let msg =
bytes.slice(String::from_utf8_lossy(&bytes).find(": ").unwrap() + 2..bytes.len() - 1);
6 changes: 3 additions & 3 deletions src/sinks/util/buffer/metrics.rs
@@ -265,7 +265,7 @@ impl MetricSet {
pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
match metric.data.kind {
MetricKind::Absolute => Some(metric),
MetricKind::Incremental => self.incremental_to_absolute(metric),
MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
}
}

@@ -281,7 +281,7 @@ impl MetricSet {
/// Convert the incremental metric into an absolute one, using the
/// state buffer to keep track of the value throughout the entire
/// application uptime.
fn incremental_to_absolute(&mut self, metric: Metric) -> Option<Metric> {
fn incremental_to_absolute(&mut self, metric: Metric) -> Metric {
let mut entry = MetricEntry(metric.into_absolute());
let mut existing = self.0.take(&entry).unwrap_or_else(|| {
// Start from zero value if the entry is not found.
@@ -290,7 +290,7 @@ impl MetricSet {
existing.data.value.add(&entry.data.value);
entry.data.value = existing.data.value.clone();
self.0.insert(existing);
Some(entry.0)
entry.0
}

/// Convert the absolute metric into an incremental by calculating
6 changes: 3 additions & 3 deletions src/sinks/vector.rs
@@ -74,7 +74,7 @@ impl SinkConfig for VectorSinkConfig {
self.send_buffer_bytes,
);

sink_config.build(cx, encode_event)
sink_config.build(cx, |event| Some(encode_event(event)))
}

fn input_type(&self) -> DataType {
@@ -92,7 +92,7 @@ enum HealthcheckError {
ConnectError { source: std::io::Error },
}

fn encode_event(event: Event) -> Option<Bytes> {
fn encode_event(event: Event) -> Bytes {
let event = proto::EventWrapper::from(event);
let event_len = event.encoded_len();
let full_len = event_len + 4;
@@ -101,7 +101,7 @@ fn encode_event(event: Event) -> Option<Bytes> {
out.put_u32(event_len as u32);
event.encode(&mut out).unwrap();

Some(out.into())
out.into()
}

#[cfg(test)]