The MQTT protocol is very popular in IoT applications. It’s a simple way to connect different data sources
with your application by using a publish/subscribe model. Sometimes you may want to keep a history of your MQTT data to
use it for model training, diagnostics or metrics. If your data sources provide different formats of data that cannot
be interpreted as time series of floats, Reduct Storage is what you need.
Let’s make a simple MQTT application to see how it works.
Prerequisites
For this usage example, we have the following requirements:
- Linux AMD64
- Docker and Docker Compose
- NodeJS >= 16
If you’re an Ubuntu user, use these commands to install the dependencies:
$ sudo apt-get update
$ sudo apt-get install docker-compose nodejs
Run MQTT Broker and Reduct Storage with Docker Compose
The easiest way to run the broker and the storage is to use Docker Compose. So we should create a docker-compose.yml
file in the example’s folder with the services:
version: "3"
services:
  reduct-storage:
    image: ghcr.io/reduct-storage/reduct-storage:latest
    volumes:
      - ./data:/data
    ports:
      - "8383:8383"
  mqtt-broker:
    image: eclipse-mosquitto:1.6
    ports:
      - "1883:1883"
Then run the configuration:
docker-compose up
Docker Compose downloads the images and runs the containers. Note that we published port 1883 for the MQTT
protocol and port 8383 for the Reduct HTTP API.
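Before moving on, it can help to confirm that both published ports accept connections. The following is a minimal sketch that uses only Node's built-in net module; the helper names isPortOpen and checkServices are hypothetical and not part of any SDK used in this tutorial.

```javascript
const net = require('net');

// Hypothetical helper: resolves to true if a TCP connection to host:port
// succeeds within timeoutMs, and to false on error or timeout.
function isPortOpen(port, host = '127.0.0.1', timeoutMs = 1000) {
  return new Promise((resolve) => {
    const socket = net.connect({ port, host });
    const finish = (result) => {
      socket.destroy();
      resolve(result);
    };
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
  });
}

// Report the state of the two ports published in docker-compose.yml.
async function checkServices() {
  const services = [['MQTT broker', 1883], ['Reduct Storage', 8383]];
  for (const [name, port] of services) {
    const open = await isPortOpen(port);
    console.log('%s on port %d: %s', name, port, open ? 'reachable' : 'not reachable');
  }
}
```

Calling checkServices() once the containers are up should report both services as reachable. This only checks that the TCP ports are open, not that the services behind them are healthy.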
Write a NodeJS script
Now we’re ready to get our hands dirty with code. Let’s initialize the NPM package and
install the MQTT Client and the JavaScript Client SDK.
$ npm init
$ npm install --save reduct-js async-mqtt
When we have all the dependencies installed, we can write the script:
const MQTT = require('async-mqtt');
const {Client} = require('reduct-js');

MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('mqtt_data');

  const reductClient = new Client('http://localhost:8383');
  const bucket = await reductClient.getOrCreateBucket('mqtt');

  mqttClient.on('message', async (topic, msg) => {
    const data = msg.toString();
    await bucket.write('mqtt_data', data);
    console.log('Received message "%s" from topic "%s" was written', data, topic);
  });
}).catch(error => console.error(error));
Let’s look at the code in detail. First, we have to connect to the MQTT broker
and subscribe to a topic. The topic name is just an arbitrary string that producers must know.
In our case, it’s mqtt_data:
MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
  await mqttClient.subscribe('mqtt_data');
  // rest of code
}).catch(error => console.error(error));
If the MQTT connection is successful, we can start dealing with Reduct Storage.
To start writing data there, we need a bucket. We create a bucket with the name mqtt or
get an existing one:
const reductClient = new Client('http://localhost:8383');
const bucket = await reductClient.getOrCreateBucket('mqtt');
The last step is to write the received message to the storage. We must use a callback
for the message event to catch it. Then we write the message to the entry mqtt_data:
mqttClient.on('message', async (topic, msg) => {
  const data = msg.toString();
  await bucket.write('mqtt_data', data);
  console.log('Received message "%s" from topic "%s" was written', data, topic);
});
When we call bucket.write, we create an entry in the bucket if it doesn’t exist yet.
Then we write data to the entry with the current timestamp.
Now our MQTT data is safe and sound in the storage, and we can access it by using
the same SDK.
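If the script subscribes to more than one topic, one option is to keep a separate entry per topic. The sketch below illustrates that idea offline: topicToEntry and attachStorageWriter are hypothetical helpers, the MQTT client and the bucket are replaced by in-memory stand-ins, and the rule that entry names may only contain letters, digits, "-" and "_" is an assumption you should check against the storage documentation.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: derive an entry name from an MQTT topic by replacing
// every character outside [A-Za-z0-9_-] with "_" (assumed naming rule).
function topicToEntry(topic) {
  return topic.replace(/[^A-Za-z0-9_-]/g, '_');
}

// Register a 'message' callback that stores each payload in the entry
// derived from its topic. `bucket` only needs a write(entry, data) method.
function attachStorageWriter(mqttClient, bucket) {
  mqttClient.on('message', async (topic, msg) => {
    try {
      await bucket.write(topicToEntry(topic), msg.toString());
    } catch (error) {
      console.error('Failed to store message from "%s":', topic, error);
    }
  });
}

// Offline demo: an EventEmitter stands in for the MQTT client and a plain
// object that records calls stands in for the bucket.
const fakeClient = new EventEmitter();
const writes = [];
const fakeBucket = {
  write: async (entry, data) => {
    writes.push({ entry, data });
  },
};

attachStorageWriter(fakeClient, fakeBucket);
fakeClient.emit('message', 'sensors/room1/temperature', Buffer.from('21.5'));
console.log(writes);
```

In the real script, you would pass the connected mqttClient and the bucket returned by getOrCreateBucket instead of the stand-ins. Note that two different topics can map to the same entry name under this scheme, for example a/b and a_b.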
Publish data to MQTT topic
When you launch the script, it does nothing because there is no data from MQTT. You have to publish something to the topic mqtt_data.
I prefer to use mosquitto_pub. For Ubuntu users, it’s
part of the mosquitto-clients package:
$ sudo apt-get install mosquitto-clients
$ mosquitto_pub -t mqtt_data -m "Hello, world!"
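If you would rather publish test messages from NodeJS, a small helper can do the same job. This is a sketch under assumptions: publishAll is a hypothetical function, and client stands for any object with an async publish(topic, payload) method, such as the async-mqtt client already used in this tutorial.

```javascript
// Hypothetical helper: publish every message to the given topic.
// Payloads are converted to strings before publishing.
function publishAll(client, topic, messages) {
  return Promise.all(
    messages.map((message) => client.publish(topic, String(message)))
  );
}

// With a running broker (assumes async-mqtt is installed), inside an
// async function:
//   const MQTT = require('async-mqtt');
//   const client = await MQTT.connectAsync('tcp://localhost:1883');
//   await publishAll(client, 'mqtt_data', ['Hello, world!', 42]);
//   await client.end();
```

Taking the client as a parameter keeps the helper independent of the connection setup, so it can be tried with a stub before pointing it at the real broker.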
Getting data from Reduct Storage
Now you know how to get data from MQTT and write it to Reduct Storage, but we need a little NodeJS script to read
the data from the storage:
const {Client} = require('reduct-js');

const client = new Client('http://localhost:8383');

client.getBucket('mqtt').then(async (bucket) => {
  let data = await bucket.read('mqtt_data');
  console.log('Last record: %s', data);

  // Get data for the last hour
  const stopTime = BigInt(Date.now() * 1000);
  const startTime = stopTime - 3_600_000_000n;

  const records = await bucket.list('mqtt_data', startTime, stopTime);
  for (const record of records) {
    data = await bucket.read('mqtt_data', record.timestamp);
    console.log('Found record "%s" with timestamp "%d"', data, record.timestamp);
  }
}).catch(error => console.error(error));
Reading the latest record in the entry is very easy:
let data = await bucket.read('mqtt_data');
But to take an arbitrary record, you have to know its timestamp. A typical use case is to read data for some
time interval. You should use the method Bucket.list to get the timestamps of the records in the interval. Then you can
read them by using Bucket.read:
const stopTime = BigInt(Date.now() * 1000);
const startTime = stopTime - 3_600_000_000n;

const records = await bucket.list('mqtt_data', startTime, stopTime);
for (const record of records) {
  data = await bucket.read('mqtt_data', record.timestamp);
  console.log('Found record "%s" with timestamp "%d"', data, record.timestamp);
}
Note that the storage uses timestamps with microsecond precision, so we can’t use the Date class and the number
type.
That’s why we use BigInt.
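Converting between JavaScript dates and microsecond timestamps by hand is easy to get wrong, so a few small helpers may be worth having. The functions below are hypothetical names introduced for illustration, not part of the SDK, and they use only built-in BigInt arithmetic.

```javascript
// Convert a Date to microseconds since the Unix epoch as a BigInt.
function dateToMicros(date) {
  return BigInt(date.getTime()) * 1000n;
}

// Convert microseconds back to a Date; the sub-millisecond part is dropped
// because Date only has millisecond resolution.
function microsToDate(micros) {
  return new Date(Number(micros / 1000n));
}

// Build a [start, stop] interval in microseconds covering the last `hours` hours.
function lastHours(hours, now = new Date()) {
  const stop = dateToMicros(now);
  const start = stop - BigInt(hours) * 3_600_000_000n;
  return [start, stop];
}

const [start, stop] = lastHours(1);
console.log(
  'Interval: %s .. %s',
  microsToDate(start).toISOString(),
  microsToDate(stop).toISOString()
);
```

The start and stop values computed this way have the same form as the startTime and stopTime variables in the script above, so they can be used in their place.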
Conclusion
As you can see, the MQTT protocol and Reduct Storage are very simple technologies that can be used together very easily in NodeJS.
You can find the source code of the example here. If you have any
questions or problems running it, feel free to open an issue.
I hope this tutorial has been helpful. Thanks!