Skip to content

Commit

Permalink
tar processing
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu committed Jan 4, 2024
1 parent 4fc797b commit 784ea95
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 13 deletions.
27 changes: 27 additions & 0 deletions connectors/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions connectors/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@types/uuid": "^9.0.2",
"axios": "^1.5.1",
"body-parser": "^1.20.2",
"blake3": "^2.1.7",
"dd-trace": "^3.16.0",
"eventsource-parser": "^1.0.0",
"express": "^4.18.2",
Expand Down Expand Up @@ -62,6 +63,7 @@
"@types/fast-levenshtein": "^0.0.2",
"@types/node": "^18.15.5",
"@types/p-queue": "^3.2.1",
"@types/tar": "^6.1.10",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
"eslint": "^8.36.0",
Expand Down
52 changes: 52 additions & 0 deletions connectors/src/admin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import { NotionDatabase, NotionPage } from "@connectors/lib/models/notion";
import { SlackConfiguration } from "@connectors/lib/models/slack";
import { nango_client } from "@connectors/lib/nango_client";
import { Result } from "@connectors/lib/result";
import {
cleanUpProcessRepository,
processRepository,
} from "@connectors/connectors/github/lib/github_api";

const { NANGO_SLACK_CONNECTOR_ID } = process.env;

Expand Down Expand Up @@ -99,6 +103,51 @@ const connectors = async (command: string, args: parseArgs.ParsedArgs) => {
}
};

const github = async (command: string, args: parseArgs.ParsedArgs) => {
switch (command) {
case "test-repo": {
if (!args.wId) {
throw new Error("Missing --wId argument");
}
if (!args.dataSourceName) {
throw new Error("Missing --dataSourceName argument");
}
if (!args.owner) {
throw new Error("Missing --owner argument");
}
if (!args.repo) {
throw new Error("Missing --repo argument");
}

const connector = await Connector.findOne({
where: {
type: "github",
workspaceId: args.wId,
dataSourceName: args.dataSourceName,
},
});

if (!connector) {
throw new Error(
`Could not find connector for workspace ${args.wId}, data source ${args.dataSourceName}`
);
}

const installationId = connector.connectionId;
const { tempDir, files } = await processRepository(
installationId,
args.owner,
args.repo,
"999"
);

console.log(files);

await cleanUpProcessRepository(tempDir);
}
}
};

const notion = async (command: string, args: parseArgs.ParsedArgs) => {
switch (command) {
case "restart-all": {
Expand Down Expand Up @@ -568,6 +617,9 @@ const main = async () => {
case "notion":
await notion(command, argv);
return;
case "github":
await github(command, argv);
return;
case "google":
await google(command, argv);
return;
Expand Down
177 changes: 164 additions & 13 deletions connectors/src/connectors/github/lib/github_api.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { createAppAuth } from "@octokit/auth-app";
import { hash as blake3 } from "blake3";
import { isLeft } from "fp-ts/lib/Either";
import { createWriteStream } from "fs";
import { mkdtemp, readdir, rmdir } from "fs/promises";
import { mkdtemp, readdir, rm } from "fs/promises";
import fs from "fs-extra";
import * as reporter from "io-ts-reporters";
import { Octokit } from "octokit";
import { tmpdir } from "os";
import { join, resolve } from "path";
import { pipeline } from "stream";
import { Readable } from "stream";
import { extract } from "tar";
import { promisify } from "util";

Expand Down Expand Up @@ -534,48 +536,197 @@ async function getOctokit(installationId: string): Promise<Octokit> {
});
}

// Repository processing

const asyncPipeline = promisify(pipeline);

export async function downloadRepository(
const EXTENSION_WHITELIST = [
".js",
".ts",
".tsx",
".jsx",
".rb",
".py",
".rs",
".go",
".swift",
".css",
".html",
".less",
".sass",
".scss",
".php",
".java",
".yaml",
".yml",
".md",
];

const FILENAME_WHITELIST = [
"README",
"Dockerfile",
"package.json",
"Cargo.toml",
];

const DIRECTORY_BLACKLIST = [
"node_modules",
"vendor",
"dist",
"build",
"coverage",
"pkg",
"bundle",
"built",
"eggs",
"downloads",
"env",
"venv",
"tmp",
"temp",
"debug",
"target",
];

async function* getFiles(dir: string): AsyncGenerator<string> {
const dirents = await readdir(dir, { withFileTypes: true });
for (const dirent of dirents) {
const res = resolve(dir, dirent.name);
if (dirent.isDirectory()) {
// blacklist
if (DIRECTORY_BLACKLIST.includes(dirent.name)) {
continue;
}
yield* getFiles(res);
} else {
yield res;
}
}
}

export async function processRepository(
installationId: string,
login: string,
repoName: string
repoName: string,
repoId: string
) {
const octokit = await getOctokit(installationId);

const { data: tarballStream } = await octokit.request(
"GET /repos/{owner}/{repo}/tarball",
const { data } = await octokit.rest.repos.get({
owner: login,
repo: repoName,
});
const defaultBranch = data.default_branch;

let { data: tarballStream } = await octokit.request(
"GET /repos/{owner}/{repo}/tarball/{ref}",
{
owner: login,
repo: repoName,
ref: defaultBranch,
}
);

// Create a temp directory.
const tempDir = await mkdtemp(join(tmpdir(), "repo-"));
const tarPath = resolve(tempDir, "repo.tar.gz");

// Convert ArrayBuffer to stream if necessary
if (tarballStream instanceof ArrayBuffer) {
// Wrap ArrayBuffer with a stream
const stream = new Readable();
stream.push(Buffer.from(tarballStream));
stream.push(null); // Signal the end of the stream
tarballStream = stream;
}

// Save the tarball to the temp directory.
await asyncPipeline(tarballStream, createWriteStream(tarPath));
console.log("Downloaded: ", tarPath);
await asyncPipeline(tarballStream as Readable, createWriteStream(tarPath));

// Extract the tarball.
await extract({
file: tarPath,
cwd: tempDir,
});
console.log("Extracted: ", tarPath);

// Delete the tarball.
await fs.unlink(tarPath);

const files: {
fileName: string;
filePath: string[];
sourceUrl: string;
sizeBytes: number;
documentId: string;
parentInternalId: string | null;
parents: string[];
localFilePath: string;
}[] = [];

// Iterate over the files in the temp directory.
const files = await readdir(tempDir);
for (const file of files) {
console.log("FILE: ", file);
for await (const file of getFiles(tempDir)) {
console.log(file);
// get file extension
const ext = file.split(".").pop();
// get file size
const { size } = await fs.stat(file);

const isWithelisted =
EXTENSION_WHITELIST.includes(`.${ext}`) ||
FILENAME_WHITELIST.includes(file);

const isUnderLimite = size < 1024 * 1024;

if (isWithelisted && isUnderLimite) {
const path = file
.substring(tempDir.length + 1)
.split("/")
.slice(1, -1);

const pathInternalIds = [];
for (let i = 0; i < path.length; i++) {
const p = `github-code-${repoId}-dir-${path.slice(0, i + 1).join("/")}`;
pathInternalIds.push(
`github-code-${repoId}-dir-${blake3(p)
.toString("hex")
.substring(0, 16)}`
);
}

const documentId = `github-code-${repoId}-file-${blake3(
`github-code-${repoId}-file-${path.join("/")}/${file}`
)
.toString("hex")
.substring(0, 16)}`;

const fileName = file.split("/").pop() || "";
const parentInternalId =
pathInternalIds.length === 0
? null
: (pathInternalIds[pathInternalIds.length - 1] as string);

files.push({
fileName,
filePath: path,
sourceUrl: `https://github.com/${login}/${repoName}/blob/${defaultBranch}/${path.join(
"/"
)}/${fileName}`,
sizeBytes: size,
documentId,
parentInternalId,
parents: pathInternalIds,
localFilePath: file,
});
}
}

return {
tempDir,
files,
};
}

export async function cleanUpProcessRepository(tempDir: string) {
// Delete the temp directory.
await rmdir(tempDir, { recursive: true });
console.log("Cleaned up: ", tempDir);
await rm(tempDir, { recursive: true });
}

0 comments on commit 784ea95

Please sign in to comment.