Skip to content

Commit

Permalink
fix: querystream into filestream on file export (#15)
Browse files Browse the repository at this point in the history
* build: 3.0.5
  • Loading branch information
raprav authored Mar 2, 2021
1 parent 44d7c86 commit ea07f0e
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 2,421 deletions.
152 changes: 70 additions & 82 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const Excel = require('exceljs');
const csv = require('fast-csv');
const fs = require('fs');
const fsp = require('fs').promises;
const JsonStreamStringify = require('json-stream-stringify');
const path = require('path');

const Executor = require('@runnerty/module-core').Executor;
Expand Down Expand Up @@ -63,8 +64,9 @@ class mysqlExecutor extends Executor {
// ****************
if (params.xlsxFileExport) {
await fsp.access(path.dirname(params.xlsxFileExport));
const fileStreamWriter = fs.createWriteStream(params.xlsxFileExport);
const options = {
filename: params.xlsxFileExport,
stream: fileStreamWriter,
useStyles: true,
useSharedStrings: true
};
Expand All @@ -90,103 +92,88 @@ class mysqlExecutor extends Executor {
queryStream.on('end', async _ => {
await workbook.commit();
this.prepareEndOptions(firstRow, rowCounter, resultSetHeader, results);
this._end(this.endOptions);
this._end();
connection.end();
});
queryStream.on('error', err => {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
this._error(`executeMysql: ${err}`);
});
}
// CSV FILE EXPORT
// ***************
else if (params.csvFileExport) {
await fsp.access(path.dirname(params.csvFileExport));
const fileStreamWriter = fs.createWriteStream(params.csvFileExport);

const paramsCSV = params.csvOptions || {};
if (!paramsCSV.hasOwnProperty('headers')) paramsCSV.headers = true;
const csvStream = csv
.format(paramsCSV)
.on('error', err => {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
this._error(`executeMysql - csvStream: ${err}`);
})
.on('end', () => {
fileStreamWriter.end();
this.prepareEndOptions(firstRow, rowCounter, resultSetHeader, results);
this._end(this.endOptions);
this._end();
connection.end();
});

csvStream.pipe(fileStreamWriter);

queryStream.on('result', row => {
if (isFirstRow) {
firstRow = row;
if (row.constructor.name === 'ResultSetHeader') {
resultSetHeader = row;
}
isFirstRow = false;
}

rowCounter++;
csvStream.write(row);
});
queryStream.on('end', _ => {
csvStream.end();
connection.end();
});
queryStream.on('error', err => {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
});
// STREAMED
queryStream
.on('result', row => {
if (isFirstRow) {
firstRow = row;
if (row.constructor.name === 'ResultSetHeader') {
resultSetHeader = row;
}
isFirstRow = false;
} else rowCounter++;
csvStream.write(row);
})
.on('error', err => {
connection.end();
this._error(`executeMysql: ${err}`);
})
.stream()
.pipe(csvStream)
.pipe(fileStreamWriter);
}
// TEXT FILE EXPORT JSON
// *********************
else if (params.fileExport) {
await fsp.access(path.dirname(params.fileExport));
const fileStreamWriter = fs.createWriteStream(params.fileExport);

queryStream.on('result', row => {
if (isFirstRow) {
firstRow = row;
if (row.constructor.name === 'ResultSetHeader') {
resultSetHeader = row;
}
isFirstRow = false;
fileStreamWriter.write('[\n');
fileStreamWriter.write(JSON.stringify(row));
} else {
fileStreamWriter.write(',\n' + JSON.stringify(row));
}
rowCounter++;
});

queryStream.on('end', () => {
if (rowCounter) fileStreamWriter.write('\n]');

fileStreamWriter.end();
this.prepareEndOptions(firstRow, rowCounter, resultSetHeader, results);
this._end(this.endOptions);
connection.end();
});
fileStreamWriter
.on('finish', () => {
this._end();
})
.on('error', err => {
this._error(`ERROR executeMysql - fileStreamWriter ${err}`);
});

queryStream.on('error', err => {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
});
// STREAMED
new JsonStreamStringify(
queryStream
.on('result', row => {
if (isFirstRow) {
firstRow = row;
if (row.constructor.name === 'ResultSetHeader') {
resultSetHeader = row;
}
isFirstRow = false;
}
rowCounter++;
})
.on('error', err => {
connection.end();
this._error(`ERROR executeMysql - queryStream ${err}`);
})
.stream()
).pipe(fileStreamWriter);
}
// NO FILE EXPORT - DATA_OUTPUT
// ****************************
Expand All @@ -203,24 +190,18 @@ class mysqlExecutor extends Executor {
results.push(row);
});
queryStream.on('end', _ => {
this.prepareEndOptions(firstRow, rowCounter, resultSetHeader, results);
this._end(this.endOptions);
connection.end();
this.prepareEndOptions(firstRow, rowCounter, resultSetHeader, results);
this._end();
});
queryStream.on('error', err => {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
this._error(`executeMysql: ${err}`);
});
}
} catch (err) {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
connection.end();
this._error(`executeMysql: ${err}`);
}
}

Expand Down Expand Up @@ -280,7 +261,7 @@ class mysqlExecutor extends Executor {
this.endOptions.end = 'error';
this.endOptions.messageLog = 'executeMysql dont have command or command_file';
this.endOptions.err_output = 'executeMysql dont have command or command_file';
this._end(this.endOptions);
this._end();
}
}
const query = await this.prepareQuery(params);
Expand All @@ -296,7 +277,7 @@ class mysqlExecutor extends Executor {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql reading ssl file/s: ${error}`;
this.endOptions.err_output = `executeMysql reading ssl file/s: ${error}`;
this._end(this.endOptions);
this._end();
}
}

Expand All @@ -320,7 +301,7 @@ class mysqlExecutor extends Executor {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
this._end();
});

const result = await this.executeQuery(connection, query, params);
Expand All @@ -329,12 +310,19 @@ class mysqlExecutor extends Executor {
this.endOptions.end = 'error';
this.endOptions.messageLog = `executeMysql: ${err}`;
this.endOptions.err_output = `executeMysql: ${err}`;
this._end(this.endOptions);
this._end();
}
}

_end(endOptions) {
if (!this.ended) this.end(endOptions);
_error(errMsg) {
this.endOptions.end = 'error';
this.endOptions.messageLog = errMsg;
this.endOptions.err_output = errMsg;
this._end();
}

_end() {
if (!this.ended) this.end(this.endOptions);
this.ended = true;
}
}
Expand Down
Loading

0 comments on commit ea07f0e

Please sign in to comment.