Skip to main content

SyncEvent

SyncEvent is a Promise/Stream event library

Basic Usage

Install

npm install ocev

Import

import {SyncEvent} from 'ocev'

Initialize

Define your event type and initialize a SyncEvent instance

import { SyncEvent } from 'ocev'

// define your event type
type EventHandlerMap = {
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}

const syncEvent = SyncEvent.new<EventHandlerMap>()
// const syncEvent = new SyncEvent<EventHandlerMap>()

The above code indicates that the SyncEvent instance will trigger two events event1,event2

callback function types accepted are EventHandlerMap[event1],EventHandlerMap['event2'] respectively.

ocev generates perfect type hints based on the type of EventHandler provided

tip

Why is EventHandlerMap not designed to

type EventHandlerMap = {
event1: [number,string]
event2: [string,number]
}

Considering that () =>Promise<T> functions will be supported in the future, it is necessary to know what the return value of the function is.

Register an event

const cancel = syncEvent.on('event1', (arg1, arg2) => {
console.log(arg1, arg2)
})
// Call cancel() to cancel or syncEvent.off("event1", callback) to cancel

emit event

syncEvent.emit("event1", 1, "2")
// The callback function above triggers

Multiple Event Registration

const cancel = syncEvent
.on("event1", (arg1, arg2) => {
console.log(arg1, arg2)
})
.on("event2", (arg1, arg2) => {
console.log(arg1, arg2)
})
// ....

// cancel() can cancel the above registrations once

API

on

/**
* Register an event handler for the specified event.
* @param {K} event Event name
* @param {M[K]} handler Event handler callback function
* @param {ListenerOptions} [options] Listener options support debounce and throttle , more details in the document
* @returns {LinkableListener<M>} Linkable listener object for chaining
*/
on: <K extends keyof M>(event: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>

ListenerOptions

// all in milliseconds
type ListenerOptions = {
debounce?: {
waitMs: number // the time the callback function was delay
maxWaitMs?: number // Maximum time that can be deferred, if this limit is exceeded, it must be called, no further deferral is possible
}
throttle?: {
waitMs: number //Maximum time interval that can be repeated
}
}

Example

syncEvent.on("event1", () => {}, {
debounce: {
waitMs: 200,
maxWaitMs: 500,
},
})

LinkableListener

interface LinkableListener<M> {
(): void
once: <K extends keyof M>(type: K, handler: M[K]) => LinkableListener<M>
on: <K extends keyof M>(type: K, handler: M[K], options?: ListenerOptions) => LinkableListener<M>
}

When on is called, it returns a function that has two methods on,once to continue registering events and generates a context that records each registered event to form a linked list. When the last cancel is called, all callback functions in the context will be cleared.

Example

const cancelFunction = syncEvent.on("event1", (arg1, arg2) => {})

// function can be object too
const secondCancelFunction = cancelFunction.on('event2',(arg1,arg2)=>{})

secondCancelFunction.on('event1',() => {})

secondCancelFunction() // cancel first and second registrations

listenerCount

Gets the number of registered callback functions for an event

If no event name is passed, return the number of all callback functions

/**
* @param {K} event event name
* @returns {number} listener count
*/
listenerCount: <K extends keyof M>(event?: K | undefined) => number

Example

syncEvent.on("event1", () => {}).on("event2", () => {})
syncEvent.listenerCount("event1") // 1
syncEvent.listenerCount("event2") // 1
syncEvent.listenerCount() // 2

once

Same usage as on, except the callback will only be triggered once

/**
* @param {K} event - event name
* @param {M[K]} handler - The handler function to be executed when the event is emitted.
* @returns {LinkableListener<M>} - An object containing methods to manage the listener.
*/
once: <K extends keyof M>(event: K, handler: M[K]) => LinkableListener<M>

any

Listen for all event triggers

  /**
* @param {Function} handler - callback
* @returns {Function} - Returns a function that calls to unregister any
*/
any: <K extends keyof M = keyof M>(handler: (event: K, ...args: Arguments<M[K]>) => void) => () => void

Example

syncEvent.any((event, ...args) => {
console.log(event, args)
})

syncEvent.emit("event1", 1, "2") // event1, [1, "2"]
syncEvent.emit("event2", "3", 4) // event2, ["3", 4]

off

Unregistering an event

/**
*
* @param {K} event - event name
* @param {M[K]} handler - callback
*/
off: <K extends keyof M>(event: K, handler: M[K])

Example

const callback = () => {}
syncEvent.on("event1", callback)
// unregister
syncEvent.off("event1", callback)

emit

Publish an event and pass parameters

/**
*
* @param {K} event - event name
* @param {...Arguments<M[K]>} args - arguments
*/
emit: <K extends keyof M>(event: K, ...args: Arguments<M[K]>)

Example

syncEvent.emit("event1", 1, "2")

waitUtil

Wait for an event to trigger and return an array of its corresponding response parameters

  /**
* @param {K} event - event name
* @param {WaitUtilConfig<Arguments<M[K]>>} [config={}] - config
* @returns {Promise<Arguments<M[K]>>} - Convert the corresponding trigger parameters into an array and return
*/
waitUtil: <K extends keyof M = keyof M>(
event: K,
config?: WaitUtilConfig<Arguments<M[K]>>,
) => Promise<Arguments<M[K]>>

Example


queueMicrotask(()=>{
syncEvent.emit("event1", 1, "2")
})
// block util 'event1' emit
const result = await syncEvent.waitUtil("event1")

waitUtilRace

When waiting for any event in eventList to trigger, return the trigger parameter of the event


/**
* @param {((EventListItem<M, K> | K)[])} eventList - event name list
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - {event: K, value: Arguments<M[K]>}
*/
waitUtilRace: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>

Example

queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
})

const { event, value } = await syncEvent.waitUtilRace(["event1", "event2"])
// event => 'event1'
// value => [1,'2']

waitUtilAll

Wait for all events in the eventList to fire, or one of them throws an exception

/**
* @param {EventList} eventList - event list
* @returns {Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>} - Composed of all event return values array, returning an array
*/
waitUtilAll: <
K extends keyof M = keyof M,
EventList extends readonly (K | Readonly<EventListItem<M, K>>)[] = readonly (
| K
| Readonly<EventListItem<M, K>>
)[],
>(
eventList: EventList,
) => Promise<ExtractHandlerMapArgumentsFromEventListItem<M, K, EventList>>

Example

  queueMicrotask(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
})

const [[arg1, arg2], [arg3, arg4]] = await syncEvent.waitUtilAll([
"event1",
"event2",
])
tip
  await syncEvent.waitUtilAll([
"event1",
{
event: "event2",
timeout: 1000,
},
] as const)

At present, Because of ability issues , if the elements in eventList are not simply strings, the user needs to add as const to eventList, than 'waitUtilAll' get a clear type prompt for the return value (other APIs do not have this problem temporarily), if you have a good solution, please help me improve, thank you very much!

waitUtilAny

Resolve if any event succeeds, Reject if all events fail, and keep the return type consistent with 'waitUtilRace'

  /**
* @param {((EventListItem<M, K> | K)[])} eventList - event list
* @returns {Promise<WaitUtilCommonReturnValue<M, K>>} - same return type as waitUtilRace
*/
waitUtilAny: <K extends keyof M = keyof M>(
eventList: (K | EventListItem<M, K>)[],
) => Promise<WaitUtilCommonReturnValue<M, K>>

Example

queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})

const { event, value } = await syncEvent.waitUtilAny([
"event1",
{
event: "event2",
mapToError: () => Error(),
}, // consistent with waitUtil parameter
] as const)

// event => 'event1'

createEventStream

Create an event stream that delivers events continuously

  /**
* @param {K[]} eventList - An array of events to subscribe to.
* @param {EventStreamStrategy} strategy - 策略
* @returns {Object} - asynchronous iterator with method droppedEventCount/replacedEventCount to get the number of dropped events
*/
createEventStream: <K extends keyof M = keyof M>(
eventList: K[],
strategy?: EventStreamStrategy,
) => {
[Symbol.asyncIterator]: () => {
next: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
return: () => Promise<{
value: WaitUtilCommonReturnValue<M, K>
done: boolean
}>
}
droppedEventCount: () => number
replacedEventCount: () => number
}

Example


queueMicrotask(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
})

const eventStream = syncEvent.createEventStream(["event1", "event2"])

for await (const { event, value } of eventStream) { // Process events through the for await loop
switch (event) {
case "event1": {
// value => [1, "2"]
break
}
case "event2": {
// value => ["3", 4]
break
}
default: {
throw Error("unreachable")
}
}
}

back pressure

Consider the following code


setInterval(() => {
syncEvent.emit("event2", "3", 4)
syncEvent.emit("event1", 1, "2")
}, 100)

const eventStream = syncEvent.createEventStream(["event1", "event2"])

for await (const { event, value } of eventStream) {
// 10s for each treatment
await new Promise((res) => setTimeout(res, 10000))
}

SyncEvent creates a queue in which all events that have not yet been processed are temporarily stored

The queue grows as events trigger faster than events are processed

If you want to handle this situation, you need to configure EventStreamStrategy.

The policy is triggered only if you specify strategy.capacity and the number of events in the queue is greater than strategy.capacity

EventStreamStrategy has two types, corresponding to three different processing methods.

  • drop:Discard newly triggered events until a message is consumed and the queue has a seat
  • replace:Discard the earliest but unprocessed event, push the newly triggered event to the queue, and keep strategy.capacity unchanged.

strategy does not work without passing strategy or strategy.capacity <= 0

Example

setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4) // every time it gets discarded, because the capacity is 1.
}, 100)

const eventStream = syncEvent.createEventStream(["event1", "event2"], {
capacity: 1, // maximum capacity is 1.
strategyWhenFull: "drop",
})

for await (const { event } of eventStream) {
console.log(event) // event is always event1
switch (event) {
case "event1": {
break
}
default: {
// There will be no other event response
throw Error("unreachable")
}
}
}

createReadableStream

CreateReadableStream and CreateEventStream are almost identical.

The only difference is that createReadableStream returns ReadableStream

Example


setInterval(() => {
syncEvent.emit("event1", 1, "2")
syncEvent.emit("event2", "3", 4)
}, 100)

const eventStream = syncEvent.createEventReadableStream(["event1", "event2"])
const reader = eventStream.getReader()

for (;;) {
const { done, value } = await reader.read()
if (done) {
break
}

switch (value.event) {
case "event1":
console.log(value.value) // 1,'2'
break
case "event2":
console.log(value.value) // '3',4
break
}
}

subscriber and publisher

Split SyncEvents into Publishers and Subscribers

syncEvent.subscriber  // Method with all subscribers except 'emit'
syncEvent.publisher // Only the 'emit' method

When you implement custom classes, there is a need to avoid exposing certain methods

Example

syncEvent.subscriber.on("event1", () => {})

syncEvent.publisher.emit("event1", 1, "2")
// Customize an object that exposes only **subscribed methods**
class Foo {
#syncEvent = SyncEvent.new<{
event1: (arg1: number, arg2: string) => void
event2: (arg1: string, arg2: number) => void
}>()

get subscriber() {
return this.#syncEvent.subscriber
}

constructor() {
setInterval(() => {
this.#syncEvent.emit("event1", 1, "2")
})
}
}

const foo = new Foo()

foo.subscriber.on("event1", () => {})