Skip to content

Commit

Permalink
Merge pull request #7 from piotr-oles/feature/process-sandbox
Browse files Browse the repository at this point in the history
feat: process driver
  • Loading branch information
piotr-oles authored Feb 28, 2021
2 parents ff20589 + 08b91ff commit 69cee28
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 78 deletions.
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,22 @@ Example:
import {
createSandbox,
Sandbox,
packLocalPackage,
externalPackage,
Package
packLocalPackage
} from "karton";
import path from 'path';

describe('my-package', () => {
let sandbox: Sandbox;
let myPackage: Package;

beforeAll(async () => {
myPackage = await packLocalPackage(
path.resolve(__dirname, '../../')
);
sandbox = await createSandbox();
sandbox = await createSandbox({
lockDirectory: path.resolve(__dirname, '__locks__'),
fixedDependencies: {
'my-package': `file:${await packLocalPackage(
path.resolve(__dirname, '../../')
)}`
}
});
});
afterEach(async () => {
await sandbox.reset();
Expand All @@ -51,11 +52,11 @@ describe('my-package', () => {
})

it.each([
externalPackage('webpack', '^4.0.0'),
externalPackage('webpack', '^5.0.0')
])('works with webpack %p', async (webpack) => {
[{ 'webpack': '^4.0.0' }],
[{ 'webpack': '^5.0.0' }]
])('works with %p', async (dependencies) => {
await sandbox.load(path.join(__dirname, 'fixtures/basic'));
await sandbox.install('yarn', [myPackage, webpack]);
await sandbox.install('yarn', dependencies);
const result = await sandbox.exec('node src/test.js');

expect(result).toEqual('my-package awesome output');
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./sandbox";
export * from "./logger";
export * from "./async";
export * from "./package";
export * from "./process-driver";
133 changes: 133 additions & 0 deletions src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
interface Listener<TValue = void> {
resolve(value: TValue): void;
reject(error: unknown): void;
}

type ListenerStatus = "pending" | "resolved" | "rejected";

interface QueuedListener<TValue = void> extends Listener<TValue> {
apply<TListener extends Listener<TValue>>(listener: TListener): void;
resolved: TValue | undefined;
rejected: unknown | undefined;
status: ListenerStatus;
}

interface BufferedListener<TChunk = void, TValue = void>
extends QueuedListener<TValue> {
apply<TListener extends Listener<TValue>>(
listener: TListener,
ingest?: (chunk: TChunk, listener: Listener<TValue>) => TChunk | undefined
): void;
ingest(chunk: TChunk): void;
buffer: TChunk[];
}

function createBufferedListener<TChunk = void, TValue = void>(
buffer: TChunk[] = []
): BufferedListener<TChunk, TValue> {
let resolve: (value: TValue) => void | undefined;
let reject: (error: unknown) => void | undefined;

const bufferedListener: BufferedListener<TChunk, TValue> = {
resolve(value) {
if (bufferedListener.status === "pending") {
bufferedListener.resolved = value;
bufferedListener.status = "resolved";

if (resolve) {
resolve(value);
}
}
},
reject(error) {
if (bufferedListener.status === "pending") {
bufferedListener.rejected = error;
bufferedListener.status = "rejected";

if (reject) {
reject(error);
}
}
},
ingest(chunk: TChunk) {
bufferedListener.buffer.push(chunk);
},
apply(listener, ingest) {
switch (bufferedListener.status) {
case "resolved":
listener.resolve(bufferedListener.resolved!);
break;
case "rejected":
listener.reject(bufferedListener.rejected);
break;
case "pending":
resolve = listener.resolve;
reject = listener.reject;
break;
}

if (ingest) {
const ingestListener: Listener<TValue> = {
resolve(value) {
if (bufferedListener.status === "pending") {
bufferedListener.resolved = value;
bufferedListener.status = "resolved";
}
},
reject(error) {
if (bufferedListener.status === "pending") {
bufferedListener.rejected = error;
bufferedListener.status = "rejected";
}
},
};

const applyStatusChange = () => {
switch (bufferedListener.status) {
case "resolved":
listener.resolve(bufferedListener.resolved!);
break;
case "rejected":
listener.reject(bufferedListener.rejected);
break;
}
};

bufferedListener.ingest = (chunk: TChunk) => {
if (bufferedListener.status === "pending") {
const ingestedChunk = ingest(chunk, ingestListener);

if (ingestedChunk) {
bufferedListener.buffer.push(ingestedChunk);
}

applyStatusChange();
} else {
bufferedListener.buffer.push(chunk);
}
};

// process buffer
bufferedListener.buffer = bufferedListener.buffer
.map((chunk) => {
if (bufferedListener.status === "pending") {
return ingest(chunk, ingestListener);
} else {
return chunk;
}
})
.filter((chunk) => chunk !== undefined) as TChunk[];

applyStatusChange();
}
},
resolved: undefined,
rejected: undefined,
status: "pending",
buffer: [...buffer],
};

return bufferedListener;
}

export { Listener, QueuedListener, BufferedListener, createBufferedListener };
30 changes: 4 additions & 26 deletions src/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,7 @@ import path from "path";
import fs from "fs-extra";
import os from "os";

interface Package {
name: string;
version: string;
immutable?: boolean;
}

function externalPackage(name: string, version: string): Package {
const pkg = { name, version, immutable: true };
Object.assign(pkg, {
toString: () => `${pkg.name}@${pkg.version}`,
});
return pkg;
}

async function packLocalPackage(directory: string): Promise<Package> {
async function packLocalPackage(directory: string): Promise<string> {
const packageJSONPath = path.resolve(directory, "package.json");
if (!(await fs.pathExists(packageJSONPath))) {
throw new Error(
Expand All @@ -30,7 +16,7 @@ async function packLocalPackage(directory: string): Promise<Package> {
);
const npmPacked = path.resolve(directory, `${name}-${version}.tgz`);

return new Promise<Package>((resolve, reject) => {
return new Promise<string>((resolve, reject) => {
const childProcess = exec(
"npm pack",
{
Expand All @@ -52,15 +38,7 @@ async function packLocalPackage(directory: string): Promise<Package> {

await fs.move(npmPacked, packagePath);

const pkg = {
name,
version: `file:${packagePath}`,
immutable: false,
};
Object.assign(pkg, {
toString: () => `${name}@${version}`,
});
resolve(pkg);
resolve(packagePath);
} else {
reject(
new Error(`Cannot find 'npm pack' output file: ${npmPacked}`)
Expand All @@ -79,4 +57,4 @@ async function packLocalPackage(directory: string): Promise<Package> {
});
}

export { externalPackage, packLocalPackage, Package };
export { packLocalPackage };
104 changes: 104 additions & 0 deletions src/process-driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { ChildProcess } from "child_process";
import { BufferedListener, createBufferedListener } from "./listener";
import stripAnsi from "strip-ansi";

interface ProcessDriver {
process: ChildProcess;
waitForStdoutIncludes(
pattern: string | string[],
timeout?: number
): Promise<void>;
waitForStderrIncludes(
pattern: string | string[],
timeout?: number
): Promise<void>;
}

type ProcessOutput = "stdout" | "stderr";

function createProcessDriver(
process: ChildProcess,
defaultTimeout = 30000
): ProcessDriver {
const listeners: Record<ProcessOutput, BufferedListener<string>> = {
stdout: createBufferedListener(),
stderr: createBufferedListener(),
};

if (process.stdout) {
process.stdout.on("data", (data) => {
const content = stripAnsi(data.toString());
listeners.stdout.ingest(content);
});
}

if (process.stderr) {
process.stderr.on("data", (data) => {
const content = stripAnsi(data.toString());
listeners.stderr.ingest(content);
});
}

const waitForOutputIncludes = (
output: ProcessOutput,
pattern: string | string[],
timeout: number
) =>
new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(
new Error(
`Exceeded time on waiting for "${pattern}" to appear in the ${output}.`
)
);
}, timeout);

listeners[output].apply(
{
resolve: () => {
clearTimeout(timeoutId);
listeners[output] = createBufferedListener(
listeners[output].buffer
);
resolve();
},
reject: (error) => {
clearTimeout(timeoutId);
listeners[output] = createBufferedListener(
listeners[output].buffer
);
reject(error);
},
string: pattern,
},
(chunk, listener) => {
let index = -1;
const strings = Array.isArray(pattern) ? pattern : [pattern];

strings.forEach((string) => {
const stringIndex = chunk.indexOf(string);

index = Math.max(
index,
stringIndex === -1 ? -1 : stringIndex + string.length
);
});

if (index !== -1) {
listener.resolve();
return chunk.slice(index);
}
}
);
});

return {
process,
waitForStdoutIncludes: (string, timeout = defaultTimeout) =>
waitForOutputIncludes("stdout", string, timeout),
waitForStderrIncludes: (string, timeout = defaultTimeout) =>
waitForOutputIncludes("stderr", string, timeout),
};
}

export { createProcessDriver, ProcessDriver };
Loading

0 comments on commit 69cee28

Please sign in to comment.