SyncEvent
SyncEvent 用来触发,注册事件
基本的用法
安装
npm install ocev
引入
import {SyncEvent} from 'ocev'
初始化
定义你的事件类型,初始化一个 SyncEvent 实例
import { SyncEvent } from 'ocev'
type EventHandlerMap = {
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}
const syncEvent = SyncEvent.new<EventHandlerMap>()
// const syncEvent = new SyncEvent<EventHandlerMap>()
上面的代码表示,该 SyncEvent 实例会触发两种事件 event1
,event2
, 分别可接受的回调函数类型是 EventHandlerMap['event1']
,EventHandlerMap['event2']
ocev 会根据提供的 EventHandler 的类型生成完善的类型提示
为什么 EventHandlerMap
没有被设计成
type EventHandlerMap = {
event1: [number,string]
event2: [string,number]
}
考虑到未来会支持 () => Promise<T>
这种函数作为回调函数,并对返回值做处理,所以有必要知道函数的返回值是什么
注册一个事件
const cancel = syncEvent.on('event1', (arg1, arg2) => {
console.log(arg1, arg2)
})
// 调用 `cancel()` 取消注册 或者 syncEvent.off("event1", callback) 取消注册
触发该事件
syncEvent.emit("event1", 1, "2")
// 上面的回调函数触发
多事件注册
const cancel = syncEvent
.on("event1", (arg1, arg2) => {
console.log(arg1, arg2)
})
.on("event2", (arg1, arg2) => {
console.log(arg1, arg2)
})
// cancel() 一次性调用即可取消上面的两次注册
API
on
/**
* 注册一个事件
* @param {K} event 事件名称
* @param {M[K]} handler 回调函数
* @param {ListenerOptions} [options] 配置,可以设置节流和防抖,不支持同时设置
* @returns {LinkableListener<M>} 返回一个函数,该函数有 on,once 两个方法,可以继续注册事件,并且记录一个上下文
*/
on: <K extends keyof M>(event: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
ListenerOptions
// 单位都是毫秒
type ListenerOptions = {
debounce?: { // 防抖
waitMs: number // 回调函数被推迟的时间
maxWaitMs?: number // 最大可被推迟的时间,如果超过该限制,则必须调用,无法继续推迟
}
throttle?: { //节流
waitMs: number // 最大可被重复调用的时间间隔
}
}
Example
// 防抖
syncEvent.on("event1", (arg1, arg2) => {}, {
debounce: {
waitMs: 200,
maxWaitMs: 500,
},
})
LinkableListener
interface LinkableListener<M> {
(): void
once: <K extends keyof M>(type: K, handler: M[K]) => LinkableListener<M>
on: <K extends keyof M>(type: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
}
当 on 被调用的时候,返回一个函数,该函数有两个方法 on,once
, 可以继续注册事件,并生成一个上下文,该上下文会记录每一个注册的事件形成一个链表,当最后一个cancel
被调用的时候,上下文内所有的回调函数都会被清除
Example
Example
const cancelFunction = syncEvent.on("event1", (arg1, arg2) => {})
// function can be object too
const secondCancelFunction = cancelFunction.on('event2',(arg1,arg2)=>{})
secondCancelFunction.on('event1',() => {
})
secondCancelFunction() // cancel first and second registrations
listenerCount
获取某一个事件的回调函数的注册数量 如果不传具体的事件名,返回所有的回调函数的数量
/**
* @param {K} event 事件名称
* @returns {number} 数量
*/
listenerCount: <K extends keyof M>(event?: K | undefined) => number
Example
syncEvent.on("event1", () => {}).on("event2", () => {})
syncEvent.listenerCount("event1") // 1
syncEvent.listenerCount("event2") // 1
syncEvent.listenerCount() // 2
once
用法同 on
一致, 只不过该回调只会触发一次
/**
* @param {K} event - 事件名称
* @param {M[K]} handler - The handler function to be executed when the event is emitted.
* @returns {LinkableListener<M>} - An object containing methods to manage the listener.
*/
once: <K extends keyof M>(event: K, handler: M[K]) => LinkableListener<M>
any
监听所有的事件触发
/**
* @param {Function} handler - 回调函数
* @returns {Function} - 返回一个函数,该函数调用取消 any 的注册
*/
any: <K extends keyof M = keyof M>(handler: (event: K, ...args: Arguments<M[K]>) => void) => () => void
Example
syncEvent.any((event, ...args) => {
console.log(event, args)
})
syncEvent.emit("event1", 1, "2") // event1, [1, "2"]
syncEvent.emit("event2", "3", 4) // event2, ["3", 4]
off
取消某一个事件的注册
/**
*
* @param {K} event - 事件名
* @param {M[K]} handler - 回调函数
*/
off: <K extends keyof M>(event: K, handler: M[K])
Example
const callback = () => {}
syncEvent.on("event1", callback)
// 取消注册
syncEvent.off("event1", callback)
emit
发布一个事件,并传递参数
/**
*
* @param {K} event - 事件名
* @param {...Arguments<M[K]>} args - 传递的参数
*/
emit: <K extends keyof M>(event: K, ...args: Arguments<M[K]>)
Example
syncEvent.emit("event1", 1, "2")
waitUtil
等待一个事件触发,返回它对应的响应参数的数组
/**
* @param {K} event - 事件名称
* @param {WaitUtilConfig<Arguments<M[K]>>} [config={}] - 可选配置
* @returns {Promise<Arguments<M[K]>>} - 将对应的触发的参数转换成一个数组返回
*/
waitUtil: <K extends keyof M = keyof M>(
event: K,
config?: WaitUtilConfig<Arguments<M[K]>>,
) => Promise<Arguments<M[K]>>
Example
- 基本用法
- where 判断
- 超时
- 映射异常
- 提前 cancel
queueMicrotask(()=>{
syncEvent.emit("event1", 1, "2")
})
// block 直到 emit 触发
const result = await syncEvent.waitUtil("event1")
let count = 0
setInterval(() => {
syncEvent.emit("event1", count++, "2")
}, 100)
const result = await syncEvent.waitUtil("event1", {
// 每次事件触发都会调用 where 函数, 只有当 where 返回 true 的时候,整个 Promise 才 resolve
where: (arg1, arg2) => arg1 > 10,
})
// 如果超过 timeout 的时间,事件还没有触发,抛出异常 TimeoutError
syncEvent.waitUtil("event1", {
timeout: 1000,
}).catch(error=>{
// TimeoutError
})
queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
})
const result = await syncEvent.waitUtil("event1", {
// 当 mapToError 返回 null|undefined 的时候,整个 Promise 正常 resolve,其他情况下,Promise reject 抛出该返回值
mapToError: (arg1, arg2) => (arg1 < 0 ? Error() : null),
})
const cancelRef = { current() {} }
queueMicrotask(() => {
// 这么处理 waitUtil 会抛出异常,提前结束等待
cancelRef.current()
})
// cancelRef 会被写入 current 属性,值是一个函数,调用该函数可以让 waitUtil 抛出异常
await syncEvent.waitUtil("event1", {
cancelRef,
})
waitUtilRace
等待 eventList 任意一个事件触发的时候返回该事件的触发参数
/**
* @param {((EventListItem<M, K> | K)[])} eventList - 事件名或者事件配置
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - 返回该事件对应的值
*/
waitUtilRace: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>
Example
- 竞争触发
- 可配置
queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
})
const { event, value } = await syncEvent.waitUtilRace(["event1", "event2"])
// event => 'event1'
// value => [1,'2']
const { event, value } = await syncEvent.waitUtilRace([
"event1",
{
event: "event2",
timeout: 1000,
where: () => true,
mapToError: () => null,
}, // 参数和 waitUtil 保持一致,多了一个 event
])
waitUtilAll
等待 eventList 中所有的事件触发,或其中一个抛出异常
/**
* @param {EventList} eventList - 事件列表
* @returns {Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>} - 由所有事件的返回值组成,返回一个数组
*/
waitUtilAll: <
K extends keyof M = keyof M,
EventList extends readonly (K | Readonly<EventListItem<M, K>>)[] = readonly (
| K
| Readonly<EventListItem<M, K>>
)[],
>(
eventList: EventList,
) => Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>
Example
- 基本用法
- 可配置
queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
})
const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
"event1",
"event2",
])
queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
})
const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
"event1",
{
event: "event2",
timeout: 1000,
}, // 同 waitUtil 参数一致
] as const) // 目前作者能力有限,如果要在 waitUtilAll 里面获取完善的类型提示,必须要用户手动加上 as const,如果你有好的解决方案,请帮助我,感激不尽
await syncEvent.waitUtilAll([
"event1",
{
event: "event2",
timeout: 1000,
},
] as const)
目前因为作者能力有限,如果 eventList 里面的元素不单纯是字符串的时候,用户需要加上 as const
给 eventList,waitUtilAll 才能获得明确的返回值的类型提示(其他 api 暂时没有这个问题),如果你有好的解决方案,请帮助我改进,万分感谢!
waitUtilAny
等待任意一个事件成功即可 Resolve ,所有事件都失败则 Reject,返回类型和 waitUtilRace 保持一致
/**
* @param {((EventListItem<M, K> | K)[])} eventList - 事件列表
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - 和 waitUtilRace 返回类型一致
*/
waitUtilAny: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>
Example
queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})
const { event, value } = await syncEvent.waitUtilAny([
"event1",
{
event: "event2",
mapToError: () => Error(),
}, // 同 waitUtil 参数一致
] as const)
// event => 'event1'
createEventStream
创建一个事件流,源源不断的传递事件
/**
* @param {K[]} eventList - An array of events to subscribe to.
* @param {EventStreamStrategy} strategy - 策略
* @returns {Object} - 异步迭代器,有方法 droppedEventCount/replacedEventCount获取被丢弃的事件数量
*/
createEventStream: <K extends keyof M = keyof M>(
eventList: K[],
strategy?: EventStreamStrategy,
) => {
[Symbol.asyncIterator]: () => {
next: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
return: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
}
droppedEventCount: () => number
replacedEventCount: () => number
}
Example
queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})
const eventStream = syncEvent.createEventStream(["event1", "event2"])
for await (const { event, value } of eventStream) { // 通过 for await 循环处理所有的事件
switch (event) {
case "event1": {
// value => [1, "2"]
break
}
case "event2": {
// value => ["3", 4]
break
}
default: {
throw Error("unreachable")
}
}
}
背压
考虑有如下的代码
setInterval(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"])
for await (const { event, value } of eventStream) {
// 每次处理都是 10s
await new Promise((res) => setTimeout(res, 10000))
}
SyncEvent 会创建一个队列,所有还没有被处理的事件都会被暂时存在队列里面
当事件触发的速度大于事件被处理的速度时,队列就会一直增长
如果你想处理这种情况,就需要配置 EventStreamStrategy
只有当你指定了 strategy.capacity
,并且队列里面的事件数量大于 strategy.capacity
的时候才会触发策略
EventStreamStrategy
有2种类型,分别对应三种不同的处理方式
drop
:丢弃新触发的事件,直到有消息被消费,队列有位子replace
:丢弃一个最早触发但是未处理的事件,新触发的事件推入到队列中,保证strategy.capacity
不变
如果不传递 strategy
或 strategy.capacity <= 0
的时候不会丢弃事件
Example
- drop
- replace
setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4) // 每次都会被丢弃,因为容量是 1
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"], {
capacity: 1, // 最大容量是1
strategyWhenFull: "drop",
})
for await (const { event } of eventStream) {
console.log(event) // event 永远是 event1
switch (event) {
case "event1": {
break
}
default: {
// 不会有其他事件响应
throw Error("unreachable")
}
}
}
setInterval(() => {
syncEvent.emit("event1", 1, "2") // 每次都会被丢弃,因为容量是1,每次都会被更新的事件挤掉
syncEvent.emit("event2", "3", 4)
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"], {
capacity: 1, // 最大容量是1
strategyWhenFull: "replace",
})
for await (const { event } of eventStream) {
console.log(event) // event 永远是 event2
switch (event) {
case "event2": {
break
}
default: {
// 不会有其他事件响应
throw Error("unreachable")
}
}
}
createReadableStream
createReadableStream 和 createEventStream 几乎是一样的
唯一的区别是 createReadableStream 返回的是 ReadableStream
Example
setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
}, 100)
const eventStream = syncEvent.createEventReadableStream(["event1", "event2"])
const reader = eventStream.getReader()
for (;;) {
const { done, value } = await reader.read()
if (done) {
break
}
switch (value.event) {
case "event1":
console.log(value.value) // 1,'2'
break
case "event2":
console.log(value.value) // '3',4
break
}
}
subscriber 和 publisher
将 SyncEvent 分成发布者和订阅者
syncEvent.subscriber // 拥有除了 'emit' 之外所有订阅者的方法
syncEvent.publisher // 只有 'emit' 方法
当你在实现自定义的类的时候,会有需要来避免暴露某些方法
Example
syncEvent.subscriber.on("event1", () => {})
syncEvent.publisher.emit("event1", 1, "2")
// 自定义一个对象,只暴露订阅的方法
class Foo {
#syncEvent = SyncEvent.new<{
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}>()
get subscriber() {
return this.#syncEvent.subscriber
}
constructor() {
setInterval(() => {
this.#syncEvent.emit("event1", 1, "2")
})
}
}
const foo = new Foo()
foo.subscriber.on("event1", () => {})