Usage > Streaming Updates: updating cache from pushed messages">Usage > Streaming Updates: updating cache from pushed messages">
跳至主要內容

串流更新

總覽

RTK Query 讓您可以接收持續性查詢的串流更新。這讓查詢能夠建立與伺服器之間的持續連線(通常使用 WebSocket),並在從伺服器收到其他資訊時,將更新套用至快取資料。

串流更新可用於讓 API 接收後端資料的即時更新,例如建立新項目或更新重要屬性。

若要為查詢啟用串流更新,請將非同步 onCacheEntryAdded 函式傳遞至查詢,包括在收到串流資料時如何更新查詢的邏輯。有關更多詳細資訊,請參閱 onCacheEntryAdded API 參考

何時使用串流更新

查詢資料的更新應主要透過 輪詢 間歇性地以某個間隔進行,使用 快取失效 根據與查詢和變更關聯的標籤來使資料失效,或使用 refetchOnMountOrArgChange 在使用資料的元件掛載時擷取最新資料。

但是,串流更新對於涉及下列情況的場景特別有用

  • 對大型物件進行頻繁的小幅度變更。與重複輪詢大型物件不同,物件可以用初始查詢擷取,而串流更新可以在收到更新時更新個別屬性。
  • 外部事件驅動的更新。資料可能會由伺服器或其他外部使用者變更,而預期會向活躍使用者顯示即時更新,單獨輪詢會導致查詢之間出現過時資料的期間,導致狀態容易不同步。串流更新可以在更新發生時更新所有活躍的用戶端,而不用等到下一個間隔經過。

受益於串流更新的範例使用案例包括

  • GraphQL 訂閱
  • 即時聊天應用程式
  • 即時多人遊戲
  • 多個同時使用者進行協作式文件編輯

使用 onCacheEntryAdded 生命週期

onCacheEntryAdded 生命週期回呼讓您撰寫任意非同步邏輯,該邏輯會在新快取項目新增至 RTK Query 快取後執行(即在元件為特定端點 + 參數組合建立新訂閱後)。

onCacheEntryAdded 會使用兩個引數呼叫:傳遞至訂閱的 arg,以及包含「生命週期承諾」和公用函式的選項物件。您可以使用這些來撰寫順序邏輯,等待新增資料、啟動伺服器連線、套用部分更新,以及在移除查詢訂閱時清除連線。

通常,您會 await cacheDataLoaded 來判斷何時已擷取第一筆資料,然後使用 updateCacheData 公用程式在收到訊息時套用串流更新。updateCacheData 是 Immer 驅動的回呼,會收到目前快取值的 草稿。您可以「變更」草稿值,根據收到的值視需要更新它。然後,RTK Query 會發送一個動作,根據這些變更套用差異化修補程式。

最後,你可以await cacheEntryRemoved,以了解何時清除任何伺服器連線。

串流更新範例

Websocket 聊天 API

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<Message[], Channel>({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// create a websocket connection when the cache subscription starts
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded

// when data is received from the socket connection to the server,
// if it is a message and for the appropriate channel,
// update our query result with the received message
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
draft.push(data)
})
}

ws.addEventListener('message', listener)
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

預期事項

當觸發 getMessages 查詢時(例如透過使用 useGetMessagesQuery() 鉤子掛載元件),將會根據端點的序列化引數新增一個快取條目。關聯的查詢將根據 query 屬性觸發,以擷取快取的初始資料。同時,非同步 onCacheEntryAdded 回呼將會開始,並建立新的 WebSocket 連線。一旦收到初始查詢的回應,快取將會填入回應資料,而 cacheDataLoaded 承諾將會解決。在等待 cacheDataLoaded 承諾後,message 事件監聽器將會新增到 WebSocket 連線,當收到關聯訊息時,它會更新快取資料。

當沒有更多資料的有效訂閱時(例如當訂閱的元件在一段足夠的時間內保持未掛載時),cacheEntryRemoved 承諾將會解決,允許剩餘的程式碼執行並關閉 websocket 連線。RTK Query 也會從快取中移除關聯的資料。

如果稍後執行對應快取條目的查詢,它將覆寫整個快取條目,而串流更新監聽器將會繼續處理已更新的資料。

具有轉換後回應形狀的 Websocket 聊天 API

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

const messagesAdapter = createEntityAdapter<Message>()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<EntityState<Message, number>, Channel>({
query: (channel) => `messages/${channel}`,
transformResponse(response: Message[]) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const ws = new WebSocket('ws://127.0.0.1:8080')
try {
await cacheDataLoaded

const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}

ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

這個範例示範如何修改前一個範例,以允許在將資料新增到快取時轉換回應形狀。

例如,資料從這個形狀轉換

[
{
id: 0
channel: 'redux'
userName: 'Mark'
text: 'Welcome to #redux!'
},
{
id: 1
channel: 'redux'
userName: 'Lenz'
text: 'Glad to be here!'
},
]

到這個

{
// The unique IDs of each item. Must be strings or numbers
ids: [0, 1],
// A lookup table mapping entity IDs to the corresponding entity objects
entities: {
0: {
id: 0,
channel: "redux",
userName: "Mark",
text: "Welcome to #redux!",
},
1: {
id: 1,
channel: "redux",
userName: "Lenz",
text: "Glad to be here!",
},
},
};

要記住的一個重點是,onCacheEntryAdded 回呼中快取資料的更新必須符合轉換後的資料形狀,而快取資料會呈現此形狀。這個範例顯示如何使用createEntityAdapter進行初始的 transformResponse,以及在收到串流更新時再次使用,以將收到的項目插入快取資料,同時維持正規化的狀態結構。