Easy to Use ReactiveX Queue that Supports Delay/DelayExecutor/Throttle/Debounce Features Powered by RxJS.
Picture Credit: Queues in JavaScript
RxQueue
is the base class of all other queues. It extends from RxJS Subject.
Example:
import { RxQueue } from 'rx-queue'
const queue = new RxQueue()
queue.next(1)
queue.next(2)
queue.next(3)
queue.subscribe(console.log)
// Output: 1
// Output: 2
// Output: 3
DelayQueue
passes all the items and add delays between items.
Picture Credit: ReactiveX Single Operator Delay
Practical examples of DelayQueue
:
- We are calling a HTTP API which can only be called no more than ten times per second, or it will throw a
500
error.
Example:
import { DelayQueue } from 'rx-queue'
const delay = new DelayQueue(500) // set delay period time to 500 milliseconds
delay.subscribe(console.log)
delay.next(1)
delay.next(2)
delay.next(3)
// Output: 1
// Paused 500 millisecond...
// Output: 2
// Paused 500 millisecond...
// Output: 3
ThrottleQueue
passes one item and then drop all the following items in a period of time.
Picture Credit: ReactiveX Observable Throttle
By using throttle, we don't allow to our queue to pass more than once every X milliseconds.
Practical examples of ThrottleQueue
:
- User is typing text in a textarea. We want to call auto-save function when user is typing, and want it only run at most once every five minutes.
Example:
import { ThrottleQueue } from 'rx-queue'
const throttle = new ThrottleQueue(500) // set period time to 500 milliseconds
throttle.subscribe(console.log)
throttle.next(1)
throttle.next(2)
throttle.next(3)
// Output: 1
DebounceQueue
drops a item if there's another one comes in a period of time.
Picture Credit: ReactiveX Observable Debounce
The Debounce technique allow us to deal with multiple sequential items in a time period to only keep the last one.
Debouncing enforces that no more items will be passed again until a certain amount of time has passed without any new items coming.
Practical examples of DebounceQueue
:
- User is typing text in a search box. We want to make an auto-complete function call only after the user stop typing for 500 milliseconds.
Example:
import { DebounceQueue } from 'rx-queue'
const debounce = new DebounceQueue(500) // set period time to 500 milliseconds
debounce.subscribe(console.log)
debounce.next(1)
debounce.next(2)
debounce.next(3)
// Paused 500 millisecond...
// Output: 3
DelayQueueExecutor
calls functions one by one with a delay time period between calls.
If you want this feature but do not want rxjs dependencies, you can have a look on a zero dependencies alternative: [BottleNeck](https://github.com/SGrondin/bottleneck)
Picture Credit: ReactiveX Single Operator Delay
Practical examples of DelayQueueExecutor
:
- We are calling a HTTP API which can only be called no more than ten times per second, or it will throw a
500
error.
Example:
import { DelayQueueExecutor } from 'rx-queue'
const delay = new DelayQueueExecutor(500) // set delay period time to 500 milliseconds
delay.execute(() => console.log(1))
delay.execute(() => console.log(2))
delay.execute(() => console.log(3))
// Output: 1
// Paused 500 millisecond...
// Output: 2
// Paused 500 millisecond...
// Output: 3
When we have a array and need to use an async function to get the result of them, we can use Promise.all()
:
const asyncTask = async function (item) {
/**
* Some heavy task, like:
* 1. requires XXX MB of memory
* 2. make 10+ new network connections and each takes 10+ seconds
* 3. etc.
*/
}
const result = await Promise.all(
hugeArray.map(item => asyncTask),
)
Because the above example asyncTask
requires lots of resource for each task,
so if the hugeArray
has many items, like 1,000+,
then to use the Promise.all
will very likely to crash the system.
The solution is that we can use concurrencyExecuter()
to execute them in parallel with a concurrency limitation.
// async task:
const heavyTask = (n: number) => Promise.resolve(resolve => setTimeout(resolve(n^2), 100))
const results = concurrencyExecuter(
2, // concurrency
)(
heavyTask, // task async function
)(
[1, 2, 3], // task arguments
)
/**
* in the following `for` loop, we will have 2 currency tasks running at the same time.
*/
for await (const result of results) {
console.log(result)
}
That's it.
- ES Module Support
- TypeScript 4.5
concurrencyExecuter()
method added
- Upgrade RxJS to v7.1
- Upgrade TypeScript to v4.3
- Fix RxJS breaking changes #71
- Fix typo: issue #40 - rename
DelayQueueExector
toDelayQueueExecutor
- fix exception bug in browser(ie. Angular)
- Upgrade to RxJS 6
- Moved CI from Travis-ci.org to Travis-ci.com
- Support:
DelayQueue
,ThrottleQueue
,DebounceQueue
,DelayQueueExecutor
. - first version
Huan LI (李卓桓) <zixia@zixia.net>
- Code & Docs © 2017-now Huan LI <zixia@zixia.net>
- Code released under the Apache-2.0 License
- Docs released under Creative Commons