This Banner is For Sale !!
Get your ad here for a week in 20$ only and get upto 15k traffic Daily!!!

How to keep a history of MQTT messages with Node.js


The MQTT protocol may be very standard in IoT functions. It’s a easy option to join totally different knowledge sources
together with your software by utilizing a publish/subscribe mannequin. Typically chances are you’ll wish to maintain a historical past of your MQTT knowledge to
use
it for mannequin coaching, diagnostics or metrics. In case your knowledge sources present totally different codecs of information that may
not be interpreted as time sequence of floats, Reduct Storage is what you want.

Let’s make a easy MQTT software to see the way it works.



Conditions

For this utilization instance we have now the next necessities:

  • Linux AMD64
  • Docker and Docker Compose
  • NodeJS >= 16

Should you’re an Ubuntu consumer, use this command to put in the dependencies:

$ sudo apt-get replace
$ sudo apt-get set up docker-compose nodejs
Enter fullscreen mode

Exit fullscreen mode



Run MQTT Dealer and Reduct Storage with Docker Compose

The best option to run the dealer and the storage is to make use of Docker Compose. So we must always create a docker-compose.yml
file within the instance’s folder with the companies:

model: "3"
companies:
  reduct-storage:
    picture: ghcr.io/reduct-storage/reduct-storage:newest
    volumes:
      - ./knowledge:/knowledge
    ports:
      - "8383:8383"

  mqtt-broker:
    picture: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Enter fullscreen mode

Exit fullscreen mode

Then run the configuration:

docker-compose up
Enter fullscreen mode

Exit fullscreen mode

Docker Compose downloaded the pictures and ran the containers. Listen that we revealed ports 1883 for MQTT
protocol and 8383 for Reduct HTTP API.



Write NodeJS script

Now we’re able to make fingers soiled with code. Let’s initialize the NPM package deal and
set up MQTT Client and
JavaScript Client SDK.

$ npm init
$ npm set up --save reduct-js async-mqtt 
Enter fullscreen mode

Exit fullscreen mode

When we have now all of the dependencies put in, we are able to write the script:

const MQTT = require('async-mqtt');
const {Consumer} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('mqtt_data');

  const reductClient = new Consumer('http://localhost:8383');
  const bucket = await reductClient.getOrCreateBucket('mqtt');

  mqttClient.on('message', async (subject, msg) => {
    const knowledge = msg.toString();
    await bucket.write('mqtt_data', knowledge);
    console.log('Obtained message "%s" from subject "%s" was written', knowledge,
        subject);
  });

}).catch(error => console.error(error));
Enter fullscreen mode

Exit fullscreen mode

Let us take a look at the code intimately. First, we have now to connect with the MQTT dealer
and subscribe to a subject. The subject identify simply random string, which producers ought to know.
In our case it’s mqtt_data:


MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('mqtt_data');

  // remainder of code
}).catch(error => console.error(error));
Enter fullscreen mode

Exit fullscreen mode

If the MQTT connection is profitable, we are able to begin coping with Reduct Storage.
To start out writing knowledge there, we’d like a bucket. We create a bucket with the identify mqtt or
get an present one:

const reductClient = new Consumer('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');
Enter fullscreen mode

Exit fullscreen mode

The final step is to write down the obtained message to the storage. We should use a callback
for occasion message, to catch it. Then we write the message to entry mqtt_data:

mqttClient.on('message', async (subject, msg) => {
  const knowledge = msg.toString();
  await bucket.write('mqtt_data', knowledge);
  console.log('Obtained message "%s" from subject "%s" was written', knowledge,
      subject);
});
Enter fullscreen mode

Exit fullscreen mode

Once we name bucket.write we create an entry within the bucket if it would not exist but.
Then we write knowledge to the entry with the present timestamp.
Now our MQTT knowledge is protected and sound within the storage, and we are able to entry them by utilizing
the identical SDK.



Publish knowledge to MQTT subject

If you launch the script, it does nothing as a result of there isn’t a knowledge from MQTT. It’s important to publish one thing to subject
mqtt_data. I choose to make use of mosquitto_pub. For Ubuntu customers, it’s a
a part of the mosquitto-clients package deal:

$ sudo apt-get set up mosquitto-clients
$ mosuitto_pub -t mqtt_data -m "Hey, world!"
Enter fullscreen mode

Exit fullscreen mode



Getting knowledge from Reduct Storage

Now you know the way to get knowledge from MQTT and write it to Reduct Storage, however we’d like slightly NodejS script to learn
the information from the storage:

const {Consumer} = require('reduct-js');

const shopper = new Consumer('http://localhost:8383');

shopper.getBucket('mqtt').then(async (bucket) => {
  let knowledge = await bucket.learn('mqtt_data');
  console.log('Final file: %s', knowledge);

  // Get knowledge for the final hour
  const stopTime = BigInt(Date.now() * 1000);
  const startTime = stopTime - 3_600_000_000n;

  const information = await bucket.listing('mqtt_data', startTime, stopTime);
  for (const file of information) {
    knowledge = await bucket.learn('mqtt_data', file.timestamp);
    console.log('Discovered file "%s" with timestamp "%d"', knowledge, file.timestamp);
  }

}).catch(error => console.error(error));

Enter fullscreen mode

Exit fullscreen mode

To learn the newest file within the entry may be very simple:

let knowledge = await bucket.learn('mqtt_data');
Enter fullscreen mode

Exit fullscreen mode

However to take some random file, you must know its timestamp. A typical use case could be to learn knowledge for some
timeinterval. It is best to use technique Bucket.listing to get timestamps of information for the interval. Then you’ll be able to
learn them by utilizing Bucket.learn:

const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;

const information = await bucket.listing('mqtt_data', startTime, stopTime);
for (const file of information) {
  knowledge = await bucket.learn('mqtt_data', file.timestamp);
  console.log('Discovered file "%s" with timestamp "%d"', knowledge, file.timestamp);
}
Enter fullscreen mode

Exit fullscreen mode

Listen, the storage makes use of timestamps with microsecond precision, so we won’t use Date class and quantity kind.
What’s why we use BigInt.



Conclusion

As you’ll be able to see, the MQTT protocol and Reduct Storage quite simple applied sciences that can be utilized collectively very simply in NodeJS.
You’ll find the supply code of the instance here. If in case you have any
questions or issues operating it. Be at liberty to make an issue.

I hope this tutorial has been useful. Thanks!



Hyperlinks

The Article was Inspired from tech community site.
Contact us if this is inspired from your article and we will give you credit for it for serving the community.

This Banner is For Sale !!
Get your ad here for a week in 20$ only and get upto 10k Tech related traffic daily !!!

Leave a Reply

Your email address will not be published. Required fields are marked *

Want to Contribute to us or want to have 15k+ Audience read your Article ? Or Just want to make a strong Backlink?