串流更新
總覽
RTK Query 讓您可以接收持續性查詢的串流更新。這讓查詢能夠建立與伺服器之間的持續連線(通常使用 WebSocket),並在從伺服器收到其他資訊時,將更新套用至快取資料。
串流更新可用於讓 API 接收後端資料的即時更新,例如建立新項目或更新重要屬性。
若要為查詢啟用串流更新,請將非同步 onCacheEntryAdded
函式傳遞至查詢,包括在收到串流資料時如何更新查詢的邏輯。有關更多詳細資訊,請參閱 onCacheEntryAdded
API 參考。
何時使用串流更新
查詢資料的更新應主要透過 輪詢
間歇性地以某個間隔進行,使用 快取失效
根據與查詢和變更關聯的標籤來使資料失效,或使用 refetchOnMountOrArgChange
在使用資料的元件掛載時擷取最新資料。
但是,串流更新對於涉及下列情況的場景特別有用
- 對大型物件進行頻繁的小幅度變更。與重複輪詢大型物件不同,物件可以用初始查詢擷取,而串流更新可以在收到更新時更新個別屬性。
- 外部事件驅動的更新。資料可能會由伺服器或其他外部使用者變更,而預期會向活躍使用者顯示即時更新,單獨輪詢會導致查詢之間出現過時資料的期間,導致狀態容易不同步。串流更新可以在更新發生時更新所有活躍的用戶端,而不用等到下一個間隔經過。
受益於串流更新的範例使用案例包括
- GraphQL 訂閱
- 即時聊天應用程式
- 即時多人遊戲
- 多個同時使用者進行協作式文件編輯
使用 onCacheEntryAdded
生命週期
onCacheEntryAdded
生命週期回呼讓您撰寫任意非同步邏輯,該邏輯會在新快取項目新增至 RTK Query 快取後執行(即在元件為特定端點 + 參數組合建立新訂閱後)。
onCacheEntryAdded
會使用兩個引數呼叫:傳遞至訂閱的 arg
,以及包含「生命週期承諾」和公用函式的選項物件。您可以使用這些來撰寫順序邏輯,等待新增資料、啟動伺服器連線、套用部分更新,以及在移除查詢訂閱時清除連線。
通常,您會 await cacheDataLoaded
來判斷何時已擷取第一筆資料,然後使用 updateCacheData
公用程式在收到訊息時套用串流更新。updateCacheData
是 Immer 驅動的回呼,會收到目前快取值的 草稿
。您可以「變更」草稿值,根據收到的值視需要更新它。然後,RTK Query 會發送一個動作,根據這些變更套用差異化修補程式。
最後,你可以await cacheEntryRemoved
,以了解何時清除任何伺服器連線。
串流更新範例
Websocket 聊天 API
- TypeScript
- JavaScript
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'
export type Channel = 'redux' | 'general'
export interface Message {
id: number
channel: Channel
userName: string
text: string
}
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<Message[], Channel>({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// create a websocket connection when the cache subscription starts
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded
// when data is received from the socket connection to the server,
// if it is a message and for the appropriate channel,
// update our query result with the received message
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return
updateCachedData((draft) => {
draft.push(data)
})
}
ws.addEventListener('message', listener)
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
ws.close()
},
}),
}),
})
export const { useGetMessagesQuery } = api
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// create a websocket connection when the cache subscription starts
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded
// when data is received from the socket connection to the server,
// if it is a message and for the appropriate channel,
// update our query result with the received message
const listener = (event) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return
updateCachedData((draft) => {
draft.push(data)
})
}
ws.addEventListener('message', listener)
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
ws.close()
},
}),
}),
})
export const { useGetMessagesQuery } = api
預期事項
當觸發 getMessages
查詢時(例如透過使用 useGetMessagesQuery()
鉤子掛載元件),將會根據端點的序列化引數新增一個快取條目
。關聯的查詢將根據 query
屬性觸發,以擷取快取的初始資料。同時,非同步 onCacheEntryAdded
回呼將會開始,並建立新的 WebSocket 連線。一旦收到初始查詢的回應,快取將會填入回應資料,而 cacheDataLoaded
承諾將會解決。在等待 cacheDataLoaded
承諾後,message
事件監聽器將會新增到 WebSocket 連線,當收到關聯訊息時,它會更新快取資料。
當沒有更多資料的有效訂閱時(例如當訂閱的元件在一段足夠的時間內保持未掛載時),cacheEntryRemoved
承諾將會解決,允許剩餘的程式碼執行並關閉 websocket 連線。RTK Query 也會從快取中移除關聯的資料。
如果稍後執行對應快取條目的查詢,它將覆寫整個快取條目,而串流更新監聽器將會繼續處理已更新的資料。
具有轉換後回應形狀的 Websocket 聊天 API
- TypeScript
- JavaScript
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'
export type Channel = 'redux' | 'general'
export interface Message {
id: number
channel: Channel
userName: string
text: string
}
const messagesAdapter = createEntityAdapter<Message>()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<EntityState<Message, number>, Channel>({
query: (channel) => `messages/${channel}`,
transformResponse(response: Message[]) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
await cacheDataLoaded
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return
updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}
ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})
export const { useGetMessagesQuery } = api
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'
const messagesAdapter = createEntityAdapter()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query({
query: (channel) => `messages/${channel}`,
transformResponse(response) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
await cacheDataLoaded
const listener = (event) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return
updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}
ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})
export const { useGetMessagesQuery } = api
這個範例示範如何修改前一個範例,以允許在將資料新增到快取時轉換回應形狀。
例如,資料從這個形狀轉換
[
{
id: 0
channel: 'redux'
userName: 'Mark'
text: 'Welcome to #redux!'
},
{
id: 1
channel: 'redux'
userName: 'Lenz'
text: 'Glad to be here!'
},
]
到這個
{
// The unique IDs of each item. Must be strings or numbers
ids: [0, 1],
// A lookup table mapping entity IDs to the corresponding entity objects
entities: {
0: {
id: 0,
channel: "redux",
userName: "Mark",
text: "Welcome to #redux!",
},
1: {
id: 1,
channel: "redux",
userName: "Lenz",
text: "Glad to be here!",
},
},
};
要記住的一個重點是,onCacheEntryAdded
回呼中快取資料的更新必須符合轉換後的資料形狀,而快取資料會呈現此形狀。這個範例顯示如何使用createEntityAdapter
進行初始的 transformResponse
,以及在收到串流更新時再次使用,以將收到的項目插入快取資料,同時維持正規化的狀態結構。