SyncEvent
SyncEvent is a Promise/Stream event library
Basic Usage
Install
npm install ocev
Import
import {SyncEvent} from 'ocev'
Initialize
Define your event type and initialize a SyncEvent instance.
import { SyncEvent } from 'ocev'
// define your event type
type EventHandlerMap = {
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}
const syncEvent = SyncEvent.new<EventHandlerMap>()
// const syncEvent = new SyncEvent<EventHandlerMap>()
The code above declares that the SyncEvent instance emits two events, event1 and event2, whose callbacks have the types EventHandlerMap['event1'] and EventHandlerMap['event2'] respectively.
ocev derives the type hints for every API from the EventHandlerMap you provide.
Why is EventHandlerMap not designed like this?
type EventHandlerMap = {
event1: [number,string]
event2: [string,number]
}
Because functions of the form () => Promise<T> will be supported in the future, the library needs to know each handler's return type, which a tuple of argument types cannot express.
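As a rough illustration of this point (not ocev's actual type definitions), the sketch below shows that a function-typed map lets both the argument tuple and the return type be recovered. The helper names `ArgumentsOf` and `ResultOf` are hypothetical, and `event2` returns a Promise only to make the return type visible.

```typescript
// Hypothetical helper types for illustration; ocev's internal helpers may differ.
type ArgumentsOf<T> = T extends (...args: infer A) => any ? A : never
type ResultOf<T> = T extends (...args: any[]) => infer R ? R : never

type EventHandlerMap = {
  event1: (arg1: number, arg2: string) => void
  event2: (arg1: string, arg2: number) => Promise<boolean>
}

// The argument tuple is still recoverable from the function type...
type Event1Args = ArgumentsOf<EventHandlerMap["event1"]> // [arg1: number, arg2: string]
// ...and so is the return type, which a plain tuple could not express.
type Event2Result = ResultOf<EventHandlerMap["event2"]> // Promise<boolean>

const event1Args: Event1Args = [1, "2"]
const event2Result: Event2Result = Promise.resolve(true)
```

With a tuple-typed map such as `event2: [string, number]`, there would be no place to record `Promise<boolean>`.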
Register an event
const cancel = syncEvent.on('event1', (arg1, arg2) => {
console.log(arg1, arg2)
})
// Call cancel() to unregister, or call syncEvent.off("event1", callback)
Emit an event
syncEvent.emit("event1", 1, "2")
// The callback registered above is triggered
Multiple Event Registration
const cancel = syncEvent
.on("event1", (arg1, arg2) => {
console.log(arg1, arg2)
})
.on("event2", (arg1, arg2) => {
console.log(arg1, arg2)
})
// ....
// Calling cancel() once removes all of the registrations above
API
on
/**
* Register an event handler for the specified event.
* @param {K} event Event name
* @param {M[K]} handler Event handler callback function
* @param {ListenerOptions} [options] Listener options (debounce and throttle); see ListenerOptions below
* @returns {LinkableListener<M>} Linkable listener object for chaining
*/
on: <K extends keyof M>(event: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
ListenerOptions
// all values are in milliseconds
type ListenerOptions = {
  debounce?: {
    waitMs: number // how long the callback is delayed after the latest trigger
    maxWaitMs?: number // maximum total delay; once it is exceeded, the callback is invoked without further deferral
  }
  throttle?: {
    waitMs: number // minimum interval between two invocations of the callback
  }
}
Example
syncEvent.on("event1", () => {}, {
debounce: {
waitMs: 200,
maxWaitMs: 500,
},
})
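To make the throttle option more concrete, here is a minimal leading-edge throttle, written as a standalone sketch. It is an assumption about the general technique, not ocev's implementation (which may, for example, also fire on the trailing edge). The `now` parameter is a hypothetical addition that lets the example use a fake clock.

```typescript
// Leading-edge throttle with an injectable clock so the behaviour is easy to follow.
function throttle<A extends unknown[]>(
  handler: (...args: A) => void,
  waitMs: number,
  now: () => number = Date.now,
): (...args: A) => void {
  let lastCall = Number.NEGATIVE_INFINITY
  return (...args: A) => {
    const current = now()
    // Ignore the call if the previous accepted call was less than waitMs ago.
    if (current - lastCall < waitMs) return
    lastCall = current
    handler(...args)
  }
}

let fakeTime = 0
const calls: number[] = []
const throttled = throttle((value: number) => calls.push(value), 200, () => fakeTime)

throttled(1) // accepted at t = 0
fakeTime = 100
throttled(2) // ignored: only 100 ms since the last accepted call
fakeTime = 250
throttled(3) // accepted: 250 ms since the last accepted call
// calls is now [1, 3]
```

A debounce works the other way round: it postpones the call until no new trigger has arrived for `waitMs`, with `maxWaitMs` capping the total postponement.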
LinkableListener
interface LinkableListener<M> {
(): void
once: <K extends keyof M>(type: K, handler: M[K]) => LinkableListener<M>
on: <K extends keyof M>(type: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
}
When on is called, it returns a function that also carries two methods, on and once, for registering further events. Each call records its registration in a shared context, forming a linked list. Calling the function returned by the last call in the chain clears every callback in that context.
Example
const cancelFunction = syncEvent.on("event1", (arg1, arg2) => {})
// the returned function is also an object, so it exposes on and once
const secondCancelFunction = cancelFunction.on('event2',(arg1,arg2)=>{})
secondCancelFunction.on('event1',() => {})
secondCancelFunction() // cancel first and second registrations
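The chaining behaviour can be sketched with a tiny emitter whose cancel function shares one context across the chain. This is a simplified illustration under stated assumptions: `TinyEmitter`, `Linkable`, and `link` are hypothetical names, `once` is omitted, and ocev's real bookkeeping may differ.

```typescript
type Handler = (...args: unknown[]) => void

// A cancel function that can also register further handlers.
interface Linkable {
  (): void
  on: (event: string, handler: Handler) => Linkable
}

class TinyEmitter {
  private handlers = new Map<string, Set<Handler>>()

  on(event: string, handler: Handler): Linkable {
    // Every chain starts with a fresh, empty context.
    return this.link(event, handler, [])
  }

  listenerCount(event: string): number {
    const set = this.handlers.get(event)
    return set ? set.size : 0
  }

  // Register the handler, record its undo step in the shared context,
  // and return a cancel function that also exposes `on` for chaining.
  private link(event: string, handler: Handler, context: Array<() => void>): Linkable {
    const set = this.handlers.get(event) ?? new Set<Handler>()
    this.handlers.set(event, set)
    set.add(handler)
    context.push(() => {
      set.delete(handler)
    })
    const cancel = (() => {
      context.forEach((undo) => undo())
      context.length = 0
    }) as Linkable
    cancel.on = (nextEvent, nextHandler) => this.link(nextEvent, nextHandler, context)
    return cancel
  }
}

const emitter = new TinyEmitter()
const cancel = emitter.on("event1", () => {}).on("event2", () => {})
const countsBefore = [emitter.listenerCount("event1"), emitter.listenerCount("event2")]
cancel() // clears every registration recorded in the shared context
const countsAfter = [emitter.listenerCount("event1"), emitter.listenerCount("event2")]
```

Here `countsBefore` is `[1, 1]` and `countsAfter` is `[0, 0]`, because both registrations live in the same context.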
listenerCount
Gets the number of callbacks registered for an event.
If no event name is passed, returns the total number of registered callbacks.
/**
* @param {K} event event name
* @returns {number} listener count
*/
listenerCount: <K extends keyof M>(event?: K | undefined) => number
Example
syncEvent.on("event1", () => {}).on("event2", () => {})
syncEvent.listenerCount("event1") // 1
syncEvent.listenerCount("event2") // 1
syncEvent.listenerCount() // 2
once
Same usage as on, except that the callback is triggered only once.
/**
* @param {K} event - event name
* @param {M[K]} handler - The handler function to be executed when the event is emitted.
* @returns {LinkableListener<M>} - An object containing methods to manage the listener.
*/
once: <K extends keyof M>(event: K, handler: M[K]) => LinkableListener<M>
any
Listen for all event triggers
/**
* @param {Function} handler - callback
* @returns {Function} - A function that unregisters the handler when called
*/
any: <K extends keyof M = keyof M>(handler: (event: K, ...args: Arguments<M[K]>) => void) => () => void
Example
syncEvent.any((event, ...args) => {
console.log(event, args)
})
syncEvent.emit("event1", 1, "2") // event1, [1, "2"]
syncEvent.emit("event2", "3", 4) // event2, ["3", 4]
off
Unregistering an event
/**
*
* @param {K} event - event name
* @param {M[K]} handler - callback
*/
off: <K extends keyof M>(event: K, handler: M[K])
Example
const callback = () => {}
syncEvent.on("event1", callback)
// unregister
syncEvent.off("event1", callback)
emit
Publish an event and pass parameters
/**
*
* @param {K} event - event name
* @param {...Arguments<M[K]>} args - arguments
*/
emit: <K extends keyof M>(event: K, ...args: Arguments<M[K]>)
Example
syncEvent.emit("event1", 1, "2")
waitUtil
Wait for an event to trigger and return an array of its corresponding response parameters
/**
* @param {K} event - event name
* @param {WaitUtilConfig<Arguments<M[K]>>} [config={}] - config
* @returns {Promise<Arguments<M[K]>>} - Resolves with the emitted arguments as an array
*/
waitUtil: <K extends keyof M = keyof M>(
event: K,
config?: WaitUtilConfig<Arguments<M[K]>>,
) => Promise<Arguments<M[K]>>
Example
- base usage
queueMicrotask(() => {
  syncEvent.emit("event1", 1, "2")
})
// Blocks until 'event1' is emitted
const result = await syncEvent.waitUtil("event1")
- where
let count = 0
setInterval(() => {
  syncEvent.emit("event1", count++, "2")
}, 100)
const result = await syncEvent.waitUtil("event1", {
  // 'where' is called every time the event is triggered; the Promise resolves only when it returns true.
  where: (arg1, arg2) => arg1 > 10,
})
- timeout
// If the event has not been triggered before the timeout expires, a TimeoutError is thrown.
syncEvent.waitUtil("event1", {
  timeout: 1000,
}).catch((error) => {
  // TimeoutError
})
- map to error
queueMicrotask(() => {
  syncEvent.emit("event1", 1, "2")
})
const result = await syncEvent.waitUtil("event1", {
  // When 'mapToError' returns null or undefined, the Promise resolves normally; otherwise the Promise rejects with the returned value.
  mapToError: (arg1, arg2) => (arg1 < 0 ? Error() : null),
})
- cancel promise
const cancelRef = { current() {} }
queueMicrotask(() => {
  // Calling cancelRef.current() makes waitUtil throw an exception, ending the wait early.
  cancelRef.current()
})
// waitUtil assigns a function to cancelRef.current; calling that function causes waitUtil to throw an exception.
await syncEvent.waitUtil("event1", {
  cancelRef,
})
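The idea behind waiting on an event can be sketched as a Promise that subscribes, filters with `where`, and races against a timer. This is a minimal standalone illustration, not ocev's code: `TinyEmitter`, `waitFor`, and `WaitConfig` are hypothetical names, the emitter handles a single unnamed event, and a plain `Error("timeout")` stands in for the library's TimeoutError.

```typescript
type Listener = (...args: unknown[]) => void

// Minimal single-event emitter, used only for this illustration.
class TinyEmitter {
  private listeners: Listener[] = []

  on(listener: Listener): () => void {
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter((item) => item !== listener)
    }
  }

  emit(...args: unknown[]): void {
    this.listeners.slice().forEach((listener) => listener(...args))
  }
}

type WaitConfig = { where?: (...args: unknown[]) => boolean; timeout?: number }

function waitFor(emitter: TinyEmitter, config: WaitConfig = {}): Promise<unknown[]> {
  return new Promise((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined
    const off = emitter.on((...args) => {
      // Keep waiting while the `where` filter rejects the arguments.
      if (config.where && !config.where(...args)) return
      if (timer !== undefined) clearTimeout(timer)
      off()
      resolve(args)
    })
    if (config.timeout !== undefined) {
      timer = setTimeout(() => {
        off()
        reject(new Error("timeout"))
      }, config.timeout)
    }
  })
}

const emitter = new TinyEmitter()
const filtered = waitFor(emitter, { where: (count) => (count as number) > 10 })
emitter.emit(5) // ignored by `where`
emitter.emit(11) // resolves `filtered` with [11]

// Nothing is emitted within 10 ms, so this wait rejects; the message is captured instead.
const timedOut = waitFor(emitter, { timeout: 10 }).catch((error: Error) => error.message)
```

In both outcomes the listener is removed, so a finished wait leaves nothing registered on the emitter.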
waitUtilRace
Waits until any event in eventList is triggered and returns that event's name and arguments
/**
* @param {((EventListItem<M, K> | K)[])} eventList - event name list
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - {event: K, value: Arguments<M[K]>}
*/
waitUtilRace: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>
Example
- race
queueMicrotask(() => {
  syncEvent.emit("event1", 1, "2")
})
const { event, value } = await syncEvent.waitUtilRace(["event1", "event2"])
// event => 'event1'
// value => [1,'2']
- config
const { event, value } = await syncEvent.waitUtilRace([
  "event1",
  {
    event: "event2",
    timeout: 1000,
    where: () => true,
    mapToError: () => null,
  }, // same parameters as waitUtil, plus an extra 'event' field
])
waitUtilAll
Waits for all events in eventList to fire, and rejects as soon as one of them throws an exception
/**
* @param {EventList} eventList - event list
* @returns {Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>} - An array containing the arguments of each event
*/
waitUtilAll: <
K extends keyof M = keyof M,
EventList extends readonly (K | Readonly<EventListItem<M, K>>)[] = readonly (
| K
| Readonly<EventListItem<M, K>>
)[],
>(
eventList: EventList,
) => Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>
Example
- base usage
queueMicrotask(() => {
  syncEvent.emit("event1", 1, "2")
  syncEvent.emit("event2", "3", 4)
})
const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
  "event1",
  "event2",
])
- options
queueMicrotask(() => {
  syncEvent.emit("event1", 1, "2")
  syncEvent.emit("event2", "3", 4)
})
const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
  "event1",
  {
    event: "event2",
    timeout: 1000,
  }, // same parameters as waitUtil
] as const) // 'as const' is required here to get precise return types
await syncEvent.waitUtilAll([
"event1",
{
event: "event2",
timeout: 1000,
},
] as const)
Currently, if the elements of eventList are not all plain strings, you need to add as const to eventList so that waitUtilAll can infer a precise return type (other APIs do not have this limitation for now). If you know a better solution, contributions are very welcome. Thank you!
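The reason `as const` matters can be shown with simplified stand-in types. Everything below (`ArgsByEvent`, `Item`, `NameOf`, `ResultOf`, `eventNames`) is hypothetical and much smaller than ocev's real definitions; it only illustrates how a readonly tuple keeps the event names and their order available to the type system.

```typescript
// Simplified stand-ins for ocev's types; the names here are hypothetical.
type ArgsByEvent = { event1: [number, string]; event2: [string, number] }
type EventName = keyof ArgsByEvent
type Item = EventName | { event: EventName; timeout?: number }
type NameOf<I> = I extends { event: infer E } ? E : I

// Map a tuple of items to a tuple of the matching argument tuples.
type ResultOf<L extends readonly Item[]> = {
  -readonly [I in keyof L]: ArgsByEvent[NameOf<L[I]> & EventName]
}

function eventNames<L extends readonly Item[]>(list: L): EventName[] {
  return list.map((item) => (typeof item === "string" ? item : item.event))
}

// Without `as const`, this literal would widen to
// (string | { event: string; timeout: number })[] and lose both names and order.
const list = ["event1", { event: "event2", timeout: 1000 }] as const

// With `as const`, the result type is a precise tuple:
// [[number, string], [string, number]]
const sample: ResultOf<typeof list> = [[1, "2"], ["3", 4]]
const names = eventNames(list)
```

On TypeScript 5.0 or later, a `const` type parameter (for example `<const L extends readonly Item[]>`) may make the explicit `as const` unnecessary; whether that fits ocev's real signatures is untested here.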
waitUtilAny
Resolves as soon as any event succeeds and rejects if all events fail. The return type is the same as that of waitUtilRace
/**
* @param {((EventListItem<M, K> | K)[])} eventList - event list
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - same return type as waitUtilRace
*/
waitUtilAny: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>
Example
queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})
const { event, value } = await syncEvent.waitUtilAny([
"event1",
{
event: "event2",
mapToError: () => Error(),
}, // consistent with waitUtil parameter
] as const)
// event => 'event1'
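Assuming the names mirror `Promise.race` and `Promise.any` (an assumption, not something the source states), the difference can be sketched with plain promises. `firstFulfilled` and `makeWaits` are hypothetical helpers; the two promises stand in for a wait that fails immediately and a wait that succeeds later.

```typescript
// Resolve with the first fulfilled promise; reject only when every promise has rejected.
// (Hand-written instead of Promise.any so the sketch does not depend on an ES2021 lib.)
function firstFulfilled<T>(promises: Promise<T>[]): Promise<T> {
  return new Promise((resolve, reject) => {
    const errors: unknown[] = []
    let rejectedCount = 0
    promises.forEach((promise, index) => {
      promise.then(resolve, (error: unknown) => {
        errors[index] = error
        rejectedCount += 1
        if (rejectedCount === promises.length) reject(errors)
      })
    })
  })
}

// Stand-ins for two waits: one fails immediately, the other succeeds 10 ms later.
const makeWaits = (): Promise<string>[] => [
  Promise.reject(new Error("event2 was mapped to an error")),
  new Promise<string>((resolve) => setTimeout(() => resolve("event1"), 10)),
]

// Race-style: settles with whichever wait settles first, here a rejection.
const raceOutcome = Promise.race(makeWaits()).then(
  () => "resolved",
  () => "rejected",
)
// Any-style: skips the failure and resolves with the first success.
const anyOutcome = firstFulfilled(makeWaits())
```

Under this reading, the race-style wait reports the early failure, while the any-style wait still resolves with `"event1"`.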
createEventStream
Create an event stream that delivers events continuously
/**
* @param {K[]} eventList - An array of events to subscribe to.
* @param {EventStreamStrategy} strategy - strategy applied when the queue is full
* @returns {Object} - An async iterator with droppedEventCount/replacedEventCount methods that report how many events were dropped or replaced
*/
createEventStream: <K extends keyof M = keyof M>(
eventList: K[],
strategy?: EventStreamStrategy,
) => {
[Symbol.asyncIterator]: () => {
next: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
return: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
}
droppedEventCount: () => number
replacedEventCount: () => number
}
Example
queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})
const eventStream = syncEvent.createEventStream(["event1", "event2"])
for await (const { event, value } of eventStream) { // Process events through the for await loop
switch (event) {
case "event1": {
// value => [1, "2"]
break
}
case "event2": {
// value => ["3", 4]
break
}
default: {
throw Error("unreachable")
}
}
}
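One way to picture such a stream is a queue plus, at most, one pending consumer. The sketch below is a simplified illustration, not ocev's implementation: `TinyStream` and `takeTwo` are hypothetical names, and it supports only a single consumer with no `return` handling.

```typescript
class TinyStream<T> {
  private queue: T[] = []
  private wake: ((value: T) => void) | undefined

  // Deliver directly to a waiting consumer, otherwise buffer the value.
  push(value: T): void {
    if (this.wake) {
      const wake = this.wake
      this.wake = undefined
      wake(value)
    } else {
      this.queue.push(value)
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> =>
        new Promise((resolve) => {
          if (this.queue.length > 0) {
            resolve({ value: this.queue.shift() as T, done: false })
          } else {
            // Nothing buffered: park the consumer until the next push.
            this.wake = (value) => resolve({ value, done: false })
          }
        }),
    }
  }
}

async function takeTwo(stream: TinyStream<string>): Promise<string[]> {
  const seen: string[] = []
  for await (const value of stream) {
    seen.push(value)
    if (seen.length === 2) break
  }
  return seen
}

const stream = new TinyStream<string>()
stream.push("event2") // buffered: nobody is waiting yet
const collected = takeTwo(stream)
setTimeout(() => stream.push("event1"), 10) // delivered to the waiting consumer
```

`collected` resolves with `["event2", "event1"]`: the first value comes from the buffer and the second arrives while the loop is waiting.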
back pressure
Consider the following code
setInterval(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"])
for await (const { event, value } of eventStream) {
// each iteration takes 10 s to process
await new Promise((res) => setTimeout(res, 10000))
}
SyncEvent creates a queue that temporarily stores all events that have not yet been processed.
The queue grows whenever events are emitted faster than they are consumed.
To handle this situation, configure an EventStreamStrategy.
The strategy takes effect only if you specify strategy.capacity and the number of events in the queue exceeds strategy.capacity.
EventStreamStrategy supports two values of strategyWhenFull, each corresponding to a different way of handling a full queue:
- drop: discard newly triggered events until an event is consumed and the queue has room again.
- replace: discard the earliest unprocessed event and push the newly triggered event onto the queue, so the queue length stays at strategy.capacity.
The strategy has no effect if strategy is not passed or strategy.capacity <= 0.
Example
- drop
setInterval(() => {
  syncEvent.emit("event1", 1, "2")
  syncEvent.emit("event2", "3", 4) // always discarded: the queue (capacity 1) is already full
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"], {
  capacity: 1, // maximum capacity is 1
  strategyWhenFull: "drop",
})
for await (const { event } of eventStream) {
  console.log(event) // event is always 'event1'
  switch (event) {
    case "event1": {
      break
    }
    default: {
      // no other event is ever delivered
      throw Error("unreachable")
    }
  }
}
- replace
setInterval(() => {
  syncEvent.emit("event1", 1, "2") // always discarded: with capacity 1 it is replaced by the newer event2
  syncEvent.emit("event2", "3", 4)
}, 100)
const eventStream = syncEvent.createEventStream(["event1", "event2"], {
  capacity: 1, // maximum capacity is 1
  strategyWhenFull: "replace",
})
for await (const { event } of eventStream) {
  console.log(event) // event is always 'event2'
  switch (event) {
    case "event2": {
      break
    }
    default: {
      // no other event is ever delivered
      throw Error("unreachable")
    }
  }
}
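The two strategies can be sketched as a bounded queue. This is a minimal illustration under assumptions: `BoundedQueue` is a hypothetical class, the field names `capacity` and `strategyWhenFull` are taken from the examples in this section, and the exact boundary at which ocev starts dropping or replacing may differ.

```typescript
// Field names follow the examples in this section; the class itself is hypothetical.
type Strategy = { capacity: number; strategyWhenFull: "drop" | "replace" }

class BoundedQueue<T> {
  private items: T[] = []
  private strategy: Strategy | undefined
  droppedCount = 0
  replacedCount = 0

  constructor(strategy?: Strategy) {
    this.strategy = strategy
  }

  push(item: T): void {
    const strategy = this.strategy
    // The strategy applies only when a positive capacity is set and the queue is full.
    if (strategy !== undefined && strategy.capacity > 0 && this.items.length >= strategy.capacity) {
      if (strategy.strategyWhenFull === "drop") {
        this.droppedCount += 1 // discard the new item
        return
      }
      this.items.shift() // discard the oldest unprocessed item
      this.replacedCount += 1
    }
    this.items.push(item)
  }

  shift(): T | undefined {
    return this.items.shift()
  }
}

const dropQueue = new BoundedQueue<string>({ capacity: 1, strategyWhenFull: "drop" })
dropQueue.push("event1")
dropQueue.push("event2") // dropped: the queue is already full
const keptByDrop = dropQueue.shift() // "event1"

const replaceQueue = new BoundedQueue<string>({ capacity: 1, strategyWhenFull: "replace" })
replaceQueue.push("event1")
replaceQueue.push("event2") // replaces "event1"
const keptByReplace = replaceQueue.shift() // "event2"
```

The counters play the role of droppedEventCount and replacedEventCount: each queue above ends with a count of 1.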
createEventReadableStream
createEventReadableStream and createEventStream are almost identical.
The only difference is that createEventReadableStream returns a ReadableStream.
Example
setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
}, 100)
const eventStream = syncEvent.createEventReadableStream(["event1", "event2"])
const reader = eventStream.getReader()
for (;;) {
const { done, value } = await reader.read()
if (done) {
break
}
switch (value.event) {
case "event1":
console.log(value.value) // [1, "2"]
break
case "event2":
console.log(value.value) // ["3", 4]
break
}
}
subscriber and publisher
A SyncEvent can be split into a publisher and a subscriber.
syncEvent.subscriber // exposes every method except 'emit'
syncEvent.publisher // exposes only the 'emit' method
This is useful when a custom class needs to avoid exposing certain methods.
Example
syncEvent.subscriber.on("event1", () => {})
syncEvent.publisher.emit("event1", 1, "2")
// A custom class that exposes only the subscriber methods
class Foo {
#syncEvent = SyncEvent.new<{
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}>()
get subscriber() {
return this.#syncEvent.subscriber
}
constructor() {
setInterval(() => {
this.#syncEvent.emit("event1", 1, "2")
})
}
}
const foo = new Foo()
foo.subscriber.on("event1", () => {})
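The split itself can be pictured as two narrow views over one emitter. The sketch below is a standalone illustration, not ocev's implementation: `TinyEmitter` and `Counter` are hypothetical, and the emitter carries a single numeric event to keep the example short.

```typescript
type Handler = (value: number) => void

class TinyEmitter {
  private handlers: Handler[] = []

  on = (handler: Handler): void => {
    this.handlers.push(handler)
  }

  emit = (value: number): void => {
    this.handlers.slice().forEach((handler) => handler(value))
  }

  // Narrow views over the same instance: each exposes only part of the API.
  get subscriber(): Pick<TinyEmitter, "on"> {
    return { on: this.on }
  }

  get publisher(): Pick<TinyEmitter, "emit"> {
    return { emit: this.emit }
  }
}

class Counter {
  private events = new TinyEmitter()
  // Outside code can listen but has no way to emit.
  readonly subscriber = this.events.subscriber

  increment(by: number): void {
    this.events.publisher.emit(by)
  }
}

const counter = new Counter()
const received: number[] = []
counter.subscriber.on((value) => received.push(value))
counter.increment(2)
counter.increment(3)
// received is now [2, 3]
```

Because the subscriber view is a separate object containing only `on`, callers cannot reach `emit` through it, either in the types or at runtime.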