Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: limit concurrent file reads per hard drive #785

Merged
merged 13 commits into from
Oct 28, 2023
16 changes: 8 additions & 8 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export default class Constants {
static readonly GLOBAL_TEMP_DIR = GLOBAL_TEMP_DIR;

/**
* A sane max of filesystem threads for operations such as:
* A reasonable max of filesystem threads for operations such as:
* @example
* Promise.all([].map(async (file) => fs.lstat(file));
*/
Expand All @@ -73,20 +73,20 @@ export default class Constants {
static readonly DAT_DEFAULT_THREADS = 3;

/**
* Max number of archive entries to process (possibly extract & MD5/SHA1 checksum) at once.
* A reasonable max number of files to write at once.
*/
static readonly ARCHIVE_ENTRY_SCANNER_THREADS_PER_ARCHIVE = 5;
static readonly FILE_READER_DEFAULT_THREADS = 10;

/**
* A sane max number of files to write at once.
* Max number of archive entries to process (possibly extract & MD5/SHA1 checksum) at once.
*/
static readonly FILE_READER_DEFAULT_THREADS = 20;
static readonly ARCHIVE_ENTRY_SCANNER_THREADS_PER_ARCHIVE = 5;

/**
* A sane max number of ROM release candidates to write at once. This will be the limiting factor
* for consoles with many small ROMs.
* A reasonable max number of ROM release candidates to write at once. This will be the limiting
* factor for consoles with many small ROMs.
*/
static readonly ROM_WRITER_DEFAULT_THREADS = 20;
static readonly ROM_WRITER_DEFAULT_THREADS = 10;

/**
* Max number of files to recycle/delete at once.
Expand Down
100 changes: 100 additions & 0 deletions src/driveSemaphore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import path from 'node:path';

import async, { AsyncResultCallback } from 'async';
import { Mutex, Semaphore } from 'async-mutex';

import Constants from './constants.js';
import FsPoly from './polyfill/fsPoly.js';
import File from './types/files/file.js';

/**
* Wrapper for an `async-mutex` {@link Semaphore} that limits how many files can be processed at
* once per hard drive.
*/
export default class DriveSemaphore {
private readonly keySemaphores = new Map<string, Semaphore>();

private readonly keySemaphoresMutex = new Mutex();

private readonly threads: number;

private readonly threadsSemaphore: Semaphore;

constructor(threads = 1) {
this.threads = threads;
this.threadsSemaphore = new Semaphore(threads);
}

/**
* Run some {@link runnable} for every value in {@link files}.
*/
async map<K extends File | string, V>(
files: K[],
runnable: (file: K) => (V | Promise<V>),
): Promise<V[]> {
const disks = await FsPoly.disks();

// Limit the number of ongoing threads to something reasonable
return async.mapLimit(
files,
Constants.MAX_FS_THREADS,
async (file, callback: AsyncResultCallback<V, Error>) => {
try {
const val = await this.processFile(file, runnable, disks);
callback(undefined, val);
} catch (error) {
if (error instanceof Error) {
callback(error);
} else if (typeof error === 'string') {
callback(new Error(error));
} else {
callback(new Error('failed to execute runnable'));
}
}
},
);
}

private async processFile<K extends File | string, V>(
file: K,
runnable: (file: K) => (V | Promise<V>),
disks: string[],
): Promise<V> {
const filePath = file instanceof File ? file.getFilePath() : file as string;
const filePathNormalized = filePath.replace(/[\\/]/g, path.sep);
const filePathResolved = path.resolve(filePathNormalized);

// Try to get the path of the drive this file is on
let filePathDisk = disks.find((disk) => filePathResolved.startsWith(disk)) ?? '';

if (!filePathDisk) {
// If a drive couldn't be found, try to parse a samba server name
const sambaMatches = filePathNormalized.match(/^([\\/]{2}[^\\/]+)/);
if (sambaMatches !== null) {
[, filePathDisk] = sambaMatches;
}
}

const keySemaphore = await this.keySemaphoresMutex.runExclusive(async () => {
if (!this.keySemaphores.has(filePathDisk)) {
let { threads } = this;
if (await FsPoly.isSamba(filePathDisk)) {
// Forcefully limit the number of files to be processed concurrently from a single
// Samba network share
threads = 1;
}
this.keySemaphores.set(filePathDisk, new Semaphore(threads));
}
return this.keySemaphores.get(filePathDisk) as Semaphore;
});

// First, limit the number of threads per drive, which will better balance the processing of
// files on different drives vs. processing files sequentially
return keySemaphore.runExclusive(
// Second, limit the overall number of threads
async () => this.threadsSemaphore.runExclusive(
async () => runnable(file),
),
);
}
}
2 changes: 1 addition & 1 deletion src/modules/argumentsParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ export default class ArgumentsParser {
})
.option('reader-threads', {
group: groupHelpDebug,
description: 'Maximum number of ROMs to read in parallel',
description: 'Maximum number of ROMs to read in parallel per disk',
type: 'number',
coerce: (val: number) => Math.max(val, 1),
requiresArg: true,
Expand Down
9 changes: 4 additions & 5 deletions src/modules/datScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import * as child_process from 'node:child_process';
import path from 'node:path';

import { parse } from '@fast-csv/parse';
import async, { AsyncResultCallback } from 'async';
import xml2js from 'xml2js';

import ProgressBar, { ProgressBarSymbol } from '../console/progressBar.js';
import DriveSemaphore from '../driveSemaphore.js';
import ArrayPoly from '../polyfill/arrayPoly.js';
import bufferPoly from '../polyfill/bufferPoly.js';
import fsPoly from '../polyfill/fsPoly.js';
Expand Down Expand Up @@ -101,10 +101,9 @@ export default class DATScanner extends Scanner {
this.progressBar.logDebug(`parsing ${datFiles.length.toLocaleString()} DAT file${datFiles.length !== 1 ? 's' : ''}`);
await this.progressBar.setSymbol(ProgressBarSymbol.PARSING_CONTENTS);

const results = (await async.mapLimit(
const results = (await new DriveSemaphore(this.options.getReaderThreads()).map(
datFiles,
this.options.getReaderThreads(),
async (datFile: File, callback: AsyncResultCallback<DAT | undefined, Error>) => {
async (datFile) => {
await this.progressBar.incrementProgress();
const waitingMessage = `${datFile.toString()} ...`;
this.progressBar.addWaitingMessage(waitingMessage);
Expand All @@ -118,7 +117,7 @@ export default class DATScanner extends Scanner {

await this.progressBar.incrementDone();
this.progressBar.removeWaitingMessage(waitingMessage);
return callback(undefined, dat);
return dat;
},
)).filter(ArrayPoly.filterNotNullish);

Expand Down
13 changes: 5 additions & 8 deletions src/modules/patchScanner.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import async, { AsyncResultCallback } from 'async';

import ProgressBar, { ProgressBarSymbol } from '../console/progressBar.js';
import DriveSemaphore from '../driveSemaphore.js';
import ArrayPoly from '../polyfill/arrayPoly.js';
import File from '../types/files/file.js';
import Options from '../types/options.js';
Expand Down Expand Up @@ -38,20 +37,18 @@ export default class PatchScanner extends Scanner {
this.options.getReaderThreads(),
);

const patches = (await async.mapLimit(
const patches = (await new DriveSemaphore(this.options.getReaderThreads()).map(
files,
this.options.getReaderThreads(),
async (file, callback: AsyncResultCallback<Patch | undefined, Error>) => {
async (file) => {
await this.progressBar.incrementProgress();
const waitingMessage = `${file.toString()} ...`;
this.progressBar.addWaitingMessage(waitingMessage);

try {
const patch = await this.patchFromFile(file);
callback(undefined, patch);
return await this.patchFromFile(file);
} catch (error) {
this.progressBar.logWarn(`${file.toString()}: failed to parse patch: ${error}`);
callback(undefined, undefined);
return undefined;
} finally {
await this.progressBar.incrementDone();
this.progressBar.removeWaitingMessage(waitingMessage);
Expand Down
10 changes: 4 additions & 6 deletions src/modules/romHeaderProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import async, { AsyncResultCallback } from 'async';

import ProgressBar, { ProgressBarSymbol } from '../console/progressBar.js';
import DriveSemaphore from '../driveSemaphore.js';
import ArchiveEntry from '../types/files/archives/archiveEntry.js';
import File from '../types/files/file.js';
import ROMHeader from '../types/files/romHeader.js';
Expand Down Expand Up @@ -34,10 +33,9 @@ export default class ROMHeaderProcessor extends Module {
await this.progressBar.setSymbol(ProgressBarSymbol.HASHING);
await this.progressBar.reset(inputRomFiles.length);

const parsedFiles = await async.mapLimit(
const parsedFiles = await new DriveSemaphore(this.options.getReaderThreads()).map(
inputRomFiles,
this.options.getReaderThreads(),
async (inputFile, callback: AsyncResultCallback<File, Error>) => {
async (inputFile) => {
await this.progressBar.incrementProgress();
const waitingMessage = `${inputFile.toString()} ...`;
this.progressBar.addWaitingMessage(waitingMessage);
Expand All @@ -53,7 +51,7 @@ export default class ROMHeaderProcessor extends Module {
this.progressBar.removeWaitingMessage(waitingMessage);
await this.progressBar.incrementDone();

return callback(undefined, fileWithHeader);
return fileWithHeader;
},
);

Expand Down
10 changes: 4 additions & 6 deletions src/modules/scanner.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import async, { AsyncResultCallback } from 'async';

import ProgressBar from '../console/progressBar.js';
import Constants from '../constants.js';
import DriveSemaphore from '../driveSemaphore.js';
import ElasticSemaphore from '../elasticSemaphore.js';
import fsPoly from '../polyfill/fsPoly.js';
import File from '../types/files/file.js';
Expand Down Expand Up @@ -31,10 +30,9 @@ export default abstract class Scanner extends Module {
threads: number,
filterUnique = true,
): Promise<File[]> {
const foundFiles = (await async.mapLimit(
const foundFiles = (await new DriveSemaphore(threads).map(
filePaths,
threads,
async (inputFile, callback: AsyncResultCallback<File[], Error>) => {
async (inputFile) => {
await this.progressBar.incrementProgress();
const waitingMessage = `${inputFile} ...`;
this.progressBar.addWaitingMessage(waitingMessage);
Expand All @@ -43,7 +41,7 @@ export default abstract class Scanner extends Module {

this.progressBar.removeWaitingMessage(waitingMessage);
await this.progressBar.incrementDone();
callback(undefined, files);
return files;
},
))
.flat();
Expand Down
30 changes: 30 additions & 0 deletions src/polyfill/fsPoly.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import crypto from 'node:crypto';
import fs, { MakeDirectoryOptions, PathLike, RmOptions } from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import util from 'node:util';

Expand Down Expand Up @@ -57,6 +58,15 @@ export default class FsPoly {
}
}

static async disks(): Promise<string[]> {
const disks = await nodeDiskInfo.getDiskInfo();
return disks
.filter((drive) => drive.available > 0)
.map((drive) => drive.mounted)
// Sort by mount points with the deepest number of subdirectories first
.sort((a, b) => b.split(/[\\/]/).length - a.split(/[\\/]/).length);
}

/**
* There is no promise version of existsSync()
*/
Expand All @@ -81,6 +91,26 @@ export default class FsPoly {
}
}

static async isSamba(filePath: string): Promise<boolean> {
const normalizedPath = filePath.replace(/[\\/]/g, path.sep);
if (normalizedPath.startsWith(`${path.sep}${path.sep}`) && normalizedPath !== os.devNull) {
return true;
}

const resolvedPath = path.resolve(normalizedPath);
const drives = await nodeDiskInfo.getDiskInfo();
const filePathDrive = drives
// Sort by mount points with the deepest number of subdirectories first
.sort((a, b) => b.mounted.split(/[\\/]/).length - a.mounted.split(/[\\/]/).length)
.find((drive) => resolvedPath.startsWith(drive.mounted));

if (!filePathDrive) {
// Assume 'false' by default
return false;
}
return filePathDrive.filesystem.replace(/[\\/]/g, path.sep).startsWith(`${path.sep}${path.sep}`);
}

static async isSymlink(pathLike: PathLike): Promise<boolean> {
try {
return (await util.promisify(fs.lstat)(pathLike)).isSymbolicLink();
Expand Down
8 changes: 4 additions & 4 deletions test/modules/argumentsParser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ describe('options', () => {
expect(options.getPreferParent()).toEqual(false);

expect(options.getDatThreads()).toEqual(3);
expect(options.getReaderThreads()).toEqual(20);
expect(options.getWriterThreads()).toEqual(20);
expect(options.getReaderThreads()).toEqual(10);
expect(options.getWriterThreads()).toEqual(10);
expect(options.getLogLevel()).toEqual(LogLevel.WARN);
expect(options.getHelp()).toEqual(false);
});
Expand Down Expand Up @@ -898,15 +898,15 @@ describe('options', () => {
});

it('should parse "reader-threads"', () => {
expect(argumentsParser.parse(dummyCommandAndRequiredArgs).getWriterThreads()).toEqual(20);
expect(argumentsParser.parse(dummyCommandAndRequiredArgs).getWriterThreads()).toEqual(10);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--reader-threads', '-1']).getReaderThreads()).toEqual(1);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--reader-threads', '0']).getReaderThreads()).toEqual(1);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--reader-threads', '1']).getReaderThreads()).toEqual(1);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--reader-threads', '2']).getReaderThreads()).toEqual(2);
});

it('should parse "writer-threads"', () => {
expect(argumentsParser.parse(dummyCommandAndRequiredArgs).getWriterThreads()).toEqual(20);
expect(argumentsParser.parse(dummyCommandAndRequiredArgs).getWriterThreads()).toEqual(10);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--writer-threads', '-1']).getWriterThreads()).toEqual(1);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--writer-threads', '0']).getWriterThreads()).toEqual(1);
expect(argumentsParser.parse([...dummyCommandAndRequiredArgs, '--writer-threads', '1']).getWriterThreads()).toEqual(1);
Expand Down
19 changes: 19 additions & 0 deletions test/polyfill/fsPoly.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os from 'node:os';
import path from 'node:path';

import Constants from '../../src/constants.js';
Expand Down Expand Up @@ -28,6 +29,24 @@ describe('isDirectory', () => {
});
});

describe('isSamba', () => {
test.each([
'.',
os.devNull,
'test',
path.resolve('test'),
])('should return false: %s', async (filePath) => {
await expect(fsPoly.isSamba(filePath)).resolves.toEqual(false);
});

test.each([
'//foo/bar',
'\\\\foo\\bar',
])('should return true: %s', async (filePath) => {
await expect(fsPoly.isSamba(filePath)).resolves.toEqual(true);
});
});

describe('isSymlink', () => {
it('should return true for a symlink', async () => {
const tempFile = await fsPoly.mktemp(path.join(Constants.GLOBAL_TEMP_DIR, 'temp'));
Expand Down