// harness.rs
use BASE_OPS_PER_MIN;
use WorkerCommand;
use client::{LobstersClient, LobstersRequest};
use crossbeam_channel;
use execution::{self, id_to_slug, Sampler};
use rand::{self, Rng};
use std::sync::{Arc, Barrier, Mutex};
use std::{thread, time};
use tokio_core;
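
/// Run the given workload against a lobsters-like backend.
///
/// Spawns `load.threads` issuer threads (each with its own tokio_core reactor and its own
/// `LobstersClient`), logs in every sampled user, optionally primes the database with stories
/// and comments, and then spawns enough load-generator threads to hit the requested request
/// rate, feeding the issuers over a shared mpmc channel. Returns the sum of the generators'
/// return values, the issuers' `JoinHandle`s, and the number of requests still queued when
/// the generators exited.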
pub(crate) fn run<C, I>(
    load: execution::Workload,
    in_flight: usize,
    mut factory: I,
    prime: bool,
) -> (
    f64,
    Vec<thread::JoinHandle<(f64, execution::Stats, execution::Stats)>>,
    usize,
)
where
    I: Send + 'static,
    C: LobstersClient<Factory = I> + 'static,
{
    // generating a request takes a while because we have to generate random numbers (including
    // zipfs). so, depending on the target load, we may need more than one load generation
    // thread. we'll make them all share the pool of issuers though.
    let mut target = BASE_OPS_PER_MIN as f64 * load.req_scale / 60.0;
    let generator_capacity = 100_000.0; // req/s == 10 µs to generate a request
    let ngen = (target / generator_capacity).ceil() as usize;
    target /= ngen as f64;
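    // e.g., if the scaled target is 250,000 req/s, ngen = ceil(250k / 100k) = 3, and each
    // generator is responsible for ~83,333 req/s of the total load.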
    let nthreads = load.threads;
    let warmup = load.warmup;
    let runtime = load.runtime;

    if prime {
        C::setup(&mut factory);
    }

    let factory = Arc::new(Mutex::new(factory));
    let (pool, jobs) = crossbeam_channel::unbounded();
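    // crossbeam's unbounded channel is multi-producer/multi-consumer: every issuer thread holds
    // a clone of `jobs` and competes for the next command, while `pool` (and its clones in the
    // generator threads) are the senders.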
    let workers: Vec<_> = (0..nthreads)
        .map(|i| {
            let jobs = jobs.clone();
            let factory = factory.clone();
            thread::Builder::new()
                .name(format!("issuer-{}", i))
                .spawn(move || {
                    let core = tokio_core::reactor::Core::new().unwrap();
                    let c = C::spawn(&mut *factory.lock().unwrap(), &core.handle());
                    execution::issuer::run(warmup, runtime, in_flight, core, c, jobs)
                    // NOTE: there may still be a bunch of requests in the queue here,
                    // but core.run() will return when the stream is closed.
                })
                .unwrap()
        })
        .collect();

    let barrier = Arc::new(Barrier::new(nthreads + 1));
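    // the +1 is for this coordinating thread: it too calls barrier.wait(), so passing the
    // barrier means every issuer has received (and is blocked on) its Wait/Start command.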
    let now = time::Instant::now();

    // compute how many of each thing there will be in the database after scaling by mem_scale
    let sampler = Sampler::new(load.mem_scale);
    let nstories = sampler.nstories();

    // then, log in all the users
    for u in 0..sampler.nusers() {
        pool.send(WorkerCommand::Request(now, Some(u), LobstersRequest::Login))
            .unwrap();
    }

    if prime {
        println!("--> priming database");
        let mut rng = rand::thread_rng();

        // wait for all threads to be ready
        // we don't normally know which worker thread will receive any given command (it's mpmc
        // after all), but since a ::Wait causes the receiving thread to *block*, we know that once
        // it receives one, it can't receive another until the barrier has been passed! Therefore,
        // sending `nthreads` barriers should ensure that every thread gets one
        for _ in 0..nthreads {
            pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
        }
        barrier.wait();
        // first, we need to prime the database with stories!
        for id in 0..nstories {
            // NOTE: we're assuming that users who vote a lot also submit many stories
            let req = LobstersRequest::Submit {
                id: id_to_slug(id),
                title: format!("Base article {}", id),
            };
            pool.send(WorkerCommand::Request(
                now,
                Some(sampler.user(&mut rng)),
                req,
            )).unwrap();
        }
        // and as many comments
        for id in 0..sampler.ncomments() {
            let story = id % nstories; // TODO: distribution

            // synchronize occasionally to ensure that we can safely generate parent comments
            if story == 0 {
                for _ in 0..nthreads {
                    pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
                }
                barrier.wait();
            }
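            // comment ids are assigned round-robin across stories, so each time `story` wraps
            // back to 0, every story has gained exactly one comment; the barrier above makes
            // sure those earlier comments have been dealt with before we pick parents below.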
            let parent = if rng.gen_weighted_bool(2) {
                // we need to pick a parent in the same story
                let generated_comments = id - story;
                // how many comments do we know there are per story?
                let generated_comments_per_story = generated_comments / nstories;
                // pick the nth comment of the chosen story
                if generated_comments_per_story != 0 {
                    let story_comment = rng.gen_range(0, generated_comments_per_story);
                    Some(story + nstories * story_comment)
                } else {
                    None
                }
            } else {
                None
            };
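            // e.g., with nstories = 10 and id = 25: story = 5, two full rounds have completed,
            // so the candidate parents are comments 5 and 15 (the earlier comments on story 5).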
            let req = LobstersRequest::Comment {
                id: id_to_slug(id),
                story: id_to_slug(story),
                parent: parent.map(id_to_slug),
            };
            // NOTE: we're assuming that users who vote a lot also comment a lot
            pool.send(WorkerCommand::Request(
                now,
                Some(sampler.user(&mut rng)),
                req,
            )).unwrap();
        }
        // wait for all threads to finish priming comments
        // the addition of the ::Wait barrier will also ensure that start is (re)set
        for _ in 0..nthreads {
            pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
        }
        barrier.wait();
        println!("--> finished priming database");
    }

    // wait for all threads to be ready (and set their start time correctly)
    for _ in 0..nthreads {
        pool.send(WorkerCommand::Start(barrier.clone())).unwrap();
    }
    barrier.wait();
    let generators: Vec<_> = (0..ngen)
        .map(|geni| {
            let pool = pool.clone();
            let load = load.clone();
            let sampler = sampler.clone();

            thread::Builder::new()
                .name(format!("load-gen{}", geni))
                .spawn(move || execution::generator::run::<C>(load, sampler, pool, target))
                .unwrap()
        })
        .collect();

    drop(pool);
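    // dropping our sender means the channel closes once the generator threads (which hold the
    // only remaining clones of `pool`) exit; the issuers then see a closed stream and return.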
    let gps = generators.into_iter().map(|gen| gen.join().unwrap()).sum();

    // how many operations were left in the queue at the end?
    let dropped = jobs.iter().count();

    (gps, workers, dropped)
}
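
// A minimal sketch of a call site (hypothetical: the concrete `MyClient` type, the `workload`
// value, and `in_flight = 50` are assumptions, not part of this file):
//
//     let (gps, workers, dropped) = run::<MyClient, _>(workload, 50, my_factory, true);
//     for w in workers {
//         // each issuer's JoinHandle yields the (f64, Stats, Stats) tuple described above
//         let _per_issuer_stats = w.join().unwrap();
//     }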