server/src/net/ws-adapter.ts

53 lines
1.4 KiB
TypeScript

import * as WebSocket from 'ws';
import { WebSocketAdapter, INestApplicationContext, Logger } from '@nestjs/common';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter, catchError } from 'rxjs/operators';
import { Packet } from './packet';
/**
 * Nest `WebSocketAdapter` implementation backed by the `ws` library.
 *
 * Every incoming `message` event is converted with `Packet.from` and passed
 * to the first registered message handler. Values produced by the handler
 * pipeline are consumed but not written back to the socket by this adapter.
 */
export class WsAdapter implements WebSocketAdapter {
  private readonly logger = new Logger(WsAdapter.name);

  /**
   * @param app Nest application context; stored on the instance but not read
   *   by any method of this class.
   */
  constructor(private readonly app: INestApplicationContext) {}

  /**
   * Creates the underlying `ws` server.
   *
   * @param port Port passed to `WebSocket.Server`.
   * @param options Extra server options, spread after `port` (so an explicit
   *   `port` key in `options` takes precedence).
   * @returns The newly constructed `WebSocket.Server`.
   */
  create(port: number, options: any = {}): any {
    return new WebSocket.Server({ port, ...options });
  }

  /**
   * Registers `callback` as a listener for the server's `connection` event.
   */
  bindClientConnect(
    server: WebSocket.Server,
    callback: (...args: any[]) => void,
  ): void {
    server.on('connection', callback);
  }

  /**
   * Subscribes to the client's `message` events and routes each one through
   * {@link bindMessageHandler}.
   *
   * Each message is processed in isolation: a failure while decoding or
   * handling one message is logged and that message is dropped. Without this,
   * a single error would terminate the whole `fromEvent` stream, and the
   * client would be silently ignored from then on.
   *
   * @param client Connected socket to listen on.
   * @param handlers Message handlers registered for the gateway.
   * @param process Callback that turns a handler result into an observable.
   */
  bindMessageHandlers(
    client: WebSocket,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ): void {
    fromEvent(client, 'message')
      .pipe(
        mergeMap((event: WebSocket.MessageEvent) =>
          this.handleIsolated(event, handlers, process),
        ),
        // Drop falsy results, as the original pipeline did.
        filter((result) => result),
      )
      // No observer: emitted values are intentionally discarded here.
      .subscribe();
  }

  /**
   * Decodes one message event into a `Packet` and runs it through the first
   * handler in `handlers`.
   *
   * @param buffer The `message` event; its `data` is treated as a `Buffer`.
   * @param handlers Message handlers; only `handlers[0]` is used.
   * @param process Callback that turns the handler result into an observable.
   * @returns `EMPTY` when no handler is registered, otherwise the observable
   *   returned by `process`. Synchronous exceptions propagate to the caller.
   */
  bindMessageHandler(
    buffer: WebSocket.MessageEvent,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ): Observable<any> {
    const messageHandler = handlers[0];
    if (!messageHandler) {
      return EMPTY;
    }
    return process(messageHandler.callback(Packet.from(buffer.data as Buffer)));
  }

  /**
   * Closes the given `ws` server.
   */
  close(server: WebSocket.Server): void {
    server.close();
  }

  /**
   * Runs {@link bindMessageHandler} so that neither a synchronous throw nor
   * an error emitted by the resulting observable reaches the outer stream.
   */
  private handleIsolated(
    event: WebSocket.MessageEvent,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ): Observable<any> {
    try {
      return this.bindMessageHandler(event, handlers, process).pipe(
        catchError((error) => this.reportFailure(error)),
      );
    } catch (error) {
      return this.reportFailure(error);
    }
  }

  /**
   * Logs a per-message failure and yields an empty stream so the offending
   * message is skipped.
   */
  private reportFailure(error: unknown): Observable<never> {
    const message = error instanceof Error ? error.message : String(error);
    const stack = error instanceof Error ? error.stack : undefined;
    this.logger.error(`Dropping WebSocket message: ${message}`, stack);
    return EMPTY;
  }
}