-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathRateGate.cs
186 lines (162 loc) · 7.66 KB
/
RateGate.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace PennedObjects.RateLimiting {
/// <summary>
/// Used to control the rate of some occurrence per unit of time.
/// See http://www.pennedobjects.com/2010/10/better-rate-limiting-with-dot-net/
/// </summary>
/// <remarks>
/// <para>
/// To control the rate of an action using a <see cref="RateGate" />,
/// code should simply call <see cref="WaitToProceed()" /> prior to
/// performing the action. <see cref="WaitToProceed()" /> will block
/// the current thread until the action is allowed based on the rate
/// limit.
/// </para>
/// <para>
/// This class is thread safe. A single <see cref="RateGate" /> instance
/// may be used to control the rate of an occurrence across multiple
/// threads.
/// </para>
/// </remarks>
public class RateGate : IDisposable {
    /// <summary>
    /// Timer used to trigger exiting the semaphore.
    /// </summary>
    private readonly Timer _exitTimer;

    /// <summary>
    /// Queue of <see cref="Environment.TickCount" />-based times at which each
    /// entered occurrence should exit the semaphore, in the order they entered.
    /// </summary>
    private readonly ConcurrentQueue<int> _exitTimes;

    /// <summary>
    /// Semaphore used to count and limit the number of occurrences per
    /// unit of time.
    /// </summary>
    private readonly SemaphoreSlim _semaphore;

    /// <summary>
    /// Whether this instance is disposed. Volatile because it is written by the
    /// disposing thread and read by waiters and by the timer callback.
    /// </summary>
    private volatile bool _isDisposed;

    /// <summary>
    /// Initializes a <see cref="RateGate" /> with a rate of <paramref name="occurrences" />
    /// per <paramref name="timeUnit" />.
    /// </summary>
    /// <param name="occurrences">Number of occurrences allowed per unit of time.</param>
    /// <param name="timeUnit">Length of the time unit.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// If <paramref name="occurrences" /> is not positive, or if
    /// <paramref name="timeUnit" /> is negative or longer than
    /// <see cref="Int32.MaxValue" /> milliseconds.
    /// </exception>
    public RateGate(int occurrences, TimeSpan timeUnit) {
        // Check the arguments.
        if (occurrences <= 0)
            throw new ArgumentOutOfRangeException("occurrences", "Number of occurrences must be a positive integer");
        // Compare against Zero directly: TimeSpan.Duration() overflows for TimeSpan.MinValue.
        if (timeUnit < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("timeUnit", "Time unit must be a positive span of time");
        // The time unit is stored as an int and added to Environment.TickCount,
        // so it has to fit in 31 bits for the wrap-around arithmetic to work.
        if (timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException("timeUnit", "Time unit must be less than 2^31 milliseconds");

        Occurrences = occurrences;
        TimeUnitMilliseconds = (int) timeUnit.TotalMilliseconds;

        // Create the semaphore, with the number of occurrences as the maximum count.
        _semaphore = new SemaphoreSlim(Occurrences, Occurrences);

        // Create a queue to hold the semaphore exit times.
        _exitTimes = new ConcurrentQueue<int>();

        // Create the timer disarmed and only start it once the field is assigned,
        // so the callback can never observe a null _exitTimer. The time unit is the
        // first interval because that's the earliest we will need to exit the semaphore.
        _exitTimer = new Timer(ExitTimerCallback, null, Timeout.Infinite, Timeout.Infinite);
        _exitTimer.Change(TimeUnitMilliseconds, Timeout.Infinite);
    }

    /// <summary>
    /// Number of occurrences allowed per unit of time.
    /// </summary>
    public int Occurrences { get; private set; }

    /// <summary>
    /// The length of the time unit, in milliseconds.
    /// </summary>
    public int TimeUnitMilliseconds { get; private set; }

    /// <summary>
    /// Releases the timer and semaphore held by this instance.
    /// </summary>
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Callback for the exit timer that exits the semaphore based on exit times
    /// in the queue and then sets the timer for the next exit time.
    /// </summary>
    /// <param name="state">Unused; the timer is created with a null state.</param>
    private void ExitTimerCallback(object state) {
        // A callback may already be queued or running when Dispose() is called.
        if (_isDisposed)
            return;

        try {
            // While there are exit times that are past due still in the queue,
            // exit the semaphore and dequeue the exit time. The unchecked
            // subtraction keeps the comparison correct across TickCount wrap-around.
            int exitTime;
            while (_exitTimes.TryPeek(out exitTime)
                   && unchecked(exitTime - Environment.TickCount) <= 0) {
                _semaphore.Release();
                _exitTimes.TryDequeue(out exitTime);
            }

            // Try to get the next exit time from the queue and compute
            // the time until the next check should take place. If the
            // queue is empty, then no exit times will occur until at least
            // one time unit has passed.
            int timeUntilNextCheck;
            if (_exitTimes.TryPeek(out exitTime)) {
                // The head entry can become due between the loop above and this
                // read. Clamp to zero: -1 would mean Timeout.Infinite (the timer
                // would never fire again) and anything lower makes Change throw.
                timeUntilNextCheck = Math.Max(0, unchecked(exitTime - Environment.TickCount));
            }
            else {
                timeUntilNextCheck = TimeUnitMilliseconds;
            }

            // Set the timer.
            _exitTimer.Change(timeUntilNextCheck, Timeout.Infinite);
        }
        catch (ObjectDisposedException) {
            // Disposed concurrently with this callback; there is nothing left to
            // release or schedule, and an unhandled exception here would escape
            // on a thread-pool thread.
        }
    }

    /// <summary>
    /// Blocks the current thread until allowed to proceed or until the
    /// specified timeout elapses.
    /// </summary>
    /// <param name="millisecondsTimeout">Number of milliseconds to wait, or -1 to wait indefinitely.</param>
    /// <returns>true if the thread is allowed to proceed, or false if timed out</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// If <paramref name="millisecondsTimeout" /> is less than -1.
    /// </exception>
    /// <exception cref="ObjectDisposedException">If this instance has been disposed.</exception>
    public bool WaitToProceed(int millisecondsTimeout) {
        // Check the arguments.
        if (millisecondsTimeout < -1)
            throw new ArgumentOutOfRangeException("millisecondsTimeout");

        CheckDisposed();

        // Block until we can enter the semaphore or until the timeout expires.
        bool entered = _semaphore.Wait(millisecondsTimeout);

        // If we entered the semaphore, compute the corresponding exit time
        // and add it to the queue. The addition is allowed to wrap, matching
        // the wrap-aware subtraction in the timer callback.
        if (entered) {
            int timeToExit = unchecked(Environment.TickCount + TimeUnitMilliseconds);
            _exitTimes.Enqueue(timeToExit);
        }

        return entered;
    }

    /// <summary>
    /// Blocks the current thread until allowed to proceed or until the
    /// specified timeout elapses.
    /// </summary>
    /// <param name="timeout">
    /// Time to wait; a value that truncates to -1 millisecond waits indefinitely.
    /// </param>
    /// <returns>true if the thread is allowed to proceed, or false if timed out</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// If <paramref name="timeout" /> is less than -1 millisecond or greater than
    /// <see cref="Int32.MaxValue" /> milliseconds.
    /// </exception>
    /// <exception cref="ObjectDisposedException">If this instance has been disposed.</exception>
    public bool WaitToProceed(TimeSpan timeout) {
        // Truncate through long first: a direct double-to-int cast of an
        // out-of-range value gives an unspecified result.
        long totalMilliseconds = (long) timeout.TotalMilliseconds;
        if (totalMilliseconds < -1 || totalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException("timeout");

        return WaitToProceed((int) totalMilliseconds);
    }

    /// <summary>
    /// Blocks the current thread indefinitely until allowed to proceed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">If this instance has been disposed.</exception>
    public void WaitToProceed() {
        WaitToProceed(Timeout.Infinite);
    }

    // Throws an ObjectDisposedException if this object is disposed.
    private void CheckDisposed() {
        // Use the (objectName, message) overload: the single-string constructor
        // treats its argument as the object name, not the message.
        if (_isDisposed)
            throw new ObjectDisposedException("RateGate", "RateGate is already disposed");
    }

    /// <summary>
    /// Releases the timer and semaphore held by this instance.
    /// </summary>
    /// <param name="isDisposing">Whether this object is being disposed.</param>
    protected virtual void Dispose(bool isDisposing) {
        if (_isDisposed)
            return;

        if (isDisposing) {
            // Flag first so a concurrently running timer callback bails out,
            // then stop the timer before tearing down the semaphore it releases.
            _isDisposed = true;
            _exitTimer.Dispose();
            _semaphore.Dispose();
        }
    }
}
}