Skip to content

Commit

Permalink
feat: add process driver for spawn processes
Browse files Browse the repository at this point in the history
  • Loading branch information
piotr-oles committed Feb 28, 2021
1 parent 90e6bfc commit 08b91ff
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./sandbox";
export * from "./logger";
export * from "./async";
export * from "./package";
export * from "./process-driver";
133 changes: 133 additions & 0 deletions src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
interface Listener<TValue = void> {
resolve(value: TValue): void;
reject(error: unknown): void;
}

type ListenerStatus = "pending" | "resolved" | "rejected";

interface QueuedListener<TValue = void> extends Listener<TValue> {
apply<TListener extends Listener<TValue>>(listener: TListener): void;
resolved: TValue | undefined;
rejected: unknown | undefined;
status: ListenerStatus;
}

interface BufferedListener<TChunk = void, TValue = void>
extends QueuedListener<TValue> {
apply<TListener extends Listener<TValue>>(
listener: TListener,
ingest?: (chunk: TChunk, listener: Listener<TValue>) => TChunk | undefined
): void;
ingest(chunk: TChunk): void;
buffer: TChunk[];
}

function createBufferedListener<TChunk = void, TValue = void>(
buffer: TChunk[] = []
): BufferedListener<TChunk, TValue> {
let resolve: (value: TValue) => void | undefined;
let reject: (error: unknown) => void | undefined;

const bufferedListener: BufferedListener<TChunk, TValue> = {
resolve(value) {
if (bufferedListener.status === "pending") {
bufferedListener.resolved = value;
bufferedListener.status = "resolved";

if (resolve) {
resolve(value);
}
}
},
reject(error) {
if (bufferedListener.status === "pending") {
bufferedListener.rejected = error;
bufferedListener.status = "rejected";

if (reject) {
reject(error);
}
}
},
ingest(chunk: TChunk) {
bufferedListener.buffer.push(chunk);
},
apply(listener, ingest) {
switch (bufferedListener.status) {
case "resolved":
listener.resolve(bufferedListener.resolved!);
break;
case "rejected":
listener.reject(bufferedListener.rejected);
break;
case "pending":
resolve = listener.resolve;
reject = listener.reject;
break;
}

if (ingest) {
const ingestListener: Listener<TValue> = {
resolve(value) {
if (bufferedListener.status === "pending") {
bufferedListener.resolved = value;
bufferedListener.status = "resolved";
}
},
reject(error) {
if (bufferedListener.status === "pending") {
bufferedListener.rejected = error;
bufferedListener.status = "rejected";
}
},
};

const applyStatusChange = () => {
switch (bufferedListener.status) {
case "resolved":
listener.resolve(bufferedListener.resolved!);
break;
case "rejected":
listener.reject(bufferedListener.rejected);
break;
}
};

bufferedListener.ingest = (chunk: TChunk) => {
if (bufferedListener.status === "pending") {
const ingestedChunk = ingest(chunk, ingestListener);

if (ingestedChunk) {
bufferedListener.buffer.push(ingestedChunk);
}

applyStatusChange();
} else {
bufferedListener.buffer.push(chunk);
}
};

// process buffer
bufferedListener.buffer = bufferedListener.buffer
.map((chunk) => {
if (bufferedListener.status === "pending") {
return ingest(chunk, ingestListener);
} else {
return chunk;
}
})
.filter((chunk) => chunk !== undefined) as TChunk[];

applyStatusChange();
}
},
resolved: undefined,
rejected: undefined,
status: "pending",
buffer: [...buffer],
};

return bufferedListener;
}

export { Listener, QueuedListener, BufferedListener, createBufferedListener };
104 changes: 104 additions & 0 deletions src/process-driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { ChildProcess } from "child_process";
import { BufferedListener, createBufferedListener } from "./listener";
import stripAnsi from "strip-ansi";

interface ProcessDriver {
process: ChildProcess;
waitForStdoutIncludes(
pattern: string | string[],
timeout?: number
): Promise<void>;
waitForStderrIncludes(
pattern: string | string[],
timeout?: number
): Promise<void>;
}

type ProcessOutput = "stdout" | "stderr";

function createProcessDriver(
process: ChildProcess,
defaultTimeout = 30000
): ProcessDriver {
const listeners: Record<ProcessOutput, BufferedListener<string>> = {
stdout: createBufferedListener(),
stderr: createBufferedListener(),
};

if (process.stdout) {
process.stdout.on("data", (data) => {
const content = stripAnsi(data.toString());
listeners.stdout.ingest(content);
});
}

if (process.stderr) {
process.stderr.on("data", (data) => {
const content = stripAnsi(data.toString());
listeners.stderr.ingest(content);
});
}

const waitForOutputIncludes = (
output: ProcessOutput,
pattern: string | string[],
timeout: number
) =>
new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(
new Error(
`Exceeded time on waiting for "${pattern}" to appear in the ${output}.`
)
);
}, timeout);

listeners[output].apply(
{
resolve: () => {
clearTimeout(timeoutId);
listeners[output] = createBufferedListener(
listeners[output].buffer
);
resolve();
},
reject: (error) => {
clearTimeout(timeoutId);
listeners[output] = createBufferedListener(
listeners[output].buffer
);
reject(error);
},
string: pattern,
},
(chunk, listener) => {
let index = -1;
const strings = Array.isArray(pattern) ? pattern : [pattern];

strings.forEach((string) => {
const stringIndex = chunk.indexOf(string);

index = Math.max(
index,
stringIndex === -1 ? -1 : stringIndex + string.length
);
});

if (index !== -1) {
listener.resolve();
return chunk.slice(index);
}
}
);
});

return {
process,
waitForStdoutIncludes: (string, timeout = defaultTimeout) =>
waitForOutputIncludes("stdout", string, timeout),
waitForStderrIncludes: (string, timeout = defaultTimeout) =>
waitForOutputIncludes("stderr", string, timeout),
};
}

export { createProcessDriver, ProcessDriver };

0 comments on commit 08b91ff

Please sign in to comment.