Channels, Protocols, Services

This commit is contained in:
Evert Prants 2020-11-28 15:34:34 +02:00
parent 3a6f5f2d8e
commit 8809380c86
Signed by: evert
GPG Key ID: 1688DA83D222D0B5
15 changed files with 431 additions and 59 deletions

View File

@ -1,13 +1,110 @@
import { IPlugin } from '../plugin';
import { IMessage, Protocol } from '../types';
import { ScopedEventEmitter } from '../util/events'; import { ScopedEventEmitter } from '../util/events';
export interface IChannel {
name: string;
plugins: string[];
enabled: boolean;
}
export class ChannelManager { export class ChannelManager {
private channels: IChannel[] = [];
constructor(private stream: ScopedEventEmitter) {} constructor(private stream: ScopedEventEmitter) {}
public initialize(configured: any[]): void { public static determinePlugin(source: any): IPlugin | null {
if (source != null) {
if (source.manifest) {
return source;
}
if (source.plugin && source.plugin.manifest) {
return source.plugin;
}
}
return null;
}
public initialize(configured: IChannel[]): void {
this.addPreconfiguredChannels(configured);
for (const event of ['message', 'event', 'special']) { for (const event of ['message', 'event', 'special']) {
this.stream.on('channel', event, (...data: any[]) => { this.stream.on('channel', event, (data: IMessage) => {
// TODO: pass messages between channels const plugin = ChannelManager.determinePlugin(data.source);
if (!plugin) {
return;
}
const source = plugin.manifest.name;
const emitTo = this.getChannelsByPluginName(source);
for (const chan of emitTo) {
if (chan.plugins.length < 2) {
continue;
}
for (const pl of chan.plugins) {
if (pl !== source) {
this.stream.emitTo(pl, event, data);
}
}
}
}); });
} }
} }
private getChannelsByPluginName(plugin: string): IChannel[] {
const list = [];
for (const chan of this.channels) {
if (chan.enabled === false) {
continue;
}
if (chan.plugins.indexOf(plugin) !== -1) {
list.push(chan);
}
}
return list;
}
private addPreconfiguredChannels(channels: IChannel[]): void {
for (const chan of channels) {
if (!chan.name) {
throw new Error('Channel name is mandatory.');
}
if (!chan.plugins) {
throw new Error('Channel plugins list is mandatory.');
}
this.channels.push(chan);
}
}
public getChannelByName(name: string): IChannel | null {
for (const chan of this.channels) {
if (chan.name === name) {
return chan;
}
}
return null;
}
public addChannel(chan: IChannel): IChannel {
const exists = this.getChannelByName(chan.name);
if (exists) {
throw new Error('Channel by that name already exists!');
}
this.channels.push(chan);
return chan;
}
public removeChannel(chan: string | IChannel): void {
if (typeof chan === 'string') {
const getchan = this.getChannelByName(chan);
if (!getchan) {
throw new Error('Channel by that name doesn\'t exists!');
}
chan = getchan;
}
this.channels.splice(this.channels.indexOf(chan), 1);
}
} }

View File

@ -3,12 +3,23 @@ import util from 'util';
export class Logger { export class Logger {
public timestamp = 'dd/mm/yy HH:MM:ss'; public timestamp = 'dd/mm/yy HH:MM:ss';
private console = [console.log, console.warn, console.error];
constructor() {} constructor() {}
public setReadline(rl: any): void {
for (const index in this.console) {
const old = this.console[index];
this.console[index] = (...data: any[]): void => {
rl.output.write('\x1b[2K\r');
old.apply(null, data);
};
}
}
private write(ltype: string, ...data: any[]): void { private write(ltype: string, ...data: any[]): void {
const message = []; const message = [];
let cfunc = console.log; let cfunc = this.console[0];
if (this.timestamp) { if (this.timestamp) {
message.push(`[${dateFmt(new Date(), this.timestamp)}]`); message.push(`[${dateFmt(new Date(), this.timestamp)}]`);
@ -18,16 +29,16 @@ export class Logger {
case 'info': case 'info':
message.push('[ INFO]'); message.push('[ INFO]');
break; break;
case 'error': case 'debug':
message.push('[ERROR]'); message.push('[DEBUG]');
cfunc = console.error;
break; break;
case 'warn': case 'warn':
message.push('[ WARN]'); message.push('[ WARN]');
cfunc = console.warn; cfunc = this.console[1];
break; break;
case 'debug': case 'error':
message.push('[DEBUG]'); message.push('[ERROR]');
cfunc = this.console[2];
break; break;
} }

View File

@ -0,0 +1,6 @@
export function Configurable(defconf: any): Function {
return (constructor: Function): void => {
constructor.prototype.__defconf = defconf;
};
}

View File

@ -9,13 +9,14 @@ export function DependencyLoad(dep: string): (...args: any[]) => void {
descriptor.value = function(...args: any[]): void { descriptor.value = function(...args: any[]): void {
const self = this as any; const self = this as any;
self.stream.on(self.name, 'pluginLoaded', (plugin: any) => { self.stream.on(self.name, 'pluginLoaded', (plugin: any) => {
if (typeof plugin !== 'string') { if (typeof plugin === 'string') {
plugin = plugin.metadata.name;
}
if (plugin !== dep) {
return; return;
} }
originalMethod.apply(self, plugin); const nameof = plugin.manifest.name;
if (nameof !== dep) {
return;
}
originalMethod.call(self, plugin);
}); });
}; };
// Set the function to be autoexecuted when the plugin is initialized. // Set the function to be autoexecuted when the plugin is initialized.
@ -34,16 +35,16 @@ export function DependencyUnload(dep: string): (...args: any[]) => void {
descriptor.value = function(...args: any[]): void { descriptor.value = function(...args: any[]): void {
const self = this as any; const self = this as any;
self.stream.on(self.name, 'pluginUnloaded', (plugin: any) => { self.stream.on(self.name, 'pluginUnloaded', (plugin: any) => {
let nameof = plugin;
if (typeof plugin !== 'string') { if (typeof plugin !== 'string') {
plugin = plugin.metadata.name; nameof = plugin.manifest.name;
} }
if (plugin !== dep) { if (nameof !== dep) {
return; return;
} }
originalMethod.apply(self, plugin); originalMethod.call(self, plugin);
}); });
}; };
// Set the function to be autoexecuted when the plugin is initialized.
descriptor.value.prototype.__autoexec = 1; descriptor.value.prototype.__autoexec = 1;
return descriptor; return descriptor;
}; };

View File

@ -1,3 +1,5 @@
export { EventListener } from './eventlistener'; export * from './eventlistener';
export { Auto } from './auto'; export * from './auto';
export { DependencyLoad, DependencyUnload } from './dependency'; export * from './dependency';
export * from './configurable';
export * from './service';

View File

@ -0,0 +1,6 @@
export function InjectService(servtype: any): Function {
return (constructor: Function): void => {
constructor.prototype.__service = servtype;
};
}

View File

@ -4,6 +4,8 @@ import * as path from 'path';
import { IEnvironment } from '../types/environment'; import { IEnvironment } from '../types/environment';
import { IPluginManifest } from './plugin'; import { IPluginManifest } from './plugin';
import { logger } from '../core';
export class PluginMetaLoader { export class PluginMetaLoader {
constructor(private env: IEnvironment) {} constructor(private env: IEnvironment) {}
@ -58,15 +60,26 @@ export class PluginMetaLoader {
return json; return json;
} }
public async loadAll(): Promise<IPluginManifest[]> { public async loadAll(ignoreErrors = false): Promise<IPluginManifest[]> {
const dirlist = await fs.readdir(this.env.pluginsPath); const dirlist = await fs.readdir(this.env.pluginsPath);
const plugins: IPluginManifest[] = []; const plugins: IPluginManifest[] = [];
for (const file of dirlist) { for (const file of dirlist) {
// Ignore hidden files and non-directories
const fpath = path.join(this.env.pluginsPath, file);
if (file.indexOf('.') === 0 ||
file === 'node_modules' ||
!(await fs.lstat(fpath)).isDirectory()) {
continue;
}
try { try {
const plugin = await this.load(path.basename(file)); const plugin = await this.load(file);
plugins.push(plugin); plugins.push(plugin);
} catch (e) { } catch (e) {
console.error(e); if (ignoreErrors) {
continue;
}
logger.error(e);
} }
} }
return plugins; return plugins;

View File

@ -1,8 +1,8 @@
import * as path from 'path'; import * as path from 'path';
import { IEnvironment } from '../types/environment'; import { IEnvironment, Service } from '../types';
import { IPlugin, IPluginManifest, Plugin } from './plugin'; import { IPlugin, IPluginManifest, Plugin } from './plugin';
import { PluginConfiguration } from '../types/plugin-config'; import { PluginConfiguration } from '../types';
import { PluginConfigurator } from './config'; import { PluginConfigurator } from './config';
import { ScopedEventEmitter } from '../util/events'; import { ScopedEventEmitter } from '../util/events';
@ -114,6 +114,12 @@ export class PluginManager {
} }
public async load(plugin: IPluginManifest): Promise<IPlugin> { public async load(plugin: IPluginManifest): Promise<IPlugin> {
// Don't load plugins twice
const ready = this.getLoadedByName(plugin.name);
if (ready) {
return ready;
}
// Check dependencies // Check dependencies
const requires = []; const requires = [];
logger.debug('Loading plugin', plugin.name); logger.debug('Loading plugin', plugin.name);
@ -138,7 +144,8 @@ export class PluginManager {
try { try {
await this.load(manifest); await this.load(manifest);
} catch (e) { } catch (e) {
throw new Error(`Plugin dependency "${manifest.name}" loading failed for "${plugin.name}": ${e.stack}`); logger.error(e.stack);
throw new Error(`Plugin dependency "${manifest.name}" loading failed for "${plugin.name}"`);
} }
} }
@ -148,7 +155,8 @@ export class PluginManager {
try { try {
await this.npm.installPackage(depm); await this.npm.installPackage(depm);
} catch (e) { } catch (e) {
throw new Error(`Plugin dependency "${depm}" installation failed for "${plugin.name}": ${e.stack}`); logger.error(e.stack);
throw new Error(`Plugin dependency "${depm}" installation failed for "${plugin.name}"`);
} }
} }
@ -162,10 +170,21 @@ export class PluginManager {
throw new Error(`Plugin "${plugin.name}" loading failed.`); throw new Error(`Plugin "${plugin.name}" loading failed.`);
} }
// Find default configuration, if it's configured, load the configuration
if (PluginModule.prototype.__defconf && config) {
config.setDefaults(PluginModule.prototype.__defconf);
await config.load();
}
// Construct an instance of the module // Construct an instance of the module
logger.debug('Instancing plugin %s', plugin.name); logger.debug('Instancing plugin %s', plugin.name);
const loaded = new PluginModule(plugin, this.stream, config); const loaded = new PluginModule(plugin, this.stream, config);
try { try {
// Give the plugin a service
if (PluginModule.prototype.__service) {
loaded.service = new Service(PluginModule.prototype.__service);
}
// Call the initializer // Call the initializer
if (loaded.initialize) { if (loaded.initialize) {
loaded.initialize.call(loaded); loaded.initialize.call(loaded);
@ -186,7 +205,8 @@ export class PluginManager {
} }
} }
} catch (e) { } catch (e) {
throw new Error(`Plugin "${plugin.name}" initialization failed: ${e.stack}`); logger.error(e.stack);
throw new Error(`Plugin "${plugin.name}" initialization failed.`);
} }
this.plugins.set(plugin.name, loaded); this.plugins.set(plugin.name, loaded);
@ -212,7 +232,15 @@ export class PluginManager {
this.stream.on('core', 'pluginUnloaded', (mf: IPlugin | string) => { this.stream.on('core', 'pluginUnloaded', (mf: IPlugin | string) => {
if (typeof mf !== 'string') { if (typeof mf !== 'string') {
if (mf.manifest && mf.service != null) {
mf.service.die();
}
mf = mf.manifest.name; mf = mf.manifest.name;
} else {
const st = this.getLoadedByName(mf);
if (st && st.manifest && st.service != null) {
st.service.die();
}
} }
// Delete plugin from the list of loaded plugins // Delete plugin from the list of loaded plugins

View File

@ -1,14 +1,17 @@
import { logger } from '../core/logger'; import { logger } from '../core/logger';
import { PluginConfiguration } from '../types/plugin-config'; import { PluginConfiguration, Service } from '../types';
import { ScopedEventEmitter } from '../util/events'; import { ScopedEventEmitter } from '../util/events';
export interface IPlugin { export interface IPlugin {
manifest: IPluginManifest; manifest: IPluginManifest;
stream: ScopedEventEmitter; stream: ScopedEventEmitter;
config: PluginConfiguration; config: PluginConfiguration;
service: Service | null;
} }
export class Plugin implements IPlugin { export class Plugin implements IPlugin {
public service: Service | null = null;
constructor( constructor(
public manifest: IPluginManifest, public manifest: IPluginManifest,
public stream: ScopedEventEmitter, public stream: ScopedEventEmitter,
@ -16,23 +19,23 @@ export class Plugin implements IPlugin {
public initialize(): void {} public initialize(): void {}
private get name(): string { protected get name(): string {
return this.manifest.name; return this.manifest.name;
} }
private get version(): string { protected get version(): string {
return this.manifest.version; return this.manifest.version;
} }
private addEventListener(name: string, fn: any): void { protected addEventListener(name: string, fn: any): void {
this.stream.on(this.name, name, fn); this.stream.on(this.name, name, fn);
} }
private emit(event: string, fn: any): void { protected emit(event: string, fn: any): void {
this.stream.emit.call(this.stream, event, fn); this.stream.emit.call(this.stream, event, fn);
} }
private emitTo(name: string, event: string, fn: any): void { protected emitTo(name: string, event: string, fn: any): void {
this.stream.emitTo.call(this.stream, name, event, fn); this.stream.emitTo.call(this.stream, name, event, fn);
} }
} }

View File

@ -4,11 +4,12 @@ import { IEnvironment } from './environment';
export class Configuration { export class Configuration {
private config: any = {}; private config: any = {};
private dirty = false; private loaded = false;
constructor(private env: IEnvironment, private file: string, private defaults?: any) {} constructor(private env: IEnvironment, private file: string, private defaults: any = {}) {}
public async load(): Promise<void> { public async load(): Promise<void> {
this.loaded = true;
if (!await fs.pathExists(this.file)) { if (!await fs.pathExists(this.file)) {
this.saveDefaults(); this.saveDefaults();
return; return;
@ -22,29 +23,51 @@ export class Configuration {
} }
} }
public async save(force = false): Promise<void> { public async save(): Promise<void> {
if (force) { return fs.writeJson(this.file, this.config);
return this.write();
}
this.dirty = true;
} }
public get(key: string, defval?: any): any { public get(key: string, defval?: any, from?: any): any {
if (!this.config[key]) { if (!from) {
from = this.config;
}
// Recursive object traversal
if (key.indexOf('.') !== -1) {
const split = key.split('.');
const first = this.get(split[0], null, from);
if (first) {
return this.get(split.slice(1).join('.'), defval, first);
}
return defval; return defval;
} }
return this.config[key];
// Array indexing
if (key.indexOf('[') !== -1 && key.indexOf(']') !== -1) {
const match = key.match(/\[(\d+)\]/i);
const realKey = key.substr(0, key.indexOf('['));
if (match != null) {
const index = parseInt(match[1], 10);
if (from[realKey]) {
return from[realKey][index];
}
}
return defval;
}
if (!from[key]) {
return defval;
}
return from[key];
}
public setDefaults(defconf: any): void {
this.defaults = defconf;
} }
private saveDefaults(): void { private saveDefaults(): void {
this.config = this.defaults || {}; this.config = this.defaults || {};
this.save(true); this.save();
} }
private async write(): Promise<void> {
return fs.writeJson(this.file, this.config);
}
private writeTask(): void {}
} }

View File

@ -1,4 +1,6 @@
export { Configuration } from './config'; export * from './config';
export { IEnvironment } from './environment'; export * from './environment';
export { PluginConfiguration } from './plugin-config'; export * from './plugin-config';
export { IMessage } from './message'; export * from './message';
export * from './protocol';
export * from './service';

View File

@ -1,8 +1,9 @@
import { IPlugin } from '../plugin/plugin'; import { IPlugin } from '../plugin/plugin';
import { Protocol } from './protocol';
export interface IMessage { export interface IMessage {
data: any; data: any;
source: IPlugin; source: IPlugin | Protocol;
time: Date; time: Date;
resolve(...args: any[]): void; resolve(...args: any[]): void;
} }

66
src/types/protocol.ts Normal file
View File

@ -0,0 +1,66 @@
import { randomBytes } from 'crypto';
import { EventEmitter } from 'events';
import { IPlugin } from '../plugin';
import { IMessage } from './message';
export class Protocol extends EventEmitter {
// override this!
public id = randomBytes(4).toString('hex');
public type = 'GenericProtocol';
protected running = false;
// This should be set to true when the protocol was stopped for any reason
// at any time.
protected stopped = false;
// This should be set to true when the protocol fails for any reason
public failed = false;
constructor(public plugin: IPlugin, public config: any) {
super();
this.passEvents();
}
public get name(): string {
return this.config.name;
}
public get status(): boolean {
return this.running;
}
public start(...args: any[]): void {
this.running = true;
this.emit('running');
}
public stop(force = false): void {
if (!this.running) {
return;
}
this.running = false;
this.stopped = true;
if (force) {
this.failed = true;
}
this.emit('stopped');
}
public resolve(message: IMessage, ...data: any[]): void {}
protected passEvents(): void {
this.on('stop', (force) => this.stop(force));
this.on('start', (...args: any[]) => {
// Prevent restarting from here, it's unsafe
if (this.status || (!this.status && this.stopped)) {
return;
}
this.start(...args);
});
}
}

110
src/types/service.ts Normal file
View File

@ -0,0 +1,110 @@
import { Protocol } from './protocol';
export class Service {
private protocols: Map<string, Protocol> = new Map();
private stopped = false;
constructor(private protoType: any) {}
// Add a new protocol to this service
public use(pto: Protocol, autostart = true): Protocol {
// This service is no longer accepting new protocols
if (this.stopped) {
return pto;
}
const n = pto.name;
if (n == null || n === '') {
throw new Error('Invalid Protocol configuration: Needs an unique name!');
}
if (this.protocols.has(n)) {
return this.protocols.get(n) as Protocol;
}
this.protocols.set(n, pto);
if (autostart) {
pto.start();
}
return pto;
}
// Stop a protocol running in this service
public stop(pto: string | Protocol, force = false): void {
let proto: Protocol;
if (typeof pto === 'string') {
if (!this.protocols.get(pto)) {
return;
}
proto = this.protocols.get(pto) as Protocol;
} else {
proto = pto;
if (!this.protocols.get(pto.name)) {
// Not part of this service!
return;
}
}
proto.stop(force);
this.protocols.delete(proto.name);
}
public getProtocolByName(name: string): Protocol | undefined {
return this.protocols.get(name);
}
// Gracefully stops everything running in this service
public stopAll(): Promise<void> {
return new Promise((resolve, reject) => {
if (this.stopped) {
return resolve();
}
let toStop = +this.protocols.size;
let killed = false;
// Nothing to stop
if (toStop === 0) {
return resolve();
}
// If the protocols fail to stop within a time frame, we have to kill
// them in order to prevent lingering connections.
const killTimeout = setTimeout(() => {
killed = true;
this.die();
resolve();
}, 10000);
// Stop everything and wait for them to announce the fact
// that they've stopped.
this.stopped = true;
for (const [name, proto] of this.protocols) {
proto.once('stopped', () => {
if (killed) {
return;
}
toStop--;
if (toStop <= 0) {
this.protocols.clear();
clearTimeout(killTimeout);
resolve();
}
});
proto.stop();
}
});
}
// Kills everything running in this service
public die(): void {
this.stopped = true;
for (const [name, proto] of this.protocols) {
proto.stop(true);
}
this.protocols.clear();
}
}

View File

@ -84,6 +84,9 @@
"call-signature" "call-signature"
], ],
"forin": false, "forin": false,
"ban-types": {
"function": false
},
"typedef-whitespace": { "typedef-whitespace": {
"options": [ "options": [
{ {