跳到主要内容

SyncEvent

SyncEvent 用来触发,注册事件

基本的用法

安装

npm install ocev

引入

import {SyncEvent} from 'ocev'

初始化

定义你的事件类型,初始化一个 SyncEvent 实例

import { SyncEvent } from 'ocev'

type EventHandlerMap = {
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}

const syncEvent = SyncEvent.new<EventHandlerMap>()
// const syncEvent = new SyncEvent<EventHandlerMap>()

上面的代码表示,该 SyncEvent 实例会触发两种事件 event1,event2, 分别可接受的回调函数类型是 EventHandlerMap['event1'],EventHandlerMap['event2']

ocev 会根据提供的 EventHandler 的类型生成完善的类型提示

提示

为什么 EventHandlerMap 没有被设计成

type EventHandlerMap = {
event1: [number,string]
event2: [string,number]
}

考虑到未来会支持 () => Promise<T> 这种函数作为回调函数,并对返回值做处理,所以有必要知道函数的返回值是什么

注册一个事件

const cancel = syncEvent.on('event1', (arg1, arg2) => {
console.log(arg1, arg2)
})
// 调用 `cancel()` 取消注册 或者 syncEvent.off("event1", callback) 取消注册

触发该事件

syncEvent.emit("event1", 1, "2")
// 上面的回调函数触发

多事件注册

const cancel = syncEvent
.on("event1", (arg1, arg2) => {
console.log(arg1, arg2)
})
.on("event2", (arg1, arg2) => {
console.log(arg1, arg2)
})

// cancel() 一次性调用即可取消上面的两次注册

API

on

/**
* 注册一个事件
* @param {K} event 事件名称
* @param {M[K]} handler 回调函数
* @param {ListenerOptions} [options] 配置,可以设置节流和防抖,不支持同时设置
* @returns {LinkableListener<M>} 返回一个函数,该函数有 on,once 两个方法,可以继续注册事件,并且记录一个上下文
*/
on: <K extends keyof M>(event: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>

ListenerOptions

// 单位都是毫秒
type ListenerOptions = {
debounce?: { // 防抖
waitMs: number // 回调函数被推迟的时间
maxWaitMs?: number // 最大可被推迟的时间,如果超过该限制,则必须调用,无法继续推迟
}
throttle?: { //节流
waitMs: number // 最大可被重复调用的时间间隔
}
}

Example

// 防抖
syncEvent.on("event1", (arg1, arg2) => {}, {
debounce: {
waitMs: 200,
maxWaitMs: 500,
},
})

LinkableListener

interface LinkableListener<M> {
(): void
once: <K extends keyof M>(type: K, handler: M[K]) => LinkableListener<M>
on: <K extends keyof M>(type: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
}

当 on 被调用的时候,返回一个函数,该函数有两个方法 on,once, 可以继续注册事件,并生成一个上下文,该上下文会记录每一个注册的事件形成一个链表,当最后一个cancel被调用的时候,上下文内所有的回调函数都会被清除

Example

Example

const cancelFunction = syncEvent.on("event1", (arg1, arg2) => {})

// function can be object too
const secondCancelFunction = cancelFunction.on('event2',(arg1,arg2)=>{})

secondCancelFunction.on('event1',() => {

})

secondCancelFunction() // cancel first and second registrations

listenerCount

获取某一个事件的回调函数的注册数量 如果不传具体的事件名,返回所有的回调函数的数量

/**
* @param {K} event 事件名称
* @returns {number} 数量
*/
listenerCount: <K extends keyof M>(event?: K | undefined) => number

Example

syncEvent.on("event1", () => {}).on("event2", () => {})
syncEvent.listenerCount("event1") // 1
syncEvent.listenerCount("event2") // 1
syncEvent.listenerCount() // 2

once

用法同 on 一致, 只不过该回调只会触发一次

/**
* @param {K} event - 事件名称
* @param {M[K]} handler - The handler function to be executed when the event is emitted.
* @returns {LinkableListener<M>} - An object containing methods to manage the listener.
*/
once: <K extends keyof M>(event: K, handler: M[K]) => LinkableListener<M>

any

监听所有的事件触发

  /**
* @param {Function} handler - 回调函数
* @returns {Function} - 返回一个函数,该函数调用取消 any 的注册
*/
any: <K extends keyof M = keyof M>(handler: (event: K, ...args: Arguments<M[K]>) => void) => () => void

Example

syncEvent.any((event, ...args) => {
console.log(event, args)
})

syncEvent.emit("event1", 1, "2") // event1, [1, "2"]
syncEvent.emit("event2", "3", 4) // event2, ["3", 4]

off

取消某一个事件的注册

/**
*
* @param {K} event - 事件名
* @param {M[K]} handler - 回调函数
*/
off: <K extends keyof M>(event: K, handler: M[K])

Example

const callback = () => {}
syncEvent.on("event1", callback)
// 取消注册
syncEvent.off("event1", callback)

emit

发布一个事件,并传递参数

/**
*
* @param {K} event - 事件名
* @param {...Arguments<M[K]>} args - 传递的参数
*/
emit: <K extends keyof M>(event: K, ...args: Arguments<M[K]>)

Example

syncEvent.emit("event1", 1, "2")

waitUtil

等待一个事件触发,返回它对应的响应参数的数组

  /**
* @param {K} event - 事件名称
* @param {WaitUtilConfig<Arguments<M[K]>>} [config={}] - 可选配置
* @returns {Promise<Arguments<M[K]>>} - 将对应的触发的参数转换成一个数组返回
*/
waitUtil: <K extends keyof M = keyof M>(
event: K,
config?: WaitUtilConfig<Arguments<M[K]>>,
) => Promise<Arguments<M[K]>>

Example


queueMicrotask(()=>{
syncEvent.emit("event1", 1, "2")
})
// block 直到 emit 触发
const result = await syncEvent.waitUtil("event1")

waitUtilRace

等待 eventList 任意一个事件触发的时候返回该事件的触发参数


/**
* @param {((EventListItem<M, K> | K)[])} eventList - 事件名或者事件配置
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - 返回该事件对应的值
*/
waitUtilRace: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>

Example

queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
})

const { event, value } = await syncEvent.waitUtilRace(["event1", "event2"])
// event => 'event1'
// value => [1,'2']

waitUtilAll

等待 eventList 中所有的事件触发,或其中一个抛出异常

/**
* @param {EventList} eventList - 事件列表
* @returns {Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>} - 由所有事件的返回值组成,返回一个数组
*/
waitUtilAll: <
K extends keyof M = keyof M,
EventList extends readonly (K | Readonly<EventListItem<M, K>>)[] = readonly (
| K
| Readonly<EventListItem<M, K>>
)[],
>(
eventList: EventList,
) => Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>

Example

  queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
})

const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
"event1",
"event2",
])
提示
  await syncEvent.waitUtilAll([
"event1",
{
event: "event2",
timeout: 1000,
},
] as const)

目前因为作者能力有限,如果 eventList 里面的元素不单纯是字符串的时候,用户需要加上 as const 给 eventList,waitUtilAll 才能获得明确的返回值的类型提示(其他 api 暂时没有这个问题),如果你有好的解决方案,请帮助我改进,万分感谢!

waitUtilAny

等待任意一个事件成功即可 Resolve ,所有事件都失败则 Reject,返回类型和 waitUtilRace 保持一致

  /**
* @param {((EventListItem<M, K> | K)[])} eventList - 事件列表
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - 和 waitUtilRace 返回类型一致
*/
waitUtilAny: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>

Example

queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})

const { event, value } = await syncEvent.waitUtilAny([
"event1",
{
event: "event2",
mapToError: () => Error(),
}, // 同 waitUtil 参数一致
] as const)

// event => 'event1'

createEventStream

创建一个事件流,源源不断的传递事件

  /**
* @param {K[]} eventList - An array of events to subscribe to.
* @param {EventStreamStrategy} strategy - 策略
* @returns {Object} - 异步迭代器,有方法 droppedEventCount/replacedEventCount获取被丢弃的事件数量
*/
createEventStream: <K extends keyof M = keyof M>(
eventList: K[],
strategy?: EventStreamStrategy,
) => {
[Symbol.asyncIterator]: () => {
next: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
return: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
}
droppedEventCount: () => number
replacedEventCount: () => number
}

Example


queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})

const eventStream = syncEvent.createEventStream(["event1", "event2"])

for await (const { event, value } of eventStream) { // 通过 for await 循环处理所有的事件
switch (event) {
case "event1": {
// value => [1, "2"]
break
}
case "event2": {
// value => ["3", 4]
break
}
default: {
throw Error("unreachable")
}
}
}

背压

考虑有如下的代码


setInterval(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
}, 100)

const eventStream = syncEvent.createEventStream(["event1", "event2"])

for await (const { event, value } of eventStream) {
// 每次处理都是 10s
await new Promise((res) => setTimeout(res, 10000))
}

SyncEvent 会创建一个队列,所有还没有被处理的事件都会被暂时存在队列里面

当事件触发的速度大于事件被处理的速度时,队列就会一直增长

如果你想处理这种情况,就需要配置 EventStreamStrategy

只有当你指定了 strategy.capacity ,并且队列里面的事件数量大于 strategy.capacity 的时候才会触发策略

EventStreamStrategy 有2种类型,分别对应三种不同的处理方式

  • drop:丢弃新触发的事件,直到有消息被消费,队列有位子
  • replace:丢弃一个最早触发但是未处理的事件,新触发的事件推入到队列中,保证 strategy.capacity 不变

如果不传递 strategystrategy.capacity <= 0 的时候不会丢弃事件

Example

setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4) // 每次都会被丢弃,因为容量是 1
}, 100)

const eventStream = syncEvent.createEventStream(["event1", "event2"], {
capacity: 1, // 最大容量是1
strategyWhenFull: "drop",
})

for await (const { event } of eventStream) {
console.log(event) // event 永远是 event1
switch (event) {
case "event1": {
break
}
default: {
// 不会有其他事件响应
throw Error("unreachable")
}
}
}

createReadableStream

createReadableStream 和 createEventStream 几乎是一样的

唯一的区别是 createReadableStream 返回的是 ReadableStream

Example


setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
}, 100)

const eventStream = syncEvent.createEventReadableStream(["event1", "event2"])
const reader = eventStream.getReader()

for (;;) {
const { done, value } = await reader.read()
if (done) {
break
}

switch (value.event) {
case "event1":
console.log(value.value) // 1,'2'
break
case "event2":
console.log(value.value) // '3',4
break
}
}

subscriber 和 publisher

将 SyncEvent 分成发布者和订阅者

syncEvent.subscriber  // 拥有除了 'emit' 之外所有订阅者的方法
syncEvent.publisher // 只有 'emit' 方法

当你在实现自定义的类的时候,会有需要来避免暴露某些方法

Example

syncEvent.subscriber.on("event1", () => {})

syncEvent.publisher.emit("event1", 1, "2")
// 自定义一个对象,只暴露订阅的方法
class Foo {
#syncEvent = SyncEvent.new<{
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}>()

get subscriber() {
return this.#syncEvent.subscriber
}

constructor() {
setInterval(() => {
this.#syncEvent.emit("event1", 1, "2")
})
}
}

const foo = new Foo()

foo.subscriber.on("event1", () => {})