-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathindex.js
executable file
·144 lines (123 loc) · 3.86 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/* global R5 */
// Make sure a global `R5` logger holder exists: when the host application has
// not installed one, fall back to an object whose `out` is the console.
if (!global.R5) {
  global.R5 = { out: console };
}

// The module's single export is the Rabbitmq constructor (a hoisted function
// declaration, so it is already available here).
module.exports = Rabbitmq;
const amqp = require('amqplib');
// Constructors
/**
 * Create a RabbitMQ client description. No network activity happens here;
 * the fields are only stored on the instance.
 *
 * @param {string} host - Broker host, stored as `this.host`.
 * @param {string} user - User name, stored as `this.user`.
 * @param {string} pass - Password, stored as `this.pass`.
 * @param {string} [vhost='development'] - Virtual host, stored as `this.vhost`.
 */
function Rabbitmq (host, user, pass, vhost = 'development') {
  Object.assign(this, {
    // Connection parameters
    host,
    user,
    pass,
    vhost,
    // Number of connection attempts that have failed so far
    connect_retries: 0,
    // Base wait in milliseconds used when scheduling a connection retry
    error_timeout: 10000,
    // Consumer callbacks remembered for re-binding
    consumers: []
  });
}
// Public Methods
Rabbitmq.prototype = {
  // Assigning a fresh object drops the default `constructor` link; restore it
  // so `instance.constructor === Rabbitmq` holds.
  constructor: Rabbitmq,

  /**
   * Connect to the broker, open a channel, declare the topic exchange and
   * re-attach every consumer previously registered through `bind`.
   *
   * A failed connection attempt is retried up to 10 times, waiting
   * `error_timeout * attempt` milliseconds before each retry. Once the retries
   * are exhausted the last error is rethrown.
   *
   * @param {object} config - Must provide `exchange_name` and `queue_name`.
   * @returns {Promise<void>}
   * @throws The last connection error once all retries have failed.
   */
  async connect (config) {
    this.config = config;
    // NOTE(review): user, pass and vhost are interpolated verbatim, so values
    // containing URL-reserved characters must already be percent-encoded by
    // the caller — confirm against callers before encoding them here.
    const url = `amqp://${this.user}:${this.pass}@${this.host}/${this.vhost}`;
    try {
      this.conn = await amqp.connect(url);
    }
    catch (err) {
      if (this.connect_retries++ < 10) {
        R5.out.error(`RabbitMQ connecting (retrying [${this.connect_retries}]): ${err.code}`);
        await delay(this.error_timeout * this.connect_retries);
        return this.connect(config);
      }
      R5.out.error(`RabbitMQ connecting: ${err.stack}`);
      // Start from a clean slate so a later connect() call gets its full
      // retry budget instead of failing on the first error.
      this.connect_retries = 0;
      throw err;
    }

    // An EventEmitter without an 'error' listener throws when 'error' is
    // emitted, which would take the process down before the 'close' handler
    // below gets a chance to reconnect. Log it and let 'close' drive recovery.
    this.conn.on('error', (err) => {
      R5.out.error(`RabbitMQ connection error: ${err && err.message}`);
    });

    this.ch = await this.conn.createChannel();
    await this.ch.assertExchange(config.exchange_name, 'topic', { durable: false });
    R5.out.log(`RabbitMQ connected to ${this.host}:${config.queue_name}`);

    // Consumers live on the channel, so each one has to be attached again on
    // the channel that was just created.
    for (const consumer of this.consumers) {
      await this.bind(consumer, true);
    }
    this.connect_retries = 0;

    // Reconnect only when the close carries an error; a close without one
    // (e.g. after `disconnect`) is left alone.
    // NOTE: if the reconnect ultimately fails, the rejection is not handled
    // here and surfaces as an unhandled rejection.
    this.conn.on('close', async (err) => {
      if (err) {
        R5.out.error(`RabbitMQ reconnecting on close`);
        return this.connect(config);
      }
    });
  },

  /**
   * Close the channel and then the connection.
   *
   * @returns {Promise<void>}
   */
  async disconnect () {
    await this.ch.close();
    await this.conn.close();
  },

  /**
   * Acknowledge a delivered message and log a summary of its parsed payload.
   *
   * @param {object} msg - The raw message object received from the channel.
   * @param {object} [message={}] - Parsed payload, used only for logging.
   *   A `null` payload (what consumers receive for invalid JSON) is accepted.
   */
  ack (msg, message = {}) {
    this.ch.ack(msg);
    // The default parameter only covers `undefined`; guard `null` as well so
    // logging cannot throw after the message has already been acked.
    R5.out.log(`RabbitMQ ACKD ${message_summary(message || {})}`);
  },

  /**
   * Start consuming from `config.queue_name` with a prefetch of 1 and manual
   * acknowledgement. Each delivery is passed to `callback` together with its
   * JSON-decoded payload (`null` when the payload is not valid JSON).
   *
   * @param {function(object, object): *} [callback] - Invoked as
   *   `callback(msg, message)` for every delivery.
   * @param {boolean} [reconnecting=false] - When true the callback is not
   *   recorded again in `this.consumers` (used when re-attaching on connect).
   * @returns {Promise<void>}
   */
  // eslint-disable-next-line no-unused-vars
  async bind (callback = async (msg, message) => {}, reconnecting = false) {
    const queue_name = this.config.queue_name;
    await this.ch.assertQueue(queue_name, { durable: true });
    await this.ch.prefetch(1);
    R5.out.log(`RabbitMQ waiting for messages from #${queue_name}..`);
    await this.ch.consume(queue_name, (msg) => {
      // amqplib calls the handler with `null` when the broker cancels the
      // consumer; there is no content to decode in that case.
      if (!msg) {
        R5.out.error(`RabbitMQ consumer cancelled on #${queue_name}`);
        return undefined;
      }
      const message = parse_json(msg.content.toString());
      return callback(msg, message);
    }, { noAck: false });
    if (!reconnecting) {
      this.consumers.push(callback);
    }
  },

  /**
   * Fetch at most one message from `config.queue_name` without auto-ack.
   *
   * @returns {Promise<{msg: *, message: *}>} `msg` is whatever the channel's
   *   `get` returned; `message` is the decoded payload, or `undefined` when
   *   `msg` is falsy.
   */
  async get () {
    await this.ch.assertQueue(this.config.queue_name, { durable: true });
    const msg = await this.ch.get(this.config.queue_name, { noAck: false });
    let message;
    if (msg) {
      message = parse_json(msg.content.toString());
    }
    return { msg, message };
  },

  /**
   * JSON-encode `message` and publish it to `config.queue_name` as a
   * persistent message.
   *
   * @param {object} message - Payload to serialise with `JSON.stringify`.
   * @returns {Promise<void>}
   */
  async send (message) {
    const message_string = JSON.stringify(message);
    await this.ch.assertQueue(this.config.queue_name, { durable: true });
    await this.ch.sendToQueue(this.config.queue_name, Buffer.from(message_string, 'utf8'), {
      persistent: true
    });
    R5.out.log(`RabbitMQ SENT ${message_summary(message)}`);
  }
};
// Private Methods
/**
 * Report whether `json_str` parses as JSON to a truthy value.
 *
 * Text that parses to a falsy value (`null`, `0`, `false`, `""`) is reported
 * as not valid, as is text that does not parse at all. The result is always a
 * strict boolean.
 *
 * @param {string} json_str - Candidate JSON text.
 * @returns {boolean} `true` only when parsing succeeds with a truthy result.
 */
function json_is_valid (json_str) {
  try {
    // Boolean() keeps the original truthiness rule while guaranteeing that a
    // falsy parse result is reported as `false` rather than leaked as-is.
    return Boolean(JSON.parse(json_str));
  }
  catch (e) {
    return false;
  }
}
/**
 * Decode a message body. Text accepted by `json_is_valid` is parsed, logged
 * as received and returned; anything else is logged as an error.
 *
 * @param {string} str - Raw message body.
 * @returns {*} The parsed value, or `null` when `str` was rejected.
 */
function parse_json (str) {
  // Rejected input: report it and hand back null
  if (!json_is_valid(str)) {
    R5.out.error(`RabbitMQ JSON is invalid: ${str}`);
    return null;
  }

  const parsed = JSON.parse(str);
  R5.out.log(`RabbitMQ RECV ${message_summary(parsed)}`);
  return parsed;
}
/**
 * Build the short colon-separated tag used in the RabbitMQ log lines.
 *
 * Layout:
 *   - with `game`:  `<game>:<category>:<type>`
 *   - with `match`: `[<match.settings.game>:]<match.id | category>:<type>`
 *     (the match id is used when `category === 'match'`)
 *   - otherwise:    `<type>`
 * and `:<user.name>` is appended when a user name is present.
 *
 * A `null`/`undefined` message is treated like an empty object instead of
 * throwing; consumers receive `null` for payloads that fail JSON decoding.
 * Missing fields are rendered as the text `undefined`, as for any other
 * template interpolation.
 *
 * @param {object|null|undefined} message - Parsed payload.
 * @returns {string} The summary tag.
 */
function message_summary (message) {
  const source = (message === null || message === undefined) ? {} : message;
  let summary = '';
  if (source.game) {
    summary = `${source.game}:${source.category}:`;
  }
  else if (source.match) {
    if (source.match.settings) {
      summary = `${source.match.settings.game}:`;
    }
    summary += source.category === 'match' ? `${source.match.id}:` : `${source.category}:`;
  }
  summary += source.type;
  if (source.user && source.user.name) {
    summary += `:${source.user.name}`;
  }
  return summary;
}
/**
 * Promise-based sleep.
 *
 * @param {number} ms - Timeout handed to `setTimeout`.
 * @returns {Promise<undefined>} Settles once the timer fires.
 */
function delay (ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}