Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TCP transport #490

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
"@types/ws": "^7.4.0",
"async": "^3.2.1",
"buffer": "^6.0.3",
"net": "^1.0.2",
"node-fetch": "^2.6.1",
"process": "^0.11.10",
"punycode": "^2.1.1",
"react-native-randombytes": "^3.6.0",
"readable-stream": "^2.3.6",
"sdp": "^3.0.2",
"tls": "^0.0.1",
"stanza-shims": "^1.1.2",
"tslib": "^2.2.0",
"ws": "^7.4.4"
Expand Down
4 changes: 3 additions & 1 deletion rollup.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export default {
'async',
'crypto',
'events',
'net',
'tls',
'node-fetch',
'punycode',
'sdp',
Expand All @@ -18,5 +20,5 @@ export default {
file: 'dist/es/index.module.js',
format: 'es'
},
plugins: [resolve({ browser: true })]
plugins: [resolve({ browser: true, preferBuiltins: true })]
};
14 changes: 12 additions & 2 deletions scripts/build.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { execSync as Child } from 'child_process';
import { execSync } from 'child_process';
import FS from 'fs';

const Child = (command: string) => {
const ret = execSync(command, { stdio: 'inherit' });
return ret;
}

const Pkg = JSON.parse(FS.readFileSync('package.json').toString());

function fileReplace(fileName: string, placeholder: string, value: string) {
Expand All @@ -19,7 +24,12 @@ fileReplace('dist/es/Constants.js', '__STANZAJS_VERSION__', Pkg.version);

Child('npm run compile:rollup');

Child('mkdir dist/npm');
if (!FS.existsSync('dist')) {
FS.mkdirSync('dist');
}
if (!FS.existsSync('dist/npm')) {
FS.mkdirSync('dist/npm');
}
Child('cp -r dist/cjs/* dist/npm/');
Child('cp dist/es/index.module.js dist/npm/module.js');
Child(`cp ${__dirname}/../*.md dist/npm`);
Expand Down
112 changes: 71 additions & 41 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { core as corePlugins } from './plugins';
import Protocol, { IQ, Message, Presence, StreamError, Stream } from './protocol';
import BOSH from './transports/bosh';
import WebSocket from './transports/websocket';
import TCP from './transports/tcp';
import { timeoutPromise, uuid } from './Utils';

interface StreamData {
Expand Down Expand Up @@ -102,8 +103,11 @@ export default class Client extends EventEmitter {

this.transports = {
bosh: BOSH,
websocket: WebSocket
websocket: WebSocket,
};
if (typeof window === 'undefined') {
this.transports.tcp = TCP;
}

this.incomingDataQueue = priorityQueue<StreamData>(async (task, done) => {
const { kind, stanza } = task;
Expand Down Expand Up @@ -186,7 +190,7 @@ export default class Client extends EventEmitter {
);
});

this.on('--transport-disconnected', async () => {
const dcHandler = async () => {
const drains: Array<Promise<void>> = [];
if (!this.incomingDataQueue.idle()) {
drains.push(this.incomingDataQueue.drain());
Expand All @@ -213,7 +217,9 @@ export default class Client extends EventEmitter {
}

this.emit('disconnected');
});
};
this.on('--transport-disconnected', dcHandler);
this.on('--transport-error', dcHandler);

this.on('iq', (iq: IQ) => {
const iqType = iq.type;
Expand Down Expand Up @@ -284,10 +290,12 @@ export default class Client extends EventEmitter {
jid: '',
transports: {
bosh: true,
websocket: true
websocket: true,
tcp: true,
},
useStreamManagement: true,
transportPreferenceOrder: ['websocket', 'bosh'],
transportPreferenceOrder: ['tcp', 'websocket', 'bosh'],
requireSecureTransport: true,
...currConfig,
...opts
};
Expand Down Expand Up @@ -349,65 +357,87 @@ export default class Client extends EventEmitter {
}

const transportPref = this.config.transportPreferenceOrder ?? [];
let endpoints: { [key: string]: string[] } | undefined;
const transportEndpoints: Array<[Transport, TransportConfig]> = [];
let endpoints: { [key: string]: string[] } = {};
try {
endpoints = await (this as unknown as Agent).discoverBindings(
this.config.server!
);
} catch (e) {
console.error(e);
}
for (const name of transportPref) {
const settings = this.config.transports![name];
if (!settings) {
if (!settings || !this.transports![name]) {
continue;
}
const transport = new this.transports![name](
this as unknown as Agent,
this.sm,
this.stanzas
);

let config: TransportConfig = {
acceptLanguages: this.config.acceptLanguages || [this.config.lang ?? 'en'],
jid: this.config.jid!,
lang: this.config.lang ?? 'en',
server: this.config.server!
server: this.config.server!,
};
const transport = new this.transports[name](
this as unknown as Agent,
this.sm,
this.stanzas
);

if (typeof settings === 'string') {
config.url = settings;
} else if (settings == true) {
transportEndpoints.push([transport, { ...config, url: settings }]);
} else if (settings === true) {
if (transport.discoverBindings) {
const discovered = await transport.discoverBindings(this.config.server!);
if (!discovered) {
continue;
for (const ep of (await transport.discoverBindings(this.config.server!) ?? [])) {
transportEndpoints.push([transport, { ...config, ...ep }]);
}
config = {
...config,
...discovered
};
} else {
if (!endpoints) {
try {
endpoints = await (this as unknown as Agent).discoverBindings(
this.config.server!
);
} catch (err) {
console.error(err);
continue;
}
}
endpoints[name] = (endpoints[name] || []).filter(
url => url.startsWith('wss:') || url.startsWith('https:')
);
if (!endpoints[name] || !endpoints[name].length) {
continue;
for (const ep of (endpoints[name] ?? [])) {
transportEndpoints.push([transport, { ...config, url: ep }]);
}
config.url = endpoints[name][0];
}
} else if (typeof settings === 'object') {
transportEndpoints.push([transport, { ...config, ...settings }]);
}
}

const secureOptions: Array<[Transport, TransportConfig]> = [];
const insecureOptions: Array<[Transport, TransportConfig]> = [];

// secureOptions + insecureOptions will be sorted as transportEndpoints
// is created in priority order.
for (const [transport, endpoint] of transportEndpoints) {
if (
endpoint.url?.startsWith('https://') ||
endpoint.url?.startsWith('wss://') ||
transport instanceof TCP
) {
secureOptions.push([transport, endpoint]);
} else {
insecureOptions.push([transport, endpoint]);
}
}

const options =
this.config.requireSecureTransport
? secureOptions
: secureOptions.concat(insecureOptions);

for (const [transport, endpoint] of options) {
this.transport = transport;
this.transport.connect(config);
return;
const dcPromise = new Promise(resolve => this.once('disconnected', resolve));
try {
await this.transport.connect(endpoint);
return;
} catch (_) {
await this.disconnect();
this.transport = undefined;
}
await dcPromise;
}

console.error('No endpoints found for the requested transports.');
this.emit('--transport-disconnected');
throw 'No adequate endpoints found for the requested transports.';
}

public async disconnect(): Promise<void> {
Expand Down
33 changes: 29 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ export interface AgentEvents {
// Any "--" prefixed events are for internal use only
'--reset-stream-features': void;
'--transport-disconnected': void;
'--transport-connected': void;
'--transport-error': Error;

'*': (...args: any[]) => void;
}
Expand Down Expand Up @@ -200,7 +202,10 @@ export interface AgentConfig {
*
* If a transport is set to a string, that will be used as the connection URL.
*
* @default { websocket: true, bosh: true }
* If a transport is set to an object, it MUST include a <code>url</code> value for
* the connection URL.
*
* @default { websocket: true, bosh: true, tcp: true }
*/
transports?: { [key: string]: boolean | string | Partial<TransportConfig> };

Expand All @@ -211,10 +216,24 @@ export interface AgentConfig {
*
* If a configured transport type is not listed, it will be skipped.
*
* @default ['websocket', 'bosh']
* @default ['tcp', 'websocket', 'bosh']
*/
transportPreferenceOrder?: string[];

/**
* Require Secure Transport
*
* Guarantees that any transport chosen will use TLS or equivalent. If
* no secure transport is available, connection will fail.
*
* If a transport is specified which is not secure, this option will
* prevent it from being used. If ws:// and https:// is available, but
* websocket is higher priority than BOSH, BOSH will still be used.
*
* @default true
*/
requireSecureTransport?: boolean;

/**
* Account Password
*
Expand Down Expand Up @@ -242,9 +261,9 @@ export interface Transport {
stream?: Stream;
authenticated?: boolean;

discoverBindings?(host: string): Promise<Partial<TransportConfig> | null>;
discoverBindings?(host: string): Promise<Array<Partial<TransportConfig>> | null>;

connect(opts: TransportConfig): void;
connect(opts: TransportConfig): Promise<void>;
disconnect(cleanly?: boolean): void;
restart(): void;
send(name: string, data?: JXT.JSONData): Promise<void>;
Expand All @@ -265,6 +284,11 @@ export interface TransportConfig {
maxRetries?: number;
wait?: number;
maxHoldOpen?: number;

// TCP/TLS settings
directTLS?: boolean,
port?: number,
pubkey?: Buffer,
}

import * as RSM from './helpers/RSM';
Expand All @@ -288,6 +312,7 @@ export {
export const VERSION = Constants.VERSION;

import Plugins from './plugins';
import { TlsOptions } from 'tls';
export * from './plugins';

export function createClient(opts: AgentConfig): Agent {
Expand Down
Loading