-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.js
77 lines (56 loc) · 2.12 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const path = require("path");
const process = require("process");
const fs = require("fs");
const logger = require("./lib/logger.js");
const nconf = require("nconf");
const StatsReporter = require("./lib/stats.js").StatsReporter;
const MqttClient = require("./lib/mqtt.js").MqttClient;
const InfluxClient = require("./lib/influx.js").InfluxClient;
const MessageParser = require('./lib/parser.js').MessageParser;
const log = logger("app");

// ---

log.info("Application start");

// Configuration --------------------------------------------------------------
// Configuration sources are registered with nconf in this order: command-line
// arguments, environment variables, then a default for the "config" key that
// names the config file.
nconf
  .argv()
  .env()
  .defaults({ config: 'config.ini' });

// The configured file name is resolved against the current working directory.
const configFile = path.resolve(process.cwd(), nconf.get("config"));

// Check that the config file is readable; accessSync throws when it is not,
// which aborts start-up. fs.constants.R_OK is used instead of the deprecated
// top-level fs.R_OK alias (Node.js DEP0176).
fs.accessSync(configFile, fs.constants.R_OK);
log.info("Config file: ", configFile);

// Load the file as INI and declare the keys the application cannot run without.
// NOTE(review): nconf.required is expected to throw when a key is missing -
// confirm against the nconf version in use.
nconf.file({ file: configFile, format: nconf.formats.ini });
nconf.required(['mqtt:host', 'influxdb:host']);

// A truthy "verbose" setting switches logging to trace level; otherwise info.
logger.level = nconf.get("verbose") ? logger.levels.trace : logger.levels.info;
// Application core -----------------------------------------------------------
// Each collaborator is constructed from its own section of the loaded
// configuration; the stats reporter additionally receives both clients.
const mqttClient = new MqttClient(nconf.get("mqtt"));
const influxClient = new InfluxClient(nconf.get("influxdb"));
const parser = new MessageParser(nconf.get("conversion"));
const stats = new StatsReporter(nconf.get("global"), mqttClient, influxClient);

// Subscribe to every key found under the "topics" configuration section.
// Only the keys are used here; the values are not read in this file.
const topicSection = nconf.get("topics");
for (const topicName in topicSection) {
  mqttClient.subscribe(topicName);
}
// Handler for every message event emitted by the MQTT client: the payload is
// run through the parser and the converted result is passed to the InfluxDB
// client together with the original id and topic.
const onMessage = (id, topic, payload) => {
  const converted = parser.parse(id, topic, payload);
  influxClient.store(id, topic, converted);
};

mqttClient.on("message", onMessage);
// Forward InfluxDB client errors to MQTT, but only if an error topic is
// configured under "global:mqtt_error_topic".
const errTopic = nconf.get("global:mqtt_error_topic");
if (errTopic) {
  log.info("Error topic: %s", errTopic);

  // Publishes a one-line description of the failure to the error topic.
  const publishError = (topic, message) => {
    const text = "Error on '" + topic + "': " + message;
    mqttClient.publish(errTopic, text);
  };

  influxClient.on("error", publishError);
}
// Start-up sequence: by this point all handlers and subscriptions have been
// registered above. The stats reporter is started first, then the MQTT client
// is told to connect.
stats.start();
mqttClient.connect();
// Emitted at debug level.
// NOTE(review): presumably hidden unless "verbose" is set, since the logger
// level is otherwise info - confirm against ./lib/logger.js.
log.debug("Application up and running...");