From 8809380c8642b164e72959e85e0fad5b20e7e649 Mon Sep 17 00:00:00 2001 From: Evert Prants Date: Sat, 28 Nov 2020 15:34:34 +0200 Subject: [PATCH] Channels, Protocols, Services --- src/channel/index.ts | 103 +++++++++++++++++++++++- src/core/logger.ts | 25 ++++-- src/plugin/decorators/configurable.ts | 6 ++ src/plugin/decorators/dependency.ts | 19 ++--- src/plugin/decorators/index.ts | 8 +- src/plugin/decorators/service.ts | 6 ++ src/plugin/loader.ts | 19 ++++- src/plugin/manager.ts | 38 +++++++-- src/plugin/plugin.ts | 15 ++-- src/types/config.ts | 59 +++++++++----- src/types/index.ts | 10 ++- src/types/message.ts | 3 +- src/types/protocol.ts | 66 ++++++++++++++++ src/types/service.ts | 110 ++++++++++++++++++++++++++ tslint.json | 3 + 15 files changed, 431 insertions(+), 59 deletions(-) create mode 100644 src/plugin/decorators/configurable.ts create mode 100644 src/plugin/decorators/service.ts create mode 100644 src/types/protocol.ts create mode 100644 src/types/service.ts diff --git a/src/channel/index.ts b/src/channel/index.ts index a38abb4..7e68d4b 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -1,13 +1,110 @@ +import { IPlugin } from '../plugin'; +import { IMessage, Protocol } from '../types'; import { ScopedEventEmitter } from '../util/events'; +export interface IChannel { + name: string; + plugins: string[]; + enabled: boolean; +} + export class ChannelManager { + private channels: IChannel[] = []; + constructor(private stream: ScopedEventEmitter) {} - public initialize(configured: any[]): void { + public static determinePlugin(source: any): IPlugin | null { + if (source != null) { + if (source.manifest) { + return source; + } + + if (source.plugin && source.plugin.manifest) { + return source.plugin; + } + } + + return null; + } + + public initialize(configured: IChannel[]): void { + this.addPreconfiguredChannels(configured); + for (const event of ['message', 'event', 'special']) { - this.stream.on('channel', event, (...data: any[]) => { - // TODO: pass messages between channels + this.stream.on('channel', event, (data: IMessage) => { + const plugin = ChannelManager.determinePlugin(data.source); + if (!plugin) { + return; + } + const source = plugin.manifest.name; + const emitTo = this.getChannelsByPluginName(source); + for (const chan of emitTo) { + if (chan.plugins.length < 2) { + continue; + } + for (const pl of chan.plugins) { + if (pl !== source) { + this.stream.emitTo(pl, event, data); + } + } + } }); } } + + private getChannelsByPluginName(plugin: string): IChannel[] { + const list = []; + for (const chan of this.channels) { + if (chan.enabled === false) { + continue; + } + if (chan.plugins.indexOf(plugin) !== -1) { + list.push(chan); + } + } + return list; + } + + private addPreconfiguredChannels(channels: IChannel[]): void { + for (const chan of channels) { + if (!chan.name) { + throw new Error('Channel name is mandatory.'); + } + + if (!chan.plugins) { + throw new Error('Channel plugins list is mandatory.'); + } + + this.channels.push(chan); + } + } + + public getChannelByName(name: string): IChannel | null { + for (const chan of this.channels) { + if (chan.name === name) { + return chan; + } + } + return null; + } + + public addChannel(chan: IChannel): IChannel { + const exists = this.getChannelByName(chan.name); + if (exists) { + throw new Error('Channel by that name already exists!'); + } + this.channels.push(chan); + return chan; + } + + public removeChannel(chan: string | IChannel): void { + if (typeof chan === 'string') { + const getchan = this.getChannelByName(chan); + if (!getchan) { + throw new Error('Channel by that name doesn\'t exists!'); + } + chan = getchan; + } + this.channels.splice(this.channels.indexOf(chan), 1); + } } diff --git a/src/core/logger.ts b/src/core/logger.ts index c4cdde0..346ae32 100644 --- a/src/core/logger.ts +++ b/src/core/logger.ts @@ -3,12 +3,23 @@ import util from 'util'; export class Logger { public timestamp = 'dd/mm/yy HH:MM:ss'; + private console = [console.log, console.warn, console.error]; constructor() {} + public setReadline(rl: any): void { + for (const index in this.console) { + const old = this.console[index]; + this.console[index] = (...data: any[]): void => { + rl.output.write('\x1b[2K\r'); + old.apply(null, data); + }; + } + } + private write(ltype: string, ...data: any[]): void { const message = []; - let cfunc = console.log; + let cfunc = this.console[0]; if (this.timestamp) { message.push(`[${dateFmt(new Date(), this.timestamp)}]`); @@ -18,16 +29,16 @@ export class Logger { case 'info': message.push('[ INFO]'); break; - case 'error': - message.push('[ERROR]'); - cfunc = console.error; + case 'debug': + message.push('[DEBUG]'); break; case 'warn': message.push('[ WARN]'); - cfunc = console.warn; + cfunc = this.console[1]; break; - case 'debug': - message.push('[DEBUG]'); + case 'error': + message.push('[ERROR]'); + cfunc = this.console[2]; break; } diff --git a/src/plugin/decorators/configurable.ts b/src/plugin/decorators/configurable.ts new file mode 100644 index 0000000..f2d58d8 --- /dev/null +++ b/src/plugin/decorators/configurable.ts @@ -0,0 +1,6 @@ + +export function Configurable(defconf: any): Function { + return (constructor: Function): void => { + constructor.prototype.__defconf = defconf; + }; +} diff --git a/src/plugin/decorators/dependency.ts b/src/plugin/decorators/dependency.ts index 35e355e..f3518b1 100644 --- a/src/plugin/decorators/dependency.ts +++ b/src/plugin/decorators/dependency.ts @@ -9,13 +9,14 @@ export function DependencyLoad(dep: string): (...args: any[]) => void { descriptor.value = function(...args: any[]): void { const self = this as any; self.stream.on(self.name, 'pluginLoaded', (plugin: any) => { - if (typeof plugin !== 'string') { - plugin = plugin.metadata.name; - } - if (plugin !== dep) { + if (typeof plugin === 'string') { return; } - originalMethod.apply(self, plugin); + const nameof = plugin.manifest.name; + if (nameof !== dep) { + return; + } + originalMethod.call(self, plugin); }); }; // Set the function to be autoexecuted when the plugin is initialized. @@ -34,16 +35,16 @@ export function DependencyUnload(dep: string): (...args: any[]) => void { descriptor.value = function(...args: any[]): void { const self = this as any; self.stream.on(self.name, 'pluginUnloaded', (plugin: any) => { + let nameof = plugin; if (typeof plugin !== 'string') { - plugin = plugin.metadata.name; + nameof = plugin.manifest.name; } - if (plugin !== dep) { + if (nameof !== dep) { return; } - originalMethod.apply(self, plugin); + originalMethod.call(self, plugin); }); }; - // Set the function to be autoexecuted when the plugin is initialized. descriptor.value.prototype.__autoexec = 1; return descriptor; }; diff --git a/src/plugin/decorators/index.ts b/src/plugin/decorators/index.ts index 95d4954..3fbb290 100644 --- a/src/plugin/decorators/index.ts +++ b/src/plugin/decorators/index.ts @@ -1,3 +1,5 @@ -export { EventListener } from './eventlistener'; -export { Auto } from './auto'; -export { DependencyLoad, DependencyUnload } from './dependency'; +export * from './eventlistener'; +export * from './auto'; +export * from './dependency'; +export * from './configurable'; +export * from './service'; diff --git a/src/plugin/decorators/service.ts b/src/plugin/decorators/service.ts new file mode 100644 index 0000000..77fc174 --- /dev/null +++ b/src/plugin/decorators/service.ts @@ -0,0 +1,6 @@ + +export function InjectService(servtype: any): Function { + return (constructor: Function): void => { + constructor.prototype.__service = servtype; + }; +} diff --git a/src/plugin/loader.ts b/src/plugin/loader.ts index 6c3fe0d..44e1d4e 100644 --- a/src/plugin/loader.ts +++ b/src/plugin/loader.ts @@ -4,6 +4,8 @@ import * as path from 'path'; import { IEnvironment } from '../types/environment'; import { IPluginManifest } from './plugin'; +import { logger } from '../core'; + export class PluginMetaLoader { constructor(private env: IEnvironment) {} @@ -58,15 +60,26 @@ export class PluginMetaLoader { return json; } - public async loadAll(): Promise { + public async loadAll(ignoreErrors = false): Promise { const dirlist = await fs.readdir(this.env.pluginsPath); const plugins: IPluginManifest[] = []; for (const file of dirlist) { + // Ignore hidden files and non-directories + const fpath = path.join(this.env.pluginsPath, file); + if (file.indexOf('.') === 0 || + file === 'node_modules' || + !(await fs.lstat(fpath)).isDirectory()) { + continue; + } + try { - const plugin = await this.load(path.basename(file)); + const plugin = await this.load(file); plugins.push(plugin); } catch (e) { - console.error(e); + if (ignoreErrors) { + continue; + } + logger.error(e); } } return plugins; diff --git a/src/plugin/manager.ts b/src/plugin/manager.ts index a088a37..4d90b3b 100644 --- a/src/plugin/manager.ts +++ b/src/plugin/manager.ts @@ -1,8 +1,8 @@ import * as path from 'path'; -import { IEnvironment } from '../types/environment'; +import { IEnvironment, Service } from '../types'; import { IPlugin, IPluginManifest, Plugin } from './plugin'; -import { PluginConfiguration } from '../types/plugin-config'; +import { PluginConfiguration } from '../types'; import { PluginConfigurator } from './config'; import { ScopedEventEmitter } from '../util/events'; @@ -114,6 +114,12 @@ export class PluginManager { } public async load(plugin: IPluginManifest): Promise { + // Don't load plugins twice + const ready = this.getLoadedByName(plugin.name); + if (ready) { + return ready; + } + // Check dependencies const requires = []; logger.debug('Loading plugin', plugin.name); @@ -138,7 +144,8 @@ export class PluginManager { try { await this.load(manifest); } catch (e) { - throw new Error(`Plugin dependency "${manifest.name}" loading failed for "${plugin.name}": ${e.stack}`); + logger.error(e.stack); + throw new Error(`Plugin dependency "${manifest.name}" loading failed for "${plugin.name}"`); } } @@ -148,7 +155,8 @@ export class PluginManager { try { await this.npm.installPackage(depm); } catch (e) { - throw new Error(`Plugin dependency "${depm}" installation failed for "${plugin.name}": ${e.stack}`); + logger.error(e.stack); + throw new Error(`Plugin dependency "${depm}" installation failed for "${plugin.name}"`); } } @@ -162,10 +170,21 @@ export class PluginManager { throw new Error(`Plugin "${plugin.name}" loading failed.`); } + // Find default configuration, if it's configured, load the configuration + if (PluginModule.prototype.__defconf && config) { + config.setDefaults(PluginModule.prototype.__defconf); + await config.load(); + } + // Construct an instance of the module logger.debug('Instancing plugin %s', plugin.name); const loaded = new PluginModule(plugin, this.stream, config); try { + // Give the plugin a service + if (PluginModule.prototype.__service) { + loaded.service = new Service(PluginModule.prototype.__service); + } + // Call the initializer if (loaded.initialize) { loaded.initialize.call(loaded); @@ -186,7 +205,8 @@ export class PluginManager { } } } catch (e) { - throw new Error(`Plugin "${plugin.name}" initialization failed: ${e.stack}`); + logger.error(e.stack); + throw new Error(`Plugin "${plugin.name}" initialization failed.`); } this.plugins.set(plugin.name, loaded); @@ -212,7 +232,15 @@ export class PluginManager { this.stream.on('core', 'pluginUnloaded', (mf: IPlugin | string) => { if (typeof mf !== 'string') { + if (mf.manifest && mf.service != null) { + mf.service.die(); + } mf = mf.manifest.name; + } else { + const st = this.getLoadedByName(mf); + if (st && st.manifest && st.service != null) { + st.service.die(); + } } // Delete plugin from the list of loaded plugins diff --git a/src/plugin/plugin.ts b/src/plugin/plugin.ts index 998b877..b91452a 100644 --- a/src/plugin/plugin.ts +++ b/src/plugin/plugin.ts @@ -1,14 +1,17 @@ import { logger } from '../core/logger'; -import { PluginConfiguration } from '../types/plugin-config'; +import { PluginConfiguration, Service } from '../types'; import { ScopedEventEmitter } from '../util/events'; export interface IPlugin { manifest: IPluginManifest; stream: ScopedEventEmitter; config: PluginConfiguration; + service: Service | null; } export class Plugin implements IPlugin { + public service: Service | null = null; + constructor( public manifest: IPluginManifest, public stream: ScopedEventEmitter, @@ -16,23 +19,23 @@ export class Plugin implements IPlugin { public initialize(): void {} - private get name(): string { + protected get name(): string { return this.manifest.name; } - private get version(): string { + protected get version(): string { return this.manifest.version; } - private addEventListener(name: string, fn: any): void { + protected addEventListener(name: string, fn: any): void { this.stream.on(this.name, name, fn); } - private emit(event: string, fn: any): void { + protected emit(event: string, fn: any): void { this.stream.emit.call(this.stream, event, fn); } - private emitTo(name: string, event: string, fn: any): void { + protected emitTo(name: string, event: string, fn: any): void { this.stream.emitTo.call(this.stream, name, event, fn); } } diff --git a/src/types/config.ts b/src/types/config.ts index 2baa4bb..981af63 100644 --- a/src/types/config.ts +++ b/src/types/config.ts @@ -4,11 +4,12 @@ import { IEnvironment } from './environment'; export class Configuration { private config: any = {}; - private dirty = false; + private loaded = false; - constructor(private env: IEnvironment, private file: string, private defaults?: any) {} + constructor(private env: IEnvironment, private file: string, private defaults: any = {}) {} public async load(): Promise { + this.loaded = true; if (!await fs.pathExists(this.file)) { this.saveDefaults(); return; @@ -22,29 +23,51 @@ export class Configuration { } } - public async save(force = false): Promise { - if (force) { - return this.write(); - } - - this.dirty = true; + public async save(): Promise { + return fs.writeJson(this.file, this.config); } - public get(key: string, defval?: any): any { - if (!this.config[key]) { + public get(key: string, defval?: any, from?: any): any { + if (!from) { + from = this.config; + } + + // Recursive object traversal + if (key.indexOf('.') !== -1) { + const split = key.split('.'); + const first = this.get(split[0], null, from); + if (first) { + return this.get(split.slice(1).join('.'), defval, first); + } return defval; } - return this.config[key]; + + // Array indexing + if (key.indexOf('[') !== -1 && key.indexOf(']') !== -1) { + const match = key.match(/\[(\d+)\]/i); + const realKey = key.substr(0, key.indexOf('[')); + if (match != null) { + const index = parseInt(match[1], 10); + if (from[realKey]) { + return from[realKey][index]; + } + } + return defval; + } + + if (!from[key]) { + return defval; + } + + return from[key]; + } + + public setDefaults(defconf: any): void { + this.defaults = defconf; } private saveDefaults(): void { this.config = this.defaults || {}; - this.save(true); + this.save(); } - - private async write(): Promise { - return fs.writeJson(this.file, this.config); - } - - private writeTask(): void {} } diff --git a/src/types/index.ts b/src/types/index.ts index 5c8f0bc..1be031d 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,4 +1,6 @@ -export { Configuration } from './config'; -export { IEnvironment } from './environment'; -export { PluginConfiguration } from './plugin-config'; -export { IMessage } from './message'; +export * from './config'; +export * from './environment'; +export * from './plugin-config'; +export * from './message'; +export * from './protocol'; +export * from './service'; diff --git a/src/types/message.ts b/src/types/message.ts index 5b29795..5403eeb 100644 --- a/src/types/message.ts +++ b/src/types/message.ts @@ -1,8 +1,9 @@ import { IPlugin } from '../plugin/plugin'; +import { Protocol } from './protocol'; export interface IMessage { data: any; - source: IPlugin; + source: IPlugin | Protocol; time: Date; resolve(...args: any[]): void; } diff --git a/src/types/protocol.ts b/src/types/protocol.ts new file mode 100644 index 0000000..0a68a77 --- /dev/null +++ b/src/types/protocol.ts @@ -0,0 +1,66 @@ +import { randomBytes } from 'crypto'; + +import { EventEmitter } from 'events'; +import { IPlugin } from '../plugin'; +import { IMessage } from './message'; + +export class Protocol extends EventEmitter { + // override this! + public id = randomBytes(4).toString('hex'); + public type = 'GenericProtocol'; + + protected running = false; + + // This should be set to true when the protocol was stopped for any reason + // at any time. + protected stopped = false; + + // This should be set to true when the protocol fails for any reason + public failed = false; + + constructor(public plugin: IPlugin, public config: any) { + super(); + this.passEvents(); + } + + public get name(): string { + return this.config.name; + } + + public get status(): boolean { + return this.running; + } + + public start(...args: any[]): void { + this.running = true; + this.emit('running'); + } + + public stop(force = false): void { + if (!this.running) { + return; + } + + this.running = false; + this.stopped = true; + + if (force) { + this.failed = true; + } + + this.emit('stopped'); + } + + public resolve(message: IMessage, ...data: any[]): void {} + + protected passEvents(): void { + this.on('stop', (force) => this.stop(force)); + this.on('start', (...args: any[]) => { + // Prevent restarting from here, it's unsafe + if (this.status || (!this.status && this.stopped)) { + return; + } + this.start(...args); + }); + } +} diff --git a/src/types/service.ts b/src/types/service.ts new file mode 100644 index 0000000..fe71c4c --- /dev/null +++ b/src/types/service.ts @@ -0,0 +1,110 @@ +import { Protocol } from './protocol'; + +export class Service { + private protocols: Map = new Map(); + private stopped = false; + + constructor(private protoType: any) {} + + // Add a new protocol to this service + public use(pto: Protocol, autostart = true): Protocol { + // This service is no longer accepting new protocols + if (this.stopped) { + return pto; + } + + const n = pto.name; + if (n == null || n === '') { + throw new Error('Invalid Protocol configuration: Needs an unique name!'); + } + + if (this.protocols.has(n)) { + return this.protocols.get(n) as Protocol; + } + + this.protocols.set(n, pto); + + if (autostart) { + pto.start(); + } + + return pto; + } + + // Stop a protocol running in this service + public stop(pto: string | Protocol, force = false): void { + let proto: Protocol; + if (typeof pto === 'string') { + if (!this.protocols.get(pto)) { + return; + } + proto = this.protocols.get(pto) as Protocol; + } else { + proto = pto; + if (!this.protocols.get(pto.name)) { + // Not part of this service! + return; + } + } + proto.stop(force); + this.protocols.delete(proto.name); + } + + public getProtocolByName(name: string): Protocol | undefined { + return this.protocols.get(name); + } + + // Gracefully stops everything running in this service + public stopAll(): Promise { + return new Promise((resolve, reject) => { + if (this.stopped) { + return resolve(); + } + + let toStop = +this.protocols.size; + let killed = false; + + // Nothing to stop + if (toStop === 0) { + return resolve(); + } + + // If the protocols fail to stop within a time frame, we have to kill + // them in order to prevent lingering connections. + const killTimeout = setTimeout(() => { + killed = true; + this.die(); + resolve(); + }, 10000); + + // Stop everything and wait for them to announce the fact + // that they've stopped. + this.stopped = true; + for (const [name, proto] of this.protocols) { + proto.once('stopped', () => { + if (killed) { + return; + } + + toStop--; + + if (toStop <= 0) { + this.protocols.clear(); + clearTimeout(killTimeout); + resolve(); + } + }); + proto.stop(); + } + }); + } + + // Kills everything running in this service + public die(): void { + this.stopped = true; + for (const [name, proto] of this.protocols) { + proto.stop(true); + } + this.protocols.clear(); + } +} diff --git a/tslint.json b/tslint.json index bc10d1c..1c2e31f 100644 --- a/tslint.json +++ b/tslint.json @@ -84,6 +84,9 @@ "call-signature" ], "forin": false, + "ban-types": { + "function": false + }, "typedef-whitespace": { "options": [ {