Skip to content

Commit

Permalink
Merge pull request #2599 from reduxjs/feature/batch-rtkq-rejections
Browse files Browse the repository at this point in the history
  • Loading branch information
markerikson authored Aug 15, 2022
2 parents a4a0c2d + ba9ed75 commit 3ca3c88
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 60 deletions.
57 changes: 57 additions & 0 deletions packages/toolkit/src/query/core/buildMiddleware/batchActions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { QueryThunk, RejectedAction } from '../buildThunks'
import type { SubMiddlewareBuilder } from './types'

// Copied from https://github.com/feross/queue-microtask
let promise: Promise<any>
const queueMicrotaskShim =
typeof queueMicrotask === 'function'
? queueMicrotask.bind(typeof window !== 'undefined' ? window : global)
: // reuse resolved promise, and allocate it lazily
(cb: () => void) =>
(promise || (promise = Promise.resolve())).then(cb).catch((err: any) =>
setTimeout(() => {
throw err
}, 0)
)

export const build: SubMiddlewareBuilder = ({
api,
context: { apiUid },
queryThunk,
reducerPath,
}) => {
return (mwApi) => {
let abortedQueryActionsQueue: RejectedAction<QueryThunk, any>[] = []
let dispatchQueued = false

return (next) => (action) => {
if (queryThunk.rejected.match(action)) {
const { condition, arg } = action.meta

if (condition && arg.subscribe) {
// request was aborted due to condition (another query already running)
// _Don't_ dispatch right away - queue it for a debounced grouped dispatch
abortedQueryActionsQueue.push(action)

if (!dispatchQueued) {
queueMicrotaskShim(() => {
mwApi.dispatch(
api.internalActions.subscriptionRequestsRejected(
abortedQueryActionsQueue
)
)
abortedQueryActionsQueue = []
})
dispatchQueued = true
}
// _Don't_ let the action reach the reducers now!
return
}
}

const result = next(action)

return result
}
}
}
46 changes: 35 additions & 11 deletions packages/toolkit/src/query/core/buildMiddleware/cacheCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import type {

export type ReferenceCacheCollection = never

function isObjectEmpty(obj: Record<any, any>) {
// Apparently a for..in loop is faster than `Object.keys()` here:
// https://stackoverflow.com/a/59787784/62937
for (let k in obj) {
// If there is at least one key, it's not empty
return false
}
return true
}

declare module '../../endpointDefinitions' {
interface QueryExtraOptions<
TagTypes extends string,
Expand Down Expand Up @@ -38,6 +48,15 @@ export const THIRTY_TWO_BIT_MAX_TIMER_SECONDS = 2_147_483_647 / 1_000 - 1
export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
const { removeQueryResult, unsubscribeQueryResult } = api.internalActions

function anySubscriptionsRemainingForKey(
queryCacheKey: string,
api: SubMiddlewareApi
) {
const subscriptions =
api.getState()[reducerPath].subscriptions[queryCacheKey]
return !!subscriptions && !isObjectEmpty(subscriptions)
}

return (mwApi) => {
const currentRemovalTimeouts: QueryStateMeta<TimeoutId> = {}

Expand Down Expand Up @@ -94,6 +113,11 @@ export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
] as QueryDefinition<any, any, any, any>
const keepUnusedDataFor =
endpointDefinition?.keepUnusedDataFor ?? config.keepUnusedDataFor

if (keepUnusedDataFor === Infinity) {
// Hey, user said keep this forever!
return
}
// Prevent `setTimeout` timers from overflowing a 32-bit internal int, by
// clamping the max value to be at most 1000ms less than the 32-bit max.
// Look, a 24.8-day keepalive ought to be enough for anybody, right? :)
Expand All @@ -103,18 +127,18 @@ export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
Math.min(keepUnusedDataFor, THIRTY_TWO_BIT_MAX_TIMER_SECONDS)
)

const currentTimeout = currentRemovalTimeouts[queryCacheKey]
if (currentTimeout) {
clearTimeout(currentTimeout)
}
currentRemovalTimeouts[queryCacheKey] = setTimeout(() => {
const subscriptions =
api.getState()[reducerPath].subscriptions[queryCacheKey]
if (!subscriptions || Object.keys(subscriptions).length === 0) {
api.dispatch(removeQueryResult({ queryCacheKey }))
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
const currentTimeout = currentRemovalTimeouts[queryCacheKey]
if (currentTimeout) {
clearTimeout(currentTimeout)
}
delete currentRemovalTimeouts![queryCacheKey]
}, finalKeepUnusedDataFor * 1000)
currentRemovalTimeouts[queryCacheKey] = setTimeout(() => {
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
api.dispatch(removeQueryResult({ queryCacheKey }))
}
delete currentRemovalTimeouts![queryCacheKey]
}, finalKeepUnusedDataFor * 1000)
}
}
}
}
2 changes: 2 additions & 0 deletions packages/toolkit/src/query/core/buildMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { build as buildWindowEventHandling } from './windowEventHandling'
import { build as buildCacheLifecycle } from './cacheLifecycle'
import { build as buildQueryLifecycle } from './queryLifecycle'
import { build as buildDevMiddleware } from './devMiddleware'
import { build as buildBatchActions } from './batchActions'

export function buildMiddleware<
Definitions extends EndpointDefinitions,
Expand All @@ -38,6 +39,7 @@ export function buildMiddleware<
buildWindowEventHandling,
buildCacheLifecycle,
buildQueryLifecycle,
buildBatchActions,
].map((build) =>
build({
...(input as any as BuildMiddlewareInput<
Expand Down
33 changes: 21 additions & 12 deletions packages/toolkit/src/query/core/buildSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type {
ConfigState,
} from './apiState'
import { QueryStatus } from './apiState'
import type { MutationThunk, QueryThunk } from './buildThunks'
import type { MutationThunk, QueryThunk, RejectedAction } from './buildThunks'
import { calculateProvidedByThunk } from './buildThunks'
import type {
AssertTagTypes,
Expand Down Expand Up @@ -387,6 +387,26 @@ export function buildSlice({
delete draft[queryCacheKey]![requestId]
}
},
subscriptionRequestsRejected(
draft,
action: PayloadAction<RejectedAction<QueryThunk, any>[]>
) {
// We need to process "rejected" actions caused by a component trying to start a subscription
// after there's already a cache entry. Since many components may mount at once and all want
// the same data, we use a middleware that intercepts those actions batches these together
// into a single larger action , and we'll process all of them at once.
for (let rejectedAction of action.payload) {
const {
meta: { condition, arg, requestId },
} = rejectedAction
// request was aborted due to condition (another query already running)
if (condition && arg.subscribe) {
const substate = (draft[arg.queryCacheKey] ??= {})
substate[requestId] =
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
}
},
},
extraReducers: (builder) => {
builder
Expand All @@ -403,17 +423,6 @@ export function buildSlice({
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
})
.addCase(
queryThunk.rejected,
(draft, { meta: { condition, arg, requestId }, error, payload }) => {
// request was aborted due to condition (another query already running)
if (condition && arg.subscribe) {
const substate = (draft[arg.queryCacheKey] ??= {})
substate[requestId] =
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
}
)
// update the state to be a new object to be picked up as a "state change"
// by redux-persist's `autoMergeLevel2`
.addMatcher(hasRehydrationInfo, (draft) => ({ ...draft }))
Expand Down
37 changes: 32 additions & 5 deletions packages/toolkit/src/query/react/buildHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -696,16 +696,25 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
pollingInterval,
})

const lastRenderHadSubscription = useRef(false)

const promiseRef = useRef<QueryActionCreatorResult<any>>()

let { queryCacheKey, requestId } = promiseRef.current || {}
const subscriptionRemoved = useSelector(
const currentRenderHasSubscription = useSelector(
(state: RootState<Definitions, string, string>) =>
!!queryCacheKey &&
!!requestId &&
!state[api.reducerPath].subscriptions[queryCacheKey]?.[requestId]
)

const subscriptionRemoved =
!currentRenderHasSubscription && lastRenderHadSubscription.current

usePossiblyImmediateEffect(() => {
lastRenderHadSubscription.current = currentRenderHasSubscription
})

usePossiblyImmediateEffect((): void | undefined => {
promiseRef.current = undefined
}, [subscriptionRemoved])
Expand Down Expand Up @@ -736,6 +745,7 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
forceRefetch: refetchOnMountOrArgChange,
})
)

promiseRef.current = promise
} else if (stableSubscriptionOptions !== lastSubscriptionOptions) {
lastPromise.updateSubscriptionOptions(stableSubscriptionOptions)
Expand Down Expand Up @@ -923,8 +933,9 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
...options,
})

const { data, status, isLoading, isSuccess, isError, error } = queryStateResults;
useDebugValue({ data, status, isLoading, isSuccess, isError, error });
const { data, status, isLoading, isSuccess, isError, error } =
queryStateResults
useDebugValue({ data, status, isLoading, isSuccess, isError, error })

return useMemo(
() => ({ ...queryStateResults, ...querySubscriptionResults }),
Expand Down Expand Up @@ -993,8 +1004,24 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
})
}, [dispatch, fixedCacheKey, promise, requestId])

const { endpointName, data, status, isLoading, isSuccess, isError, error } = currentState;
useDebugValue({ endpointName, data, status, isLoading, isSuccess, isError, error });
const {
endpointName,
data,
status,
isLoading,
isSuccess,
isError,
error,
} = currentState
useDebugValue({
endpointName,
data,
status,
isLoading,
isSuccess,
isError,
error,
})

const finalState = useMemo(
() => ({ ...currentState, originalArgs, reset }),
Expand Down
15 changes: 9 additions & 6 deletions packages/toolkit/src/query/tests/buildHooks.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1162,9 +1162,12 @@ describe('hooks tests', () => {
})

test('useMutation return value contains originalArgs', async () => {
const { result } = renderHook(() => api.endpoints.updateUser.useMutation(), {
wrapper: storeRef.wrapper,
})
const { result } = renderHook(
() => api.endpoints.updateUser.useMutation(),
{
wrapper: storeRef.wrapper,
}
)
const arg = { name: 'Foo' }

const firstRenderResult = result.current
Expand Down Expand Up @@ -1955,13 +1958,13 @@ describe('hooks with createApi defaults set', () => {

const addBtn = screen.getByTestId('addPost')

await waitFor(() => expect(getRenderCount()).toBe(3))
await waitFor(() => expect(getRenderCount()).toBe(4))

fireEvent.click(addBtn)
await waitFor(() => expect(getRenderCount()).toBe(5))
await waitFor(() => expect(getRenderCount()).toBe(6))
fireEvent.click(addBtn)
fireEvent.click(addBtn)
await waitFor(() => expect(getRenderCount()).toBe(7))
await waitFor(() => expect(getRenderCount()).toBe(8))
})

test('useQuery with selectFromResult option serves a deeply memoized value and does not rerender unnecessarily', async () => {
Expand Down
15 changes: 14 additions & 1 deletion packages/toolkit/src/query/tests/cacheCollection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ describe(`query: await cleanup, keepUnusedDataFor set`, () => {
query: () => '/success',
keepUnusedDataFor: 0,
}),
query4: build.query<unknown, string>({
query: () => '/success',
keepUnusedDataFor: Infinity,
}),
}),
keepUnusedDataFor: 29,
})
Expand All @@ -126,9 +130,18 @@ describe(`query: await cleanup, keepUnusedDataFor set`, () => {
expect(onCleanup).not.toHaveBeenCalled()
store.dispatch(api.endpoints.query3.initiate('arg')).unsubscribe()
expect(onCleanup).not.toHaveBeenCalled()
jest.advanceTimersByTime(1), await waitMs()
jest.advanceTimersByTime(1)
await waitMs()
expect(onCleanup).toHaveBeenCalled()
})

test('endpoint keepUnusedDataFor: Infinity', async () => {
expect(onCleanup).not.toHaveBeenCalled()
store.dispatch(api.endpoints.query4.initiate('arg')).unsubscribe()
expect(onCleanup).not.toHaveBeenCalled()
jest.advanceTimersByTime(THIRTY_TWO_BIT_MAX_INT)
expect(onCleanup).not.toHaveBeenCalled()
})
})

function storeForApi<
Expand Down
Loading

0 comments on commit 3ca3c88

Please sign in to comment.