Want to Contribute to us or want to have 15k+ Audience read your Article ? Or Just want to make a strong Backlink?

How to Trace Websocket Events in NestJS – [Case Study]

Cover Image by Enrico Mantegazza

In NestJS, HTTP requests are traced by the req.id parameter. You can write your own custom id generator, or use the default id generated by NestJS, Express, or Fastify. However, there is no such concept as a request id, or request tracing, when it comes to WebSockets / Socket.IO.

Although NestJS allows the use of interceptors, filters, and guards in Gateways, one feature I have felt missing is the request id. Without a request id, you can't trace websocket events, and that is not great for observability.

In this post, I'll explain how to trace websocket events in NestJS. For this, we will create a custom WebSocketAdapter. In doing so, we will dive deep into the source code of NestJS and solve some challenges!




What’s a WebSocketAdapter?

NestJS uses the Adapter Pattern to abstract underlying libraries away from your code. For example, NestJS doesn't care whether you are using Fastify or Express as the HTTP library. If you want to use another library, you just need to implement an adapter defined by NestJS.
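
As a rough illustration of the adapter idea (this is not NestJS's actual interface; MessageTransport, LineTransport, and ObjectTransport are hypothetical names), application code depends only on a small interface, and each underlying library gets its own adapter:

```typescript
// Hypothetical interface: the only thing application code knows about.
interface MessageTransport {
  send(event: string, data: string): void;
}

// Adapter for an imaginary library that wants a single formatted string.
class LineTransport implements MessageTransport {
  public readonly lines: string[] = [];
  send(event: string, data: string): void {
    this.lines.push(`${event}:${data}`);
  }
}

// Adapter for an imaginary library that wants structured objects.
class ObjectTransport implements MessageTransport {
  public readonly packets: { event: string; data: string }[] = [];
  send(event: string, data: string): void {
    this.packets.push({ event, data });
  }
}

// Application code: works with any adapter and never touches the library itself.
function greet(transport: MessageTransport, name: string): void {
  transport.send('greeting', `hello ${name}`);
}

const lineTransport = new LineTransport();
const objectTransport = new ObjectTransport();
greet(lineTransport, 'nest');
greet(objectTransport, 'nest');
```

Swapping one transport for the other requires no change to greet, which is the property NestJS relies on for its HTTP and websocket adapters.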

There are various websocket libraries in Node.js, and there is also Socket.IO, which makes things easier for developers. In this post, we will modify IoAdapter from the existing official NestJS package: @nestjs/platform-socket.io.

Socket.IO is a bidirectional communication protocol that can work over HTTP or WebSockets. We will use it as an example websocket library in this post, but the exact details of the Socket.IO protocol are out of scope here. I am planning to publish an article about how Socket.IO works; if you don't want to miss out, subscribe to Noop Today!



How to Write a WebSocketAdapter in NestJS

To create a custom WebSocketAdapter in NestJS, you need to implement the WebSocketAdapter interface from @nestjs/common.

// https://github.com/nestjs/nest/blob/master/packages/common/interfaces/websockets/web-socket-adapter.interface.ts

export interface WebSocketAdapter<TServer = any, TClient = any, TOptions = any> {
    create(port: number, options?: TOptions): TServer;
    bindClientConnect(server: TServer, callback: Function): any;
    bindClientDisconnect?(client: TClient, callback: Function): any;
    bindMessageHandlers(client: TClient, handlers: WsMessageHandler[], transform: (data: any) => Observable<any>): any;
    close(server: TServer): any;
}

In a typical Node.js application involving websockets / socket.io, you probably have the same functionality in a different form. Since we are using NestJS as our framework, let's see how we should structure our code.

A typical socket.io server in Node.js

const express = require('express');
const app = express();
const http = require('http');
const server = http.createServer(app);
const { Server } = require("socket.io");

// create in WebSocketAdapter
const io = new Server(server);

// bindClientConnect in WebSocketAdapter
io.on('connection', (socket) => {
  console.log('handleConnection');

  // bindClientDisconnect in WebSocketAdapter
  socket.on('disconnect', () => {
    console.log('handleDisconnect');
  })

  // bindMessageHandlers in WebSocketAdapter
  socket.on('foo', (data, callback) => {
    console.log('handleFoo');
    callback('OK');
  })
});

server.listen(3000, () => {
  console.log('listening on *:3000');
});

// close in WebSocketAdapter
setTimeout(() => { io.close() }, 10000);

NestJS provides an AbstractWsAdapter class to make things easier for us. So instead of this:

import { WebSocketAdapter } from '@nestjs/common';

export class MyWebSocketAdapter implements WebSocketAdapter {}

We can do this:

import { AbstractWsAdapter } from '@nestjs/websockets';

export class MyWebsocketAdapter extends AbstractWsAdapter {}

The difference is that AbstractWsAdapter provides default implementations for some methods, but you still have to implement the create and bindMessageHandlers methods.

We will be smart and reuse the existing code from the official package, making modifications only where required.



Use the Existing IoAdapter from @nestjs/platform-socket.io

This is the IoAdapter from the official NestJS packages, which acts as a bridge between NestJS and Socket.IO.

// https://github.com/nestjs/nest/blob/master/packages/platform-socket.io/adapters/io-adapter.ts
import { isFunction, isNil } from '@nestjs/common/utils/shared.utils';
import {
  AbstractWsAdapter,
  MessageMappingProperties,
} from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { fromEvent, Observable } from 'rxjs';
import { filter, first, map, mergeMap, share, takeUntil } from 'rxjs/operators';
import { Server, ServerOptions, Socket } from 'socket.io';

export class IoAdapter extends AbstractWsAdapter {
  public create(
    port: number,
    options?: ServerOptions & { namespace?: string; server?: any },
  ): Server {
    if (!options) {
      return this.createIOServer(port);
    }
    const { namespace, server, ...opt } = options;
    return server && isFunction(server.of)
      ? server.of(namespace)
      : namespace
      ? this.createIOServer(port, opt).of(namespace)
      : this.createIOServer(port, opt);
  }

  public createIOServer(port: number, options?: any): any {
    if (this.httpServer && port === 0) {
      return new Server(this.httpServer, options);
    }
    return new Server(port, options);
  }

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          return transform(callback(data, ack)).pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return socket.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

  public mapPayload(payload: unknown): { data: any; ack?: Function } {
    if (!Array.isArray(payload)) {
      if (isFunction(payload)) {
        return { data: undefined, ack: payload as Function };
      }
      return { data: payload };
    }
    const lastElement = payload[payload.length - 1];
    const isAck = isFunction(lastElement);
    if (isAck) {
      const size = payload.length - 1;
      return {
        data: size === 1 ? payload[0] : payload.slice(0, size),
        ack: lastElement,
      };
    }
    return { data: payload };
  }
}

Let's see what each of these methods does:

  • The create method is responsible for creating the socket.io server; we don't need to modify it.
  • createIOServer is a helper method for create.
  • bindMessageHandlers is responsible for delivering socket.io messages to our handlers. We will modify this to send extra arguments to our handlers.
  • mapPayload parses the payload from socket.io into { ack, data }; we don't need to modify it.
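
To make mapPayload concrete, here is a standalone sketch of the same branching as a plain function, pulled out of the adapter so it runs without NestJS. The Ack type alias and the sample payloads are made up for illustration:

```typescript
// Hypothetical alias for an acknowledgement callback.
type Ack = (...args: unknown[]) => void;

// Same branching as IoAdapter.mapPayload, extracted into a free function.
function mapPayload(payload: unknown): { data: unknown; ack?: Ack } {
  if (!Array.isArray(payload)) {
    if (typeof payload === 'function') {
      return { data: undefined, ack: payload as Ack };
    }
    return { data: payload };
  }
  const lastElement = payload[payload.length - 1];
  if (typeof lastElement === 'function') {
    const size = payload.length - 1;
    return {
      data: size === 1 ? payload[0] : payload.slice(0, size),
      ack: lastElement as Ack,
    };
  }
  return { data: payload };
}

const noop: Ack = () => {};

// A single value without an acknowledgement callback.
const plain = mapPayload('hello');
// One value plus an acknowledgement callback.
const withAck = mapPayload(['hello', noop]);
// Several values plus an acknowledgement callback.
const many = mapPayload(['a', 'b', noop]);
```

In this sketch plain.data is 'hello' with no ack, withAck.data is 'hello', and many.data is ['a', 'b']; in the last two cases ack is the trailing callback.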

So, we just need to focus on bindMessageHandlers.



Modifying The bindMessageHandlers

public bindMessageHandlers(
  socket: Socket,
  handlers: MessageMappingProperties[],
  transform: (data: any) => Observable<any>,
) {
  const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
    share(),
    first(),
  );

  handlers.forEach(({ message, callback }) => {
    const source$ = fromEvent(socket, message).pipe(
      mergeMap((payload: any) => {
        const { data, ack } = this.mapPayload(payload);
        return transform(callback(data, ack)).pipe(
          filter((response: any) => !isNil(response)),
          map((response: any) => [response, ack]),
        );
      }),
      takeUntil(disconnect$),
    );
    source$.subscribe(([response, ack]) => {
      if (response.event) {
        return socket.emit(response.event, response.data);
      }
      isFunction(ack) && ack(response);
    });
  });
}

First, let's understand what handlers is here. Each element has the type MessageMappingProperties:

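export interface MessageMappingProperties {
  message: any;
  methodName: string;
  callback: (...args: any[]) => Observable<any> | Promise<any> | any;
}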


In socket.io terms, think of socket.on(message, callback): message corresponds to the event name, and callback is the actual handler function.

So, in order to pass parameters to our handler methods, we must pass parameters to the **callback**.

Let's take a closer look at the code with more commentary:

// message --> eventName
// callback --> eventHandler
handlers.forEach(({ message, callback }) => {
    // RxJS equivalent of --> socket.on(message)
    const source$ = fromEvent(socket, message).pipe(
        // payload is the socket.io payload
        mergeMap((payload: any) => {
            const { data, ack } = this.mapPayload(payload);
            // transform converts the result of our callback handler to an observable
            // the callback function is called with TWO parameters
            return transform(callback(data, ack)).pipe(
                filter((response: any) => !isNil(response)),
                // If callback returns any response, wrap it with ack
                map((response: any) => [response, ack]),
            );
        }),
        takeUntil(disconnect$),
    );
    source$.subscribe(([response, ack]) => {
        // If you return { event: 'foo', data: 'bar' } from the handler
        // See: https://docs.nestjs.com/websockets/gateways#multiple-responses
        if (response.event) {
            return socket.emit(response.event, response.data);
        }
        // If the client accepts a response, send the response
        isFunction(ack) && ack(response);
    });
});



A Little Problem in NestJS

If you have ever used IoAdapter in your project, you might have noticed that although the adapter passes two arguments to the callback function, your handler methods don't receive them in full. Let me explain:

Say you have a message handler:

class MySocketGateway {
  @SubscribeMessage('foo')
  fooHandler(argument1, argument2, argument3){
    console.log({
      argument1, // Client
      argument2, // Body
      argument3, // undefined
    })
  }
}

If you go ahead and try something like this, you will see that the 1st argument is the Socket / Client object, the 2nd argument is the body, BUT the 3rd argument is undefined. Two questions come to mind:

  • Where did the 1st argument come from?
  • Where did my ack function disappear to?

Let's start with the first question. If we dive deep into the underlying NestJS code, we can find the exact place where adapter.bindMessageHandlers is called.

// https://github.com/nestjs/nest/blob/21bd8c37364a2a2591e3de9bfb88d32d09431438/packages/websockets/web-sockets-controller.ts#L147

public subscribeMessages<T = any>(
    subscribersMap: MessageMappingProperties[],
    client: T,
    instance: NestGateway,
) {
    const adapter = this.config.getIoAdapter();
    const handlers = subscribersMap.map(({ callback, message }) => ({
      message,
      callback: callback.bind(instance, client),
    }));
    adapter.bindMessageHandlers(client, handlers, data =>
      fromPromise(this.pickResult(data)).pipe(mergeAll()),
    );
}

If you look closely, callback is bound with two arguments. The bind method has one required argument for this, and the rest of the arguments are supplied as leading arguments to the function, here callback. More information about this is in the Mozilla docs. So, our 1st argument is fixed by NestJS to be the client object.
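
A small standalone sketch of that bind behaviour (the Gateway class, the Client shape, and the string arguments are invented for illustration): the first argument to bind becomes this, and any further arguments are placed in front of whatever the caller passes later.

```typescript
// Hypothetical stand-in for a socket client.
interface Client {
  id: string;
}

class Gateway {
  name = 'gateway';

  // Returns `this.name` followed by every argument the method received.
  handle(client: Client, data: string, ack: string): string[] {
    return [this.name, client.id, data, ack];
  }
}

const instance = new Gateway();
const client: Client = { id: 'socket-1' };

// Mirrors `callback.bind(instance, client)` in subscribeMessages.
const callback = Gateway.prototype.handle.bind(instance, client);

// The adapter later calls callback(data, ack); client is already in front.
const received = callback('payload', 'ack');
```

Here received is ['gateway', 'socket-1', 'payload', 'ack']: the caller supplied only two arguments, yet the method saw the client first.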



How about the ack function, why does it disappear?

This was very hard for me to debug, but I found out why it happens and how to fix it!

The problem is not about the ack function. I tried sending different parameters, and sending more than two parameters to the callback function, but only the first parameter ends up in the handler, and the rest are gone.

There is a helper class, [WsContextCreator](https://github.com/nestjs/nest/blob/aa3ad07c1023b71edda6b6ea53374787d3712231/packages/websockets/context/ws-context-creator.ts#L56), in @nestjs/websockets that is responsible for attaching Interceptors, Guards, Pipes and ExceptionHandlers to handlers, AND for managing the parameters sent to handler methods!

That helper class is responsible for the disappearing parameters! The reason it makes the other parameters disappear is related to this use case:

class MyWebsocketGateway {
    @SubscribeMessage('foo')
    handleFoo(client: Client, body){
        console.log({ body });
    }

    @SubscribeMessage('bar')
    handleBar(@MessageBody() body){
        console.log({ body })
    }
}

If you decorate your handler parameters with @MessageBody or @ConnectedSocket, the WsContextCreator assigns those parameters for you. You can even do experiments like this:

class MyWebsocketGateway {
    @SubscribeMessage('foo')
    handleFoo(arg1, arg2, arg3, @MessageBody() body){
        console.log({ arg1, arg2, arg3 }) // All undefined
        console.log({ body })
    }
}

I won't go into line-by-line details of how this happens, but I will give you a conceptual explanation. If you want to investigate further, reading the source code of @nestjs/websockets is a good exercise.

The key point is that WsContextCreator extracts data from the initial arguments to the callback function, rearranges them, and sends those arguments to your handler function. During the extraction process, WsContextCreator looks up the metadata of the handler function:

// https://github.com/nestjs/nest/blob/aa3ad07c1023b71edda6b6ea53374787d3712231/packages/websockets/context/ws-context-creator.ts#L163

const metadata = this.contextUtils.reflectCallbackMetadata<TMetadata>(
    instance,
    methodName,
    PARAM_ARGS_METADATA,
) || DEFAULT_CALLBACK_METADATA;

If your function doesn't have PARAM_ARGS_METADATA, NestJS assigns default callback metadata. Guess what: DEFAULT_CALLBACK_METADATA assigns only two parameters to our handler, client and payload.

// https://github.com/nestjs/nest/blob/master/packages/websockets/context/ws-metadata-constants.ts

import { WsParamtype } from '../enums/ws-paramtype.enum';

export const DEFAULT_CALLBACK_METADATA = {
  [`${WsParamtype.PAYLOAD}:1`]: { index: 1, data: undefined, pipes: [] },
  [`${WsParamtype.SOCKET}:0`]: { index: 0, data: undefined, pipes: [] },
};
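
The effect can be modelled in a few lines. This is a deliberately simplified sketch and not the real WsContextCreator: the ParamMeta shape, resolveArgs, and the sample values are assumptions made for illustration. Each metadata entry says which handler parameter to fill (index) and where to read its value from; anything without an entry never reaches the handler.

```typescript
// Hypothetical, simplified metadata entry: which handler parameter to fill
// and how to read its value from the arguments the adapter supplied.
interface ParamMeta {
  index: number;
  pick: (adapterArgs: unknown[]) => unknown;
}

// Stand-in for the default metadata: only client (0) and payload (1).
const defaultMeta: ParamMeta[] = [
  { index: 0, pick: (args) => args[0] },
  { index: 1, pick: (args) => args[1] },
];

// Build the final handler arguments from metadata alone.
function resolveArgs(meta: ParamMeta[], adapterArgs: unknown[]): unknown[] {
  const size = Math.max(...meta.map((m) => m.index)) + 1;
  const resolved: unknown[] = new Array(size).fill(undefined);
  for (const m of meta) {
    resolved[m.index] = m.pick(adapterArgs);
  }
  return resolved;
}

// What the callback receives: client (bound), then data and ack from the adapter.
const adapterArgs = ['client', 'payload', 'ack'];

// With default metadata the third adapter argument is simply never read.
const withDefault = resolveArgs(defaultMeta, adapterArgs);

// Custom entries can point a handler parameter at any adapter argument.
const customMeta: ParamMeta[] = [
  { index: 0, pick: (args) => args[1] }, // behaves like @MessageBody()
  { index: 1, pick: (args) => args[2] }, // a custom entry reading the ack
];
const withCustom = resolveArgs(customMeta, adapterArgs);
```

In this model withDefault is ['client', 'payload'] and withCustom is ['payload', 'ack'], which matches the behaviour observed above: extra arguments only arrive if some metadata entry asks for them.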



Solution for Providing Extra Parameters to Handlers

NestJS exports an undocumented helper function called assignCustomParameterMetadata, which allows us to override DEFAULT_CALLBACK_METADATA and use our own metadata.

// https://github.com/nestjs/nest/blob/5aeb40b/packages/common/utils/assign-custom-metadata.util.ts#L9

export function assignCustomParameterMetadata(
  args: Record<number, RouteParamMetadata>,
  paramtype: number | string,
  index: number,
  factory: CustomParamFactory,
  data?: ParamData,
  ...pipes: (Type<PipeTransform> | PipeTransform)[]
) {
  return {
    ...args,
    [`${paramtype}${CUSTOM_ROUTE_AGRS_METADATA}:${index}`]: {
      index,
      factory,
      data,
      pipes,
    },
  };
}

The function is not self-explanatory, but I think it will be much clearer after the example below. The important question is: where should we use this function?

This function is meant to modify PARAM_ARGS_METADATA, and normally that metadata is defined by decorators such as @ConnectedSocket or @MessageBody. So, let's comply with the conventions and create a new parameter decorator: @Ack.



Creating the @Ack Decorator

If you are not familiar with the concept of decorators, or decorators in NestJS, you can read my previous article: Using Custom Decorators in NestJS.

import { ExecutionContext } from '@nestjs/common';
import { assignCustomParameterMetadata } from '@nestjs/common/utils/assign-custom-metadata.util';
import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';

export function Ack(): ParameterDecorator {
    // index is the index of the parameter we decorated
    // It is not related to getArgByIndex below!
    return function(target, key, index) {
        // Get existing metadata of the handler
        const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
        // Extend with new metadata
        const meta = assignCustomParameterMetadata(args, 'Ack', index, (data, input: ExecutionContext) => {
            // This allows NestJS to extract the required parameter from the initial arguments supplied to the 'callback' function
            // The index here needs to match the index of the callback parameters
            // 0 --> Client
            // 1 --> Payload from IoAdapter
            // 2 --> Ack from IoAdapter
            // 0 is always the client, but the rest of the parameters depend on the underlying adapter.
            return input.getArgByIndex<Function>(2);
        });
        Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
    };
}

Now we can reach our ack function from the handler as follows:

class MyWebSocketGateway {
    @SubscribeMessage('foo')
    fooHandler(@MessageBody() body, @Ack() ack){
        ack('foo_response');
    }
}



Let's Move On With RequestId

That really was a case study, wasn't it? Let's add a request id to our handler parameters so that we can trace our requests.

First we need to modify our adapter. If you remember from above, the adapter supplies arguments to the callback function. We can create a request id in our adapter and supply it to the callback.

// You can make your own implementation for this.
// randomUUID is provided by Node's built-in 'crypto' module.
private createRequestId(){
  return crypto.randomUUID();
}

public bindMessageHandlers(
  socket: Socket,
  handlers: MessageMappingProperties[],
  transform: (data: any) => Observable<any>,
) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          // <-- Modified Section Start
          const requestId = this.createRequestId();
          return transform(callback(data, ack, requestId))
          // Modified Section End -->
          .pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return socket.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
}
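
If a UUID is more than you need, the id generator can be swapped for something else. One possible sketch, using hypothetical names (makeRequestIdGenerator, the ws prefix) and not taken from the original adapter, is a per-process counter with a readable prefix:

```typescript
// Hypothetical helper: returns a function that yields ids such as "ws-1", "ws-2".
function makeRequestIdGenerator(prefix: string): () => string {
  let counter = 0;
  return () => {
    counter += 1;
    return `${prefix}-${counter}`;
  };
}

const createRequestId = makeRequestIdGenerator('ws');
const firstId = createRequestId();
const secondId = createRequestId();
```

Ids produced this way are only unique within a single process, so they suit local debugging better than a deployment with several server instances, where a UUID is the safer choice.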

That is all the modification our adapter needs. Now we can create another decorator: @RequestId

import { ExecutionContext } from '@nestjs/common';
import { assignCustomParameterMetadata } from '@nestjs/common/utils/assign-custom-metadata.util';
import { PARAM_ARGS_METADATA } from '@nestjs/websockets/constants';

export function RequestId(): ParameterDecorator {
    return function(target, key, index) {
        const args = Reflect.getMetadata(PARAM_ARGS_METADATA, target.constructor, key) || {};
        const meta = assignCustomParameterMetadata(args, 'RequestId', index, (data, input: ExecutionContext) => {
            // 3 --> request id supplied by our modified adapter
            return input.getArgByIndex<string>(3);
        });
        Reflect.defineMetadata(PARAM_ARGS_METADATA, meta, target.constructor, key);
    };
}

This is exactly the same decorator as @Ack, the only difference being that it reads argument index 3 with getArgByIndex(3). Now our handler function looks like this:

class MyWebSocketGateway {
    @SubscribeMessage('foo')
    fooHandler(@MessageBody() body, @Ack() ack, @RequestId() requestId){
        console.log('Received request with id: ', requestId);
        ack('foo_response');
    }
}
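
Once the id reaches the handler, one way to use it for tracing is to stamp every log line with it. The following is only a sketch with hypothetical names (createTraceLogger, the req-42 id, and the in-memory sink); a real application would pass its own logger as the sink:

```typescript
// Hypothetical helper: builds a logger whose lines all carry the request id.
function createTraceLogger(
  requestId: string,
  sink: (line: string) => void,
): (message: string) => void {
  return (message: string): void => sink(`[${requestId}] ${message}`);
}

// Collect lines in memory here; in practice the sink could be console.log.
const lines: string[] = [];
const log = createTraceLogger('req-42', (line) => {
  lines.push(line);
});

log('received foo');
log('sending ack');
```

Every line written through log now starts with [req-42], so all the output produced while handling one websocket event can be found with a single search.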



Next Steps

If you don't want to deal with all of this, I am currently working on an npm package that has better developer ergonomics compared to the official @nestjs/platform-socket.io package. I am aware that decorating every parameter in handler functions can be ugly and tiring on the eyes. I believe there are more elegant solutions to this problem, and I am trying to figure out which one would result in a better developer experience.

I would like to hear your opinions about this. Whether you want to suggest a usage example or you have faced other problems with the official package, please let me know what you think in the comments.

Also, if you don't want to miss out on articles about NestJS and programming in general, subscribe to Noop Today. I hope you learned something new today. See you in the upcoming posts!


Here is the repo that contains TraceableIoAdapter with example code: https://github.com/nooptoday/nestjs-trace-websocket-events
