Init
This commit is contained in:
166
node_modules/mqtt/src/lib/store.ts
generated
vendored
Normal file
166
node_modules/mqtt/src/lib/store.ts
generated
vendored
Normal file
@ -0,0 +1,166 @@
|
||||
/**
|
||||
* Module dependencies
|
||||
*/
|
||||
import { Readable } from 'readable-stream'
|
||||
import { Packet } from 'mqtt-packet'
|
||||
import { DoneCallback } from './shared'
|
||||
|
||||
const streamsOpts = { objectMode: true }
|
||||
const defaultStoreOptions = {
|
||||
clean: true,
|
||||
}
|
||||
|
||||
export interface IStoreOptions {
|
||||
/**
|
||||
* true, clear _inflights at close
|
||||
*/
|
||||
clean?: boolean
|
||||
}
|
||||
|
||||
export type PacketCallback = (error?: Error, packet?: Packet) => void
|
||||
|
||||
export interface IStore {
|
||||
/**
|
||||
* Adds a packet to the store, a packet is
|
||||
* anything that has a messageId property.
|
||||
*
|
||||
*/
|
||||
put(packet: Packet, cb: DoneCallback): IStore
|
||||
|
||||
/**
|
||||
* Creates a stream with all the packets in the store
|
||||
*
|
||||
*/
|
||||
createStream(): Readable
|
||||
|
||||
/**
|
||||
* deletes a packet from the store.
|
||||
*/
|
||||
del(packet: Pick<Packet, 'messageId'>, cb: PacketCallback): IStore
|
||||
|
||||
/**
|
||||
* get a packet from the store.
|
||||
*/
|
||||
get(packet: Pick<Packet, 'messageId'>, cb: PacketCallback): IStore
|
||||
|
||||
/**
|
||||
* Close the store
|
||||
*/
|
||||
close(cb: DoneCallback): void
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory implementation of the message store
|
||||
* This can actually be saved into files.
|
||||
*
|
||||
* @param {Object} [options] - store options
|
||||
*/
|
||||
export default class Store implements IStore {
|
||||
private options: IStoreOptions
|
||||
|
||||
private _inflights: Map<number, Packet>
|
||||
|
||||
constructor(options?: IStoreOptions) {
|
||||
this.options = options || {}
|
||||
|
||||
// Defaults
|
||||
this.options = { ...defaultStoreOptions, ...options }
|
||||
|
||||
this._inflights = new Map()
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a packet to the store, a packet is
|
||||
* anything that has a messageId property.
|
||||
*
|
||||
*/
|
||||
put(packet: Packet, cb: DoneCallback) {
|
||||
this._inflights.set(packet.messageId, packet)
|
||||
|
||||
if (cb) {
|
||||
cb()
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a stream with all the packets in the store
|
||||
*
|
||||
*/
|
||||
createStream() {
|
||||
const stream = new Readable(streamsOpts)
|
||||
const values = []
|
||||
let destroyed = false
|
||||
let i = 0
|
||||
|
||||
this._inflights.forEach((value, key) => {
|
||||
values.push(value)
|
||||
})
|
||||
|
||||
stream._read = () => {
|
||||
if (!destroyed && i < values.length) {
|
||||
stream.push(values[i++])
|
||||
} else {
|
||||
stream.push(null)
|
||||
}
|
||||
}
|
||||
|
||||
stream.destroy = (err) => {
|
||||
if (destroyed) {
|
||||
return
|
||||
}
|
||||
|
||||
destroyed = true
|
||||
|
||||
setTimeout(() => {
|
||||
stream.emit('close')
|
||||
}, 0)
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
/**
|
||||
* deletes a packet from the store.
|
||||
*/
|
||||
del(packet: Pick<Packet, 'messageId'>, cb: PacketCallback) {
|
||||
const toDelete = this._inflights.get(packet.messageId)
|
||||
if (toDelete) {
|
||||
this._inflights.delete(packet.messageId)
|
||||
cb(null, toDelete)
|
||||
} else if (cb) {
|
||||
cb(new Error('missing packet'))
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* get a packet from the store.
|
||||
*/
|
||||
get(packet: Pick<Packet, 'messageId'>, cb: PacketCallback) {
|
||||
const storedPacket = this._inflights.get(packet.messageId)
|
||||
if (storedPacket) {
|
||||
cb(null, storedPacket)
|
||||
} else if (cb) {
|
||||
cb(new Error('missing packet'))
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the store
|
||||
*/
|
||||
close(cb: DoneCallback) {
|
||||
if (this.options.clean) {
|
||||
this._inflights = null
|
||||
}
|
||||
if (cb) {
|
||||
cb()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user