Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
busma13 committed Jan 7, 2025
1 parent d59b53e commit f5044fb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 62 deletions.
39 changes: 9 additions & 30 deletions packages/teraslice/src/lib/cluster/services/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Router, Express } from 'express';
import bodyParser from 'body-parser';
import { pipeline as streamPipeline } from 'node:stream/promises';
import got, { OptionsInit } from 'got';
import { RecoveryCleanupType, TerasliceConfig } from '@terascope/job-components';
import {
parseErrorInfo, parseList, logError,
Expand All @@ -18,7 +19,6 @@ import {
createJobActiveQuery, addDeletedToQuery
} from '../../utils/api_utils.js';
import { getPackageJSON } from '../../utils/file_utils.js';
import got, { OptionsInit } from 'got';

const terasliceVersion = getPackageJSON().version;

Expand Down Expand Up @@ -136,32 +136,11 @@ export class ApiService {
throw error;
}

private queryToSearchParams(query: any) {
const searchParams: Record<string, string | number | boolean> = {};

for (const [key, value] of Object.entries(query)) {
if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') {
searchParams[key] = value;
} else if (Array.isArray(value)) {
searchParams[key] = value.join(',');
} else if (value === null || value === undefined) {
// Skip undefined or null values
} else {
// stringify objects
searchParams[key] = JSON.stringify(value);
}
}

return searchParams;
}

private async _redirect(req: TerasliceRequest, res: TerasliceResponse) {
const searchParams = this.queryToSearchParams(req.query);

const options: OptionsInit & { isStream: true } = {
private async _assetRedirect(req: TerasliceRequest, res: TerasliceResponse) {
const options: OptionsInit & { isStream: true } = {
prefixUrl: this.assetsUrl,
headers: req.headers,
searchParams,
searchParams: req.query as Record<string, any>,
throwHttpErrors: false,
timeout: { request: this.terasliceConfig.api_response_timeout },
decompress: false,
Expand Down Expand Up @@ -223,7 +202,7 @@ export class ApiService {
this.jobsStorage = jobsStorage;

const v1routes = Router();
const redirect = this._redirect.bind(this);
const assetRedirect = this._assetRedirect.bind(this);

this.app.use(bodyParser.json({
type(req) {
Expand Down Expand Up @@ -271,17 +250,17 @@ export class ApiService {

v1routes.route('/assets*')
.delete((req, res) => {
redirect(req as TerasliceRequest, res);
assetRedirect(req as TerasliceRequest, res);
})
.post((req, res) => {
if (req.headers['content-type'] === 'application/json' || req.headers['content-type'] === 'application/x-www-form-urlencoded') {
sendError(res, 400, '/asset endpoints do not accept json');
return;
}
redirect(req as TerasliceRequest, res);
assetRedirect(req as TerasliceRequest, res);
})
// @ts-expect-error
.get(redirect);
.get(assetRedirect);

v1routes.post('/jobs', (req, res) => {
// if no job was posted an empty object is returned, so we check if it has values
Expand Down Expand Up @@ -534,7 +513,7 @@ export class ApiService {

this.app.route('/txt/assets*')
// @ts-expect-error
.get(redirect);
.get(assetRedirect);

this.app.get('/txt/workers', (req, res) => {
const { size, from } = getSearchOptions(req as TerasliceRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ export const routing = Object.freeze({
type HookFN = () => void | null;

type ListenOptions = {
server?: string | number | HttpServer | HttpsServer;
server?: HttpServer | HttpsServer;
port?: string | number;
query?: { node_id: string };
};

Expand Down Expand Up @@ -317,20 +318,41 @@ export class Messaging {
});
}

private _createIOServer(server?: HttpServer | HttpsServer, port?: string | number) {
const opts = {
path: '/native-clustering',
pingTimeout: this.configTimeout,
pingInterval: this.configTimeout + this.networkLatencyBuffer,
perMessageDeflate: false,
serveClient: false,
}
if (server) {
this.io = socketIOServer(server, opts);
} else if (port) {
this.io = socketIOServer(port, opts);
}
this._attachRoomsSocketIO();

this.io.on('connection', (socket: any) => {
this.logger.debug('a connection to cluster_master has been made');
this._registerFns(socket);
});
}

listen(options: ListenOptions = {}) {
const { query, server } = options;
const { query, server, port } = options;
this.messsagingOnline = true;

if (this.config.clients.networkClient) {
// node_master, worker
this.io = socketIOClient(this.hostURL, {
forceNew: true,
path: '/native-clustering',
query
});

this._registerFns(this.io);

if (this.self === 'node_master') {
this.io.on('networkMessage', (networkMsg: any) => {
const { message } = networkMsg;
Expand All @@ -343,34 +365,14 @@ export class Messaging {
}
});
}

this.logger.debug('client network connection is online');
} else if (server) {
if (typeof server === 'string' || typeof server === 'number') {
// test server
this.io = socketIOServer(server, {
path: '/native-clustering',
pingTimeout: this.configTimeout,
pingInterval: this.configTimeout + this.networkLatencyBuffer,
perMessageDeflate: false,
serveClient: false,
});
} else {
// cluster_master
this.io = socketIOServer(server, {
path: '/native-clustering',
pingTimeout: this.configTimeout,
pingInterval: this.configTimeout + this.networkLatencyBuffer,
perMessageDeflate: false,
serveClient: false,
});
}
this._attachRoomsSocketIO();

this.io.on('connection', (socket: any) => {
this.logger.debug('a connection to cluster_master has been made');
this._registerFns(socket);
});
// cluster_master
this._createIOServer(server);
} else if (port) {
// test server
this._createIOServer(undefined, port);
}

// TODO: message queuing will be used until formal process lifecycles are implemented
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/test/services/messaging-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ describe('messaging module', () => {
const messaging2 = new Messaging(testContext2, logger);

expect(() => messaging1.listen()).not.toThrow();
expect(() => messaging2.listen({ server: 45645 })).not.toThrow();
expect(() => messaging2.listen({ port: 45645 })).not.toThrow();

await messaging1.shutdown();
await messaging2.shutdown();
Expand Down

0 comments on commit f5044fb

Please sign in to comment.