From b3dcf3f4dc5be0397ec89f957a73bf50b24b4ed3 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Fri, 9 Aug 2024 17:47:02 +0000 Subject: [PATCH 1/5] fix block comment for parser and install tools for pg 13 --- Dockerfile | 12 ++++-- dump-parser/src/utils.rs | 81 +++++++++++++++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/Dockerfile b/Dockerfile index 615061a4..e1d5a0ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,10 +42,14 @@ FROM debian:buster-slim LABEL org.opencontainers.image.source https://github.com/qovery/replibyte # Install Postgres and MySQL binaries -RUN apt-get clean && apt-get update && apt-get install -y \ - wget \ - postgresql-client \ - default-mysql-client +RUN apt-get clean && apt-get update +RUN apt-get install -y wget gnupg2 lsb-release +RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - + +RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ `lsb_release -cs`-pgdg main" | tee /etc/apt/sources.list.d/pgdg.list + +RUN apt-get update +RUN apt-get install -y postgresql-client-13 # Install MongoDB tools RUN wget https://fastdl.mongodb.org/tools/db/mongodb-database-tools-debian92-x86_64-100.5.2.deb && \ diff --git a/dump-parser/src/utils.rs b/dump-parser/src/utils.rs index c5bdc505..4f73d288 100644 --- a/dump-parser/src/utils.rs +++ b/dump-parser/src/utils.rs @@ -164,6 +164,7 @@ fn list_statements(query: &str) -> Vec { let mut is_statement_complete = true; let mut is_comment_line = false; + let mut is_block_comment_line = false; let mut is_partial_comment_line = false; let mut start_index = 0usize; let mut previous_chars_are_whitespaces = true; @@ -186,7 +187,7 @@ fn list_statements(query: &str) -> Vec { is_comment_line = false; previous_chars_are_whitespaces = true; } - b'\'' if !is_comment_line && !is_partial_comment_line => { + b'\'' if !is_comment_line && !is_partial_comment_line && !is_block_comment_line => { if stack.get(0) == Some(&b'\'') { if (query.len() > next_idx) && &query[next_idx..next_idx] == "'" { // do nothing because the ' char is escaped via a double '' @@ -204,6 +205,7 @@ fn list_statements(query: &str) -> Vec { } b'(' if !is_comment_line && !is_partial_comment_line + && !is_block_comment_line && stack.get(0) != Some(&b'\'') => { stack.insert(0, byte_char); @@ -211,7 +213,7 @@ fn list_statements(query: &str) -> Vec { is_comment_line = false; previous_chars_are_whitespaces = false; } - b')' if !is_comment_line && !is_partial_comment_line => { + b')' if !is_comment_line && !is_partial_comment_line && !is_block_comment_line => { if stack.get(0) == Some(&b'(') { let _ = stack.remove(0); } else if stack.get(0) != Some(&b'\'') { @@ -240,12 +242,49 @@ fn list_statements(query: &str) -> Vec { is_partial_comment_line = true; previous_chars_are_whitespaces = false; } - b'\n' if !is_comment_line && !is_partial_comment_line && is_statement_complete => { + b'/' if !is_block_comment_line + && previous_chars_are_whitespaces + && is_statement_complete + && next_idx < query_bytes.len() && query_bytes[next_idx] == b'*' => + { + // comment + is_block_comment_line = true; + previous_chars_are_whitespaces = false; + } + b'/' if !is_statement_complete + && next_idx < query_bytes.len() && query_bytes[next_idx] == b'*' + && stack.get(0) != Some(&b'\'') => + { + // comment + is_block_comment_line = true; + previous_chars_are_whitespaces = false; + } + + b'*' if is_block_comment_line + && previous_chars_are_whitespaces + && is_statement_complete + && next_idx < query_bytes.len() && 
query_bytes[next_idx] == b'/' => + { + // comment + is_block_comment_line = false; + previous_chars_are_whitespaces = false; + } + b'*' if !is_statement_complete + && next_idx < query_bytes.len() && query_bytes[next_idx] == b'/' + && stack.get(0) != Some(&b'\'') => + { + // comment + is_block_comment_line = false; + previous_chars_are_whitespaces = false; + } + + b'\n' if !is_comment_line && !is_partial_comment_line && !is_block_comment_line && is_statement_complete => { previous_chars_are_whitespaces = true; sql_statements.push(Statement::NewLine); } b';' if !is_comment_line && !is_partial_comment_line + && !is_block_comment_line && stack.get(0) != Some(&b'\'') => { // end of query @@ -310,18 +349,42 @@ mod tests { #[test] fn check_list_sql_queries_from_dump_reader() { - let r = r#"INSERT INTO public.Users(uuid, "text", name) VALUES ('a84ac0c6-2348-45c0-b86c-8d34e251a859', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras eu nisi tempor, viverra turpis sit amet, sodales augue. Vivamus sit amet erat urna. Morbi porta, quam nec consequat suscipit, ante diam tempus risus, et consequat erat odio sed magna. Maecenas dignissim quam nibh, nec congue magna convallis a. - -Etiam augue augue, bibendum et molestie non, finibus non nulla. Etiam quis rhoncus leo, eu congue erat. Cras id magna ac dui convallis ultricies. Donec sed elit ac urna condimentum auctor. Nunc nec nulla id dui feugiat dictum sit amet nec orci. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae. - - -', 'some-name');"#.as_bytes(); + let r = r##"CREATE OR REPLACE FUNCTION check_mandatory_attributes_52( + listerp TEXT, + listtype TEXT, + attributes HSTORE + ) + RETURNS BOOLEAN AS $$ + SELECT CASE + WHEN listerp = 'xero' AND listtype = 'GeneralLedger' + THEN public.defined(attributes, 'AccountID'::text) AND public.defined(attributes, 'Name'::text) + /** + * Don't treat APAccount and Tax as mandatory because we can't currently differentiate between manually + * created list items and sync'ed list items and these two lists often have some manual entries. 
+ */ + WHEN listtype IN ('APAccount', 'Tax') + THEN TRUE + ELSE ( + SELECT COUNT(*) = 0 + FROM ( + SELECT unnest(public.get_unique_attribute_keys_52(listerp, listtype)) AS key_to_check + ) AS unique_keys + WHERE NOT public.defined(attributes, key_to_check::text) + ) + END + $$ + +INSERT INTO public.exportformat (id, orgunit_id, rootou_id, name, ftpintegrated, formattemplate, filenametemplate, active, filetype, type) VALUES ('8755D03A3EAA4DE98B39370238A56829', '7EE1E394E6B64B19AFB4B126A518521A', '36CA9CB0BBB44932980AFF87ED8547BC', 'CSV', true, ' + + +', NULL, true, 'CSV', 'invoice');"##.as_bytes(); let reader = BufReader::new(r); let mut queries = vec![]; list_sql_queries_from_dump_reader(reader, |query| { queries.push(query.to_string()); + println!("list_sql_queries_from_dump_reader {}, ",query.to_string()); ListQueryResult::Continue }); From a975ac210eac51ae05d04b3ba4e2d54bd8935e0c Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 21 Aug 2024 21:23:49 +0000 Subject: [PATCH 2/5] adding superset config --- Cargo.toml | 2 +- Dockerfile | 2 + replibyte/Cargo.toml | 1 + replibyte/src/commands/dump.rs | 1 + replibyte/src/config.rs | 19 +- replibyte/src/main.rs | 2 +- replibyte/src/source/mod.rs | 3 +- replibyte/src/source/mongodb.rs | 7 + replibyte/src/source/mongodb_stdin.rs | 4 + replibyte/src/source/mysql.rs | 2 + replibyte/src/source/postgres.rs | 77 +- superset/Cargo.toml | 11 + superset/README.md | 3 + superset/src/dedup.rs | 63 ++ superset/src/lib.rs | 123 ++++ superset/src/postgres.rs | 976 ++++++++++++++++++++++++++ superset/src/utils.rs | 8 + 17 files changed, 1290 insertions(+), 14 deletions(-) create mode 100644 superset/Cargo.toml create mode 100644 superset/README.md create mode 100644 superset/src/dedup.rs create mode 100644 superset/src/lib.rs create mode 100644 superset/src/postgres.rs create mode 100644 superset/src/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 265fb56d..fb0fea90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] exclude = ["db/*", "assets/*"] -members = ["dump-parser", "replibyte", "subset"] +members = ["dump-parser", "replibyte", "subset", "superset"] diff --git a/Dockerfile b/Dockerfile index e1d5a0ff..af6652f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ COPY ./dump-parser ./dump-parser # subset COPY ./subset ./subset +COPY ./superset ./superset # replibyte COPY ./replibyte/Cargo.toml ./replibyte/Cargo.toml @@ -30,6 +31,7 @@ RUN rm src/*.rs COPY ./replibyte/src ./replibyte/src COPY ./dump-parser/src ./dump-parser/src COPY ./subset/src ./subset/src +COPY ./superset/src ./superset/src # build for release RUN rm ./target/release/deps/replibyte* diff --git a/replibyte/Cargo.toml b/replibyte/Cargo.toml index bd904657..fbf67372 100644 --- a/replibyte/Cargo.toml +++ b/replibyte/Cargo.toml @@ -9,6 +9,7 @@ authors = ["Qovery Team", "Fab", "Benny", "Contributos"] [dependencies] dump-parser = { path = "../dump-parser" } subset = { path = "../subset" } +superset = { path = "../superset" } rand = "0.8.5" anyhow = "1.0.56" serde_yaml = "0.8" diff --git a/replibyte/src/commands/dump.rs b/replibyte/src/commands/dump.rs index 22eb765a..cdda80d6 100644 --- a/replibyte/src/commands/dump.rs +++ b/replibyte/src/commands/dump.rs @@ -131,6 +131,7 @@ where transformers: &transformers, skip_config: &skip_config, database_subset: &source.database_subset, + database_superset: &source.database_superset, only_tables: &only_tables_config, }; diff --git a/replibyte/src/config.rs b/replibyte/src/config.rs index b6d5e2e6..3c3e89b6 100644 --- 
a/replibyte/src/config.rs +++ b/replibyte/src/config.rs @@ -203,6 +203,7 @@ pub struct SourceConfig { pub transformers: Option>, pub skip: Option>, pub database_subset: Option, + pub database_superset: Option, pub only_tables: Option>, } @@ -241,11 +242,21 @@ pub struct DatabaseSubsetConfig { pub database: String, pub table: String, #[serde(flatten)] - pub strategy: DatabaseSubsetConfigStrategy, + pub strategy: DatabaseScaleConfigStrategy, // copy the entire table - not affected by the subset algorithm pub passthrough_tables: Option>, } +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct DatabaseSupersetConfig { + pub database: String, + pub table: String, + #[serde(flatten)] + pub strategy: DatabaseScaleConfigStrategy, + // copy the entire table - not affected by the superset algorithm + pub passthrough_tables: Option>, +} + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct OnlyTablesConfig { pub database: String, @@ -255,12 +266,12 @@ pub struct OnlyTablesConfig { #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] #[serde(tag = "strategy_name", content = "strategy_options")] -pub enum DatabaseSubsetConfigStrategy { - Random(DatabaseSubsetConfigStrategyRandom), +pub enum DatabaseScaleConfigStrategy { + Random(DatabaseScaleConfigStrategyRandom), } #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)] -pub struct DatabaseSubsetConfigStrategyRandom { +pub struct DatabaseScaleConfigStrategyRandom { pub percent: u8, } diff --git a/replibyte/src/main.rs b/replibyte/src/main.rs index 24d7b017..68f7faae 100644 --- a/replibyte/src/main.rs +++ b/replibyte/src/main.rs @@ -14,7 +14,7 @@ use migration::{migrations, Migrator}; use utils::get_replibyte_version; use crate::cli::{DumpCommand, RestoreCommand, SubCommand, TransformerCommand, CLI, SourceCommand}; -use crate::config::{Config, DatabaseSubsetConfig, DatastoreConfig}; +use crate::config::{Config, DatabaseSubsetConfig, DatabaseSupersetConfig, DatastoreConfig}; use crate::datastore::local_disk::LocalDisk; use crate::datastore::s3::S3; use crate::datastore::Datastore; diff --git a/replibyte/src/source/mod.rs b/replibyte/src/source/mod.rs index 694e3447..44836362 100644 --- a/replibyte/src/source/mod.rs +++ b/replibyte/src/source/mod.rs @@ -1,6 +1,6 @@ use std::io::Error; -use crate::config::{DatabaseSubsetConfig, OnlyTablesConfig, SkipConfig}; +use crate::config::{DatabaseSubsetConfig, DatabaseSupersetConfig, OnlyTablesConfig, SkipConfig}; use crate::connector::Connector; use crate::transformer::Transformer; use crate::types::{OriginalQuery, Query}; @@ -27,6 +27,7 @@ pub trait Source: Connector { pub struct SourceOptions<'a> { pub transformers: &'a Vec>, pub skip_config: &'a Vec, + pub database_superset: &'a Option, pub database_subset: &'a Option, pub only_tables: &'a Vec, } diff --git a/replibyte/src/source/mongodb.rs b/replibyte/src/source/mongodb.rs index 5857febd..e94ba94c 100644 --- a/replibyte/src/source/mongodb.rs +++ b/replibyte/src/source/mongodb.rs @@ -73,6 +73,10 @@ impl<'a> Source for MongoDB<'a> { todo!("database subset not supported yet for MongoDB source") } + if let Some(_database_superset) = &options.database_superset { + todo!("database superset not supported yet for MongoDB source") + } + let dump_args = vec![ "--uri", self.uri, @@ -367,6 +371,7 @@ mod tests { transformers: &transformers, skip_config: &vec![], database_subset: &None, + database_superset: &None, only_tables: &vec![], }; @@ -379,6 +384,7 @@ mod tests { transformers: 
&transformers, skip_config: &vec![], database_subset: &None, + database_superset: &None, only_tables: &vec![], }; @@ -394,6 +400,7 @@ mod tests { transformers: &transformers, skip_config: &vec![], database_subset: &None, + database_superset: &None, only_tables: &vec![], }; diff --git a/replibyte/src/source/mongodb_stdin.rs b/replibyte/src/source/mongodb_stdin.rs index 8c39cbbb..75a564ac 100644 --- a/replibyte/src/source/mongodb_stdin.rs +++ b/replibyte/src/source/mongodb_stdin.rs @@ -32,6 +32,10 @@ impl Source for MongoDBStdin { todo!("database subset not supported yet for MongoDB source") } + if let Some(_database_superset) = &options.database_superset { + todo!("database superset not supported yet for MongoDB source") + } + let _ = read_and_transform(reader, options, query_callback)?; Ok(()) } diff --git a/replibyte/src/source/mysql.rs b/replibyte/src/source/mysql.rs index 04dd820f..78f611bd 100644 --- a/replibyte/src/source/mysql.rs +++ b/replibyte/src/source/mysql.rs @@ -455,6 +455,7 @@ mod tests { transformers: &transformers, skip_config: &vec![], database_subset: &None, + database_superset: &None, only_tables: &vec![], }; @@ -467,6 +468,7 @@ mod tests { transformers: &transformers, skip_config: &vec![], database_subset: &None, + database_superset: &None, only_tables: &vec![], }; assert!(p diff --git a/replibyte/src/source/postgres.rs b/replibyte/src/source/postgres.rs index 60ebae72..79dc05f5 100644 --- a/replibyte/src/source/postgres.rs +++ b/replibyte/src/source/postgres.rs @@ -16,14 +16,17 @@ use dump_parser::postgres::{ use dump_parser::utils::{list_sql_queries_from_dump_reader, ListQueryResult}; use subset::postgres::{PostgresSubset, SubsetStrategy}; use subset::{PassthroughTable, Subset, SubsetOptions}; +use superset::postgres::{PostgresSuperset, SupersetStrategy}; +use superset::{PassthroughTable, Superset, SupersetOptions}; -use crate::config::DatabaseSubsetConfigStrategy; +use crate::config::DatabaseScaleConfigStrategy; use crate::connector::Connector; use crate::source::{Explain, Source}; use crate::transformer::Transformer; use crate::types::{Column, InsertIntoQuery, OriginalQuery, Query}; use crate::utils::{binary_exists, table, wait_for_command}; use crate::DatabaseSubsetConfig; +use crate::DatabaseSupersetConfig; use super::SourceOptions; @@ -155,6 +158,16 @@ impl<'a> Source for Postgres<'a> { match &options.database_subset { None => { + match &options.database_superset { + None => { + let reader = BufReader::new(stdout); + read_and_transform(reader, options, query_callback); + } + Some(superset_config) => { + let dump_reader = BufReader::new(stdout); + let reader = superset(dump_reader, superset_config)?; + read_and_transform(reader, options, query_callback); + } let reader = BufReader::new(stdout); read_and_transform(reader, options, query_callback); } @@ -178,7 +191,7 @@ pub fn subset( let _ = io::copy(&mut dump_reader, &mut temp_dump_file)?; let strategy = match subset_config.strategy { - DatabaseSubsetConfigStrategy::Random(opt) => SubsetStrategy::RandomPercent { + DatabaseScaleConfigStrategy::Random(opt) => SubsetStrategy::RandomPercent { database: subset_config.database.as_str(), table: subset_config.table.as_str(), percent: opt.percent, @@ -219,6 +232,56 @@ pub fn subset( )) } +pub fn superset( + mut dump_reader: BufReader, + superset_config: &DatabaseSupersetConfig, +) -> Result, Error> { + let mut named_temp_file = tempfile::NamedTempFile::new()?; + let mut temp_dump_file = named_temp_file.as_file_mut(); + let _ = io::copy(&mut dump_reader, &mut 
temp_dump_file)?; + + let strategy = match superset_config.strategy { + DatabaseScaleConfigStrategy::Random(opt) => SupersetStrategy::RandomPercent { + database: superset_config.database.as_str(), + table: superset_config.table.as_str(), + percent: opt.percent, + }, + }; + + let empty_vec = Vec::new(); + let passthrough_tables = superset_config + .passthrough_tables + .as_ref() + .unwrap_or(&empty_vec) + .iter() + .map(|table| PassthroughTable::new(superset_config.database.as_str(), table.as_str())) + .collect::>(); + + let superset_options = SupersetOptions::new(&passthrough_tables); + let superset = PostgresSuperset::new(named_temp_file.path(), strategy, superset_options)?; + + let named_superset_file = tempfile::NamedTempFile::new()?; + let mut superset_file = named_superset_file.as_file(); + + let _ = superset.read( + |row| { + match superset_file.write(format!("{}\n", row).as_bytes()) { + Ok(_) => {} + Err(err) => { + panic!("{}", err) + } + }; + }, + |progress| { + info!("Database superset completion: {}%", progress.percent()); + }, + )?; + + Ok(BufReader::new( + File::open(named_superset_file.path()).unwrap(), + )) +} + /// consume reader and apply transformation on INSERT INTO queries if needed pub fn read_and_transform( reader: BufReader, @@ -552,7 +615,7 @@ mod tests { use std::vec; use crate::config::{ - DatabaseSubsetConfig, DatabaseSubsetConfigStrategy, DatabaseSubsetConfigStrategyRandom, + DatabaseSubsetConfig, DatabaseScaleConfigStrategy, DatabaseScaleConfigStrategyRandom, SkipConfig, }; use crate::source::postgres::{to_query, Postgres}; @@ -820,8 +883,8 @@ mod tests { database_subset: &Some(DatabaseSubsetConfig { database: "public".to_string(), table: "orders".to_string(), - strategy: DatabaseSubsetConfigStrategy::Random( - DatabaseSubsetConfigStrategyRandom { percent: 50 }, + strategy: DatabaseScaleConfigStrategy::Random( + DatabaseScaleConfigStrategyRandom { percent: 50 }, ), passthrough_tables: None, }), @@ -857,8 +920,8 @@ mod tests { database_subset: &Some(DatabaseSubsetConfig { database: "public".to_string(), table: "orders".to_string(), - strategy: DatabaseSubsetConfigStrategy::Random( - DatabaseSubsetConfigStrategyRandom { percent: 30 }, + strategy: DatabaseScaleConfigStrategy::Random( + DatabaseScaleConfigStrategyRandom { percent: 30 }, ), passthrough_tables: None, }), diff --git a/superset/Cargo.toml b/superset/Cargo.toml new file mode 100644 index 00000000..c6a97b67 --- /dev/null +++ b/superset/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "superset" +version = "0.0.1" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dump-parser = { path = "../dump-parser" } +tempfile = "3.3" +md5 = "0.7" diff --git a/superset/README.md b/superset/README.md new file mode 100644 index 00000000..3eeeeea9 --- /dev/null +++ b/superset/README.md @@ -0,0 +1,3 @@ +# Superset + +Superset is a Rust crate to scale down a database to a more reasonable size. So it can be used in staging, test and development environments. diff --git a/superset/src/dedup.rs b/superset/src/dedup.rs new file mode 100644 index 00000000..2dd02b8a --- /dev/null +++ b/superset/src/dedup.rs @@ -0,0 +1,63 @@ +use std::fs::{File, OpenOptions}; +use std::io::{BufRead, BufReader, Error, Write}; +use std::path::Path; + +pub type Line<'a> = &'a str; +pub type GroupHash = String; + +/// Create or find the appropriate file based on the `group_hash` and append the line if it does not already exist. 
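+/// Returns `Ok(true)` when the line is already present in that file (nothing is appended),
+/// and `Ok(false)` when it was missing and has just been written.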
+pub fn does_line_exist_and_set( + temp_directory: &Path, + group_hash: &GroupHash, + line: Line, +) -> Result { + if does_line_exist(temp_directory, group_hash, line)? { + return Ok(true); + } + + let file_path = temp_directory.join(group_hash); + + // append the line because it does not exist + let mut file = OpenOptions::new() + .write(true) + .append(true) + .truncate(false) + .open(file_path.as_path())?; + + let line = format!("{}\n", line.trim_start().trim_end()); + let _ = file.write(line.as_bytes())?; + + Ok(false) +} + +pub fn does_line_exist( + temp_directory: &Path, + group_hash: &GroupHash, + line: Line, +) -> Result { + let file_path = temp_directory.join(group_hash); + let file = match File::open(file_path.as_path()) { + Ok(file) => file, + Err(_) => File::create(file_path.as_path())?, + }; + + let mut buf = String::new(); + let mut reader = BufReader::new(&file); + // remove potential whitespaces and \n + let line = line.trim_start().trim_end(); + while let Ok(amount) = reader.read_line(&mut buf) { + if amount == 0 { + // EOF + break; + } + + if buf.as_str().trim_start().trim_end() == line { + // the line already exist in the file, we can stop here + return Ok(true); + } + + let _ = buf.clear(); + } + + Ok(false) +} diff --git a/superset/src/lib.rs b/superset/src/lib.rs new file mode 100644 index 00000000..08b8f327 --- /dev/null +++ b/superset/src/lib.rs @@ -0,0 +1,123 @@ +use std::collections::HashSet; +use std::io::Error; + +mod dedup; +pub mod postgres; +mod utils; + +pub type Bytes = Vec; + +pub trait Superset { + fn read(&self, data: F, progress: P) + -> Result<(), Error>; +} + +pub struct Progress { + // total data rows + pub total_rows: usize, + // total rows to processed + pub total_rows_to_process: usize, + // rows processed + pub processed_rows: usize, + // last row processed exec time + pub last_process_time: u128, +} + +impl Progress { + pub fn percent(&self) -> u8 { + ((self.processed_rows as f64 / self.total_rows_to_process as f64) * 100.0) as u8 + } +} + +#[derive(Debug, Hash, Eq, PartialEq)] +pub struct PassthroughTable<'a> { + pub database: &'a str, + pub table: &'a str, +} + +impl<'a> PassthroughTable<'a> { + pub fn new>(database: S, table: S) -> Self { + PassthroughTable { + database: database.into(), + table: table.into(), + } + } +} + +pub struct SupersetOptions<'a> { + pub passthrough_tables: &'a HashSet>, +} + +impl<'a> SupersetOptions<'a> { + pub fn new(passthrough_tables: &'a HashSet>) -> Self { + SupersetOptions { passthrough_tables } + } +} + +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub struct SupersetTable { + pub database: String, + pub table: String, + pub relations: Vec, +} + +impl SupersetTable { + pub fn new>( + database: S, + table: S, + relations: Vec, + ) -> Self { + SupersetTable { + database: database.into(), + table: table.into(), + relations, + } + } + + pub fn related_tables(&self) -> HashSet<&str> { + self.relations + .iter() + .map(|r| r.table.as_str()) + .collect::>() + } + + pub fn find_related_superset_tables<'a>( + &self, + superset_tables: &'a Vec<&SupersetTable>, + ) -> Vec<&'a SupersetTable> { + if superset_tables.is_empty() { + return Vec::new(); + } + + let related_tables = self.related_tables(); + + superset_tables + .iter() + .filter(|superset_table| related_tables.contains(superset_table.table.as_str())) + .map(|superset_table| *superset_table) + .collect::>() + } +} + +/// Representing a query where... 
+/// database -> is the targeted database +/// table -> is the targeted table +/// from_property is the parent table property referencing the target table `to_property` +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub struct SupersetTableRelation { + pub database: String, + pub table: String, + pub from_property: String, + pub to_property: String, +} + +impl SupersetTableRelation { + pub fn new>(database: S, table: S, from_property: S, to_property: S) -> Self { + SupersetTableRelation { + database: database.into(), + table: table.into(), + from_property: from_property.into(), + to_property: to_property.into(), + } + } +} diff --git a/superset/src/postgres.rs b/superset/src/postgres.rs new file mode 100644 index 00000000..e00f4ed8 --- /dev/null +++ b/superset/src/postgres.rs @@ -0,0 +1,976 @@ +use crate::dedup::does_line_exist_and_set; +use crate::postgres::SupersetStrategy::RandomPercent; +use crate::{ + utils, PassthroughTable, Progress, Superset, SupersetOptions, SupersetTable, SupersetTableRelation, +}; +use dump_parser::postgres::{ + get_column_names_from_insert_into_query, get_column_values_str_from_insert_into_query, + get_tokens_from_query_str, get_word_value_at_position, match_keyword_at_position, + trim_pre_whitespaces, Keyword, Token, +}; +use dump_parser::utils::{list_sql_queries_from_dump_reader, ListQueryResult}; +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufReader, Error, ErrorKind, Read}; +use std::ops::Index; +use std::path::Path; + +type Database = String; +type Table = String; + +#[derive(Debug)] +struct ForeignKey { + from_database: String, + from_table: String, + from_property: String, + to_database: String, + to_table: String, + to_property: String, +} + +struct TableStats { + database: String, + table: String, + columns: Vec, + total_rows: usize, + first_insert_into_row_index: usize, + last_insert_into_row_index: usize, +} + +pub enum SupersetStrategy<'a> { + RandomPercent { + database: &'a str, + table: &'a str, + percent: u8, + }, +} + +impl<'a> SupersetStrategy<'a> { + pub fn random(database: &'a str, table: &'a str, percent: u8) -> Self { + RandomPercent { + database, + table, + percent, + } + } +} + +pub struct PostgresSuperset<'a> { + superset_table_by_database_and_table_name: HashMap<(Database, Table), SupersetTable>, + dump: &'a Path, + superset_strategy: SupersetStrategy<'a>, + superset_options: SupersetOptions<'a>, +} + +impl<'a> PostgresSuperset<'a> { + pub fn new( + dump: &'a Path, + superset_strategy: SupersetStrategy<'a>, + superset_options: SupersetOptions<'a>, + ) -> Result { + Ok(PostgresSuperset { + superset_table_by_database_and_table_name: get_superset_table_by_database_and_table_name( + BufReader::new(File::open(dump).unwrap()), + )?, + dump, + superset_strategy, + superset_options, + }) + } + + fn dump_reader(&self) -> BufReader { + BufReader::new(File::open(self.dump).unwrap()) + } + + fn reference_rows( + &self, + table_stats: &HashMap<(Database, Table), TableStats>, + ) -> Result, Error> { + match self.superset_strategy { + SupersetStrategy::RandomPercent { + database, + table, + percent, + } => Ok(list_percent_of_insert_into_rows( + percent, + table_stats + .get(&(database.to_string(), table.to_string())) + .unwrap(), + self.dump_reader(), + )?), + } + } + + fn visits( + &self, + row: String, + table_stats: &HashMap<(Database, Table), TableStats>, + data: &mut F, + ) -> Result<(), Error> { + data(format!("{}\n", row)); + + // tokenize `INSERT INTO ...` row + let row_tokens = get_tokens_from_query_str(row.as_str()); + 
+ // find the database and table names from this row + let (row_database, row_table) = + get_insert_into_database_and_table_name(&row_tokens).unwrap(); + + if self.superset_options.passthrough_tables.is_empty() + || !self + .superset_options + .passthrough_tables + .contains(&PassthroughTable::new( + row_database.as_str(), + row_table.as_str(), + )) + { + // only insert if the row is not from passthrough tables list + // otherwise we'll have duplicated rows + data(format!("{}\n", row)); + } + + // find the superset table from this row + let row_superset_table = self + .superset_table_by_database_and_table_name + .get(&(row_database.to_string(), row_table.to_string())) + .unwrap(); + + let row_column_names = get_column_names_from_insert_into_query(&row_tokens); + let row_column_values = get_column_values_str_from_insert_into_query(&row_tokens); + + for row_relation in &row_superset_table.relations { + let column = row_relation.from_property.as_str(); + // find the value from the current row for the relation column + let column_idx = row_column_names.iter().position(|x| *x == column).unwrap(); // FIXME unwrap + let value = row_column_values.get(column_idx).unwrap(); + + let database_and_table_tuple = + (row_relation.database.clone(), row_relation.table.clone()); + + // find the table stats for this row + let row_relation_table_stats = table_stats.get(&database_and_table_tuple).unwrap(); + + // TODO break acyclic graph + let row_clb = |row: &str| match self.visits(row.to_string(), table_stats, data) { + Ok(_) => {} + Err(err) => { + panic!("{}", err); + } + }; + + let _ = filter_insert_into_rows( + row_relation.to_property.as_str(), + value.as_str(), + self.dump_reader(), + row_relation_table_stats, + row_clb, + )?; + } + + Ok(()) + } +} + +impl<'a> Superset for PostgresSuperset<'a> { + /// Return every superset rows + /// Algorithm used: + /// 1. find the reference table and take the X rows from this table with the appropriate SupersetStrategy + /// 2. iterate over each row and their relations (0 to N relations) + /// 3. for each rows from each relations, filter on the id from the parent related row id. (equivalent `SELECT * FROM table_1 INNER JOIN ... WHERE table_1.id = 'xxx';` + /// 4. do it recursively for table_1.relations[*].relations[*]... but the algo stops when reaching the end or reach a cyclic ref. + /// + /// Notes: + /// a. the algo must visits all the tables, even the one that has no relations. 
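+    /// b. emitted `INSERT INTO` rows are de-duplicated before being forwarded to `data`: the md5 of the
+    ///    row's `database-table` pair selects a temp file, and the row is only sent if it is not already
+    ///    recorded there (see `dedup::does_line_exist_and_set`).
+    /// c. rows from passthrough tables are skipped during the per-row visit and copied in full afterwards,
+    ///    so they appear exactly once in the output.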
+ fn read( + &self, + mut data: F, + mut progress: P, + ) -> Result<(), Error> { + let temp_dir = tempfile::tempdir()?; + + let _ = read( + self, + |line| { + if line.contains("INSERT INTO") { + // Dedup INSERT INTO queries + // check if the line has not already been sent + match does_line_exist_and_set( + temp_dir.path(), + &get_insert_into_md5_hash(line.as_str()), + line.as_str(), + ) { + Ok(does_line_exist) => { + if !does_line_exist { + data(line); + } + } + Err(err) => { + panic!("{}", err); + } + } + } else { + data(line); + } + }, + progress, + )?; + + Ok(()) + } +} + +fn read( + postgres_superset: &PostgresSuperset, + mut data: F, + mut progress: P, +) -> Result<(), Error> { + let table_stats = table_stats_by_database_and_table_name(postgres_superset.dump_reader())?; + let rows = postgres_superset.reference_rows(&table_stats)?; + + // send schema header + let table_stats_values = table_stats.values().collect::>(); + let _ = dump_header( + postgres_superset.dump_reader(), + last_header_row_idx(&table_stats_values), + |row| { + data(row.to_string()); + }, + )?; + + let total_rows = table_stats_values + .iter() + .fold(0usize, |acc, y| acc + y.total_rows); + + let total_rows_to_process = rows.len(); + let mut processed_rows = 0usize; + + progress(Progress { + total_rows, + total_rows_to_process, + processed_rows, + last_process_time: 0, + }); + + // send INSERT INTO rows + for row in rows { + let start_time = utils::epoch_millis(); + let _ = postgres_superset.visits(row, &table_stats, &mut data)?; + + processed_rows += 1; + + progress(Progress { + total_rows, + total_rows_to_process, + processed_rows, + last_process_time: utils::epoch_millis() - start_time, + }); + } + + for passthrough_table in postgres_superset.superset_options.passthrough_tables { + // copy all rows from passthrough tables + for table_stats in &table_stats_values { + if table_stats.database.as_str() == passthrough_table.database + && table_stats.table.as_str() == passthrough_table.table + { + let _ = list_insert_into_rows(postgres_superset.dump_reader(), table_stats, |row| { + data(row.to_string()); + })?; + } + } + } + + // send schema footer + let _ = dump_footer( + postgres_superset.dump_reader(), + first_footer_row_idx(&table_stats_values), + |row| { + data(row.to_string()); + }, + )?; + + Ok(()) +} + +fn get_insert_into_md5_hash(query: &str) -> String { + let tokens = get_tokens_from_query_str(query); + let tokens = trim_pre_whitespaces(tokens); + let database = get_word_value_at_position(&tokens, 4).unwrap(); + let table = get_word_value_at_position(&tokens, 6).unwrap(); + let key = format!("{}-{}", database, table); + let digest = md5::compute(key.as_bytes()); + format!("{:x}", digest) +} + +fn list_percent_of_insert_into_rows( + percent: u8, + table_stats: &TableStats, + dump_reader: BufReader, +) -> Result, Error> { + let mut insert_into_rows = vec![]; + + if percent == 0 || table_stats.total_rows == 0 { + return Ok(insert_into_rows); + } + + let percent = if percent > 100 { 100 } else { percent }; + + let total_rows_to_pick = table_stats.total_rows as f32 * percent as f32 / 100.0; + let modulo = (table_stats.total_rows as f32 / total_rows_to_pick) as usize; + + let mut counter = 1usize; + let _ = list_insert_into_rows(dump_reader, table_stats, |rows| { + if counter % modulo == 0 { + insert_into_rows.push(rows.to_string()); + } + + counter += 1; + })?; + + Ok(insert_into_rows) +} + +fn list_insert_into_rows( + dump_reader: BufReader, + table_stats: &TableStats, + mut rows: F, +) -> Result<(), Error> { + 
let mut query_idx = 0usize; + let _ = list_sql_queries_from_dump_reader(dump_reader, |query| { + let mut query_res = ListQueryResult::Continue; + + // optimization to avoid tokenizing unnecessary queries -- it's a 13x optim (benched) + if query_idx >= table_stats.first_insert_into_row_index + && query_idx <= table_stats.last_insert_into_row_index + { + let tokens = get_tokens_from_query_str(query); + let tokens = trim_tokens(&tokens, Keyword::Insert); + + if match_keyword_at_position(Keyword::Insert, &tokens, 0) + && match_keyword_at_position(Keyword::Into, &tokens, 2) + && get_word_value_at_position(&tokens, 4) == Some(table_stats.database.as_str()) + && get_word_value_at_position(&tokens, 6) == Some(table_stats.table.as_str()) + { + rows(query.as_ref()); + } + } + + if query_idx > table_stats.last_insert_into_row_index { + // early break to avoid parsing the dump while we have already parsed all the table rows + query_res = ListQueryResult::Break; + } + + query_idx += 1; + query_res + })?; + + Ok(()) +} + +fn filter_insert_into_rows( + column: &str, + value: &str, + dump_reader: BufReader, + table_stats: &TableStats, + mut rows: F, +) -> Result<(), Error> { + let column_idx = match table_stats + .columns + .iter() + .position(|r| r.as_str() == column) + { + Some(idx) => idx, + None => { + return Err(Error::new( + ErrorKind::Other, + format!( + "table {} does not contain column {}", + table_stats.table, column + ), + )); + } + }; + + let mut query_idx = 0usize; + let _ = list_sql_queries_from_dump_reader(dump_reader, |query| { + let mut query_res = ListQueryResult::Continue; + + // optimization to avoid tokenizing unnecessary queries -- it's a 13x optim (benched) + if query_idx >= table_stats.first_insert_into_row_index + && query_idx <= table_stats.last_insert_into_row_index + { + let tokens = get_tokens_from_query_str(query); + let tokens = trim_tokens(&tokens, Keyword::Insert); + + if match_keyword_at_position(Keyword::Insert, &tokens, 0) + && match_keyword_at_position(Keyword::Into, &tokens, 2) + && get_word_value_at_position(&tokens, 4) == Some(table_stats.database.as_str()) + && get_word_value_at_position(&tokens, 6) == Some(table_stats.table.as_str()) + { + let column_values = get_column_values_str_from_insert_into_query(&tokens); + + if *column_values.index(column_idx) == value { + rows(query) + } + } + } + + if query_idx > table_stats.last_insert_into_row_index { + // early break to avoid parsing the dump while we have already parsed all the table rows + query_res = ListQueryResult::Break; + } + + query_idx += 1; + query_res + })?; + + Ok(()) +} + +/// return the last row index from dump header (with generated table stats) +fn last_header_row_idx(table_stats_values: &Vec<&TableStats>) -> usize { + table_stats_values + .iter() + .filter(|ts| ts.first_insert_into_row_index > 0) // first_insert_into_row_index can be equals to 0 if there is no INSERT INTO... 
+ .min_by_key(|ts| ts.first_insert_into_row_index) + .map(|ts| ts.first_insert_into_row_index) + .unwrap() + - 1 // FIXME catch this even if it should not happen +} + +/// return the first row index from dump header (with generated table stats) +fn first_footer_row_idx(table_stats_values: &Vec<&TableStats>) -> usize { + table_stats_values + .iter() + .max_by_key(|ts| ts.last_insert_into_row_index) + .map(|ts| ts.last_insert_into_row_index) + .unwrap() + + 1 // FIXME catch this even if it should not happen +} + +/// Get Postgres dump header - everything before the first `INSERT INTO ...` row +/// pg_dump export dump data in 3 phases: `CREATE TABLE ...`, `INSERT INTO ...`, and `ALTER TABLE ...`. +/// this function return all the `CREATE TABLE ...` rows. +fn dump_header( + dump_reader: BufReader, + last_header_row_idx: usize, + mut rows: F, +) -> Result<(), Error> { + let mut query_idx = 0usize; + let _ = list_sql_queries_from_dump_reader(dump_reader, |query| { + let mut query_res = ListQueryResult::Continue; + + if query_idx <= last_header_row_idx { + rows(query) + } + + if query_idx > last_header_row_idx { + query_res = ListQueryResult::Break; + } + + query_idx += 1; + query_res + })?; + + Ok(()) +} + +/// Get Postgres dump footer - everything after the last `INSERT INTO ...` row +/// pg_dump export dump data in 3 phases: `CREATE TABLE ...`, `INSERT INTO ...`, and `ALTER TABLE ...`. +/// this function return all the `ALTER TABLE ...` rows. +fn dump_footer( + dump_reader: BufReader, + first_footer_row_idx: usize, + mut rows: F, +) -> Result<(), Error> { + let mut query_idx = 0usize; + let _ = list_sql_queries_from_dump_reader(dump_reader, |query| { + if query_idx >= first_footer_row_idx { + rows(query) + } + + query_idx += 1; + ListQueryResult::Continue + })?; + + Ok(()) +} + +fn table_stats_by_database_and_table_name( + dump_reader: BufReader, +) -> Result, Error> { + let mut table_stats_by_database_and_table_name = + HashMap::<(Database, Table), TableStats>::new(); + + let mut query_idx = 0usize; + let _ = list_sql_queries_from_dump_reader(dump_reader, |query| { + let tokens = get_tokens_from_query_str(query); + + let _ = match get_create_table_database_and_table_name(&tokens) { + Some((database, table)) => { + table_stats_by_database_and_table_name.insert( + (database.clone(), table.clone()), + TableStats { + database, + table, + columns: vec![], + total_rows: 0, + first_insert_into_row_index: 0, + last_insert_into_row_index: 0, + }, + ); + } + None => {} + }; + + // remove potential whitespaces + let tokens = trim_tokens(&tokens, Keyword::Insert); + + if match_keyword_at_position(Keyword::Insert, &tokens, 0) + && match_keyword_at_position(Keyword::Into, &tokens, 2) + { + if let Some(database) = get_word_value_at_position(&tokens, 4) { + if let Some(table) = get_word_value_at_position(&tokens, 6) { + match table_stats_by_database_and_table_name + .get_mut(&(database.to_string(), table.to_string())) + { + Some(table_stats) => { + if table_stats.total_rows == 0 { + // I assume that the INSERT INTO row has all the column set + let columns = get_column_names_from_insert_into_query(&tokens) + .iter() + .map(|name| name.to_string()) + .collect::>(); + + table_stats.columns = columns; + } + + if table_stats.first_insert_into_row_index == 0 { + table_stats.first_insert_into_row_index = query_idx; + } + + table_stats.last_insert_into_row_index = query_idx; + table_stats.total_rows += 1; + } + None => { + // should not happen because INSERT INTO must come after CREATE TABLE + println!("Query: {}", 
query); + panic!("Unexpected: INSERT INTO happened before CREATE TABLE while creating table_stats structure") + } + } + } + } + } + + query_idx += 1; + ListQueryResult::Continue + })?; + + Ok(table_stats_by_database_and_table_name) +} + +fn trim_tokens(tokens: &Vec, keyword: Keyword) -> Vec { + tokens + .iter() + .skip_while(|token| match *token { + Token::Word(word) if word.keyword == keyword => false, + _ => true, + }) + .map(|token| token.clone()) // FIXME - do not clone token + .collect::>() +} + +fn get_superset_table_by_database_and_table_name( + dump_reader: BufReader, +) -> Result, Error> { + let mut superset_table_by_database_and_table_name = + HashMap::<(Database, Table), SupersetTable>::new(); + + list_sql_queries_from_dump_reader(dump_reader, |query| { + let tokens = get_tokens_from_query_str(query); + + if let Some((database, table)) = get_create_table_database_and_table_name(&tokens) { + // add table into index + let _ = superset_table_by_database_and_table_name.insert( + (database.clone(), table.clone()), + SupersetTable::new(database, table, vec![]), + ); + } + + if let Some(fk) = get_alter_table_foreign_key(&tokens) { + let _ = match superset_table_by_database_and_table_name + .get_mut(&(fk.from_database, fk.from_table)) + { + Some(superset_table) => { + superset_table.relations.push(SupersetTableRelation::new( + fk.to_database, + fk.to_table, + fk.from_property, + fk.to_property, + )); + } + None => {} // FIXME + }; + } + + ListQueryResult::Continue + })?; + + Ok(superset_table_by_database_and_table_name) +} + +fn get_create_table_database_and_table_name(tokens: &Vec) -> Option<(Database, Table)> { + let tokens = trim_tokens(&tokens, Keyword::Create); + + if tokens.is_empty() { + return None; + } + + if match_keyword_at_position(Keyword::Create, &tokens, 0) + && match_keyword_at_position(Keyword::Table, &tokens, 2) + { + if let Some(database) = get_word_value_at_position(&tokens, 4) { + if let Some(table) = get_word_value_at_position(&tokens, 6) { + return Some((database.to_string(), table.to_string())); + } + } + } + + None +} + +fn get_insert_into_database_and_table_name(tokens: &Vec) -> Option<(Database, Table)> { + let tokens = trim_tokens(&tokens, Keyword::Insert); + + if tokens.is_empty() { + return None; + } + + if match_keyword_at_position(Keyword::Insert, &tokens, 0) + && match_keyword_at_position(Keyword::Into, &tokens, 2) + { + if let Some(database) = get_word_value_at_position(&tokens, 4) { + if let Some(table) = get_word_value_at_position(&tokens, 6) { + return Some((database.to_string(), table.to_string())); + } + } + } + + None +} + +fn get_alter_table_foreign_key(tokens: &Vec) -> Option { + let tokens = trim_tokens(&tokens, Keyword::Alter); + + if tokens.is_empty() { + return None; + } + + if !match_keyword_at_position(Keyword::Alter, &tokens, 0) + || !match_keyword_at_position(Keyword::Table, &tokens, 2) + { + return None; + } + + let database_name_pos = if match_keyword_at_position(Keyword::Only, &tokens, 4) { + 6 + } else { + 4 + }; + + let table_name_pos = if match_keyword_at_position(Keyword::Only, &tokens, 4) { + 8 + } else { + 6 + }; + + let from_database_name = match get_word_value_at_position(&tokens, database_name_pos) { + Some(database_name) => database_name, + None => return None, + }; + + let from_table_name = match get_word_value_at_position(&tokens, table_name_pos) { + Some(table_name) => table_name, + None => return None, + }; + + let next_foreign_tokens = tokens + .iter() + .skip_while(|token| match token { + Token::Word(word) if 
word.keyword == Keyword::Foreign => false, + _ => true, + }) + .map(|token| token.clone()) + .collect::>(); + + let from_property = match get_word_value_at_position(&next_foreign_tokens, 5) { + Some(property) => property, + None => return None, + }; + + let to_database_name = match get_word_value_at_position(&next_foreign_tokens, 10) { + Some(database_name) => database_name, + None => return None, + }; + + let to_table_name = match get_word_value_at_position(&next_foreign_tokens, 12) { + Some(table_name) => table_name, + None => return None, + }; + + let to_property = match get_word_value_at_position(&next_foreign_tokens, 14) { + Some(property) => property, + None => return None, + }; + + Some(ForeignKey { + from_database: from_database_name.to_string(), + from_table: from_table_name.to_string(), + from_property: from_property.to_string(), + to_database: to_database_name.to_string(), + to_table: to_table_name.to_string(), + to_property: to_property.to_string(), + }) +} + +#[cfg(test)] +mod tests { + use crate::postgres::{ + dump_footer, dump_header, filter_insert_into_rows, first_footer_row_idx, + get_alter_table_foreign_key, get_create_table_database_and_table_name, + get_superset_table_by_database_and_table_name, last_header_row_idx, + list_percent_of_insert_into_rows, table_stats_by_database_and_table_name, PostgresSuperset, + SupersetStrategy, + }; + use crate::{PassthroughTable, Superset, SupersetOptions}; + use dump_parser::postgres::Tokenizer; + use std::collections::HashSet; + use std::fs::File; + use std::io::BufReader; + use std::path::{Path, PathBuf}; + + fn dump_path() -> PathBuf { + Path::new("db") + .join("postgres") + .join("fulldump-with-inserts.sql") + } + + fn dump_reader() -> BufReader { + BufReader::new(File::open(dump_path()).unwrap()) + } + + #[test] + fn check_statements_with_tokens() { + let q = "SELECT * FROM toto;"; + let tokens = Tokenizer::new(q).tokenize().unwrap(); + assert_eq!(get_create_table_database_and_table_name(&tokens), None); + + let q = r#" +CREATE TABLE public.order_details ( + order_id smallint NOT NULL, + product_id smallint NOT NULL, + unit_price real NOT NULL, + quantity smallint NOT NULL, + discount real NOT NULL +);"#; + + let tokens = Tokenizer::new(q).tokenize().unwrap(); + + assert_eq!( + get_create_table_database_and_table_name(&tokens), + Some(("public".to_string(), "order_details".to_string())) + ); + + let q = r#"ALTER TABLE public.employees OWNER TO root;"#; + let tokens = Tokenizer::new(q).tokenize().unwrap(); + assert!(get_alter_table_foreign_key(&tokens).is_none()); + + let q = r#" +ALTER TABLE ONLY public.territories + ADD CONSTRAINT fk_territories_region FOREIGN KEY (region_id) REFERENCES public.region(region_id); +"#; + + let tokens = Tokenizer::new(q).tokenize().unwrap(); + let fk = get_alter_table_foreign_key(&tokens).unwrap(); + assert_eq!(fk.from_database, "public".to_string()); + assert_eq!(fk.from_table, "territories".to_string()); + assert_eq!(fk.from_property, "region_id".to_string()); + assert_eq!(fk.to_database, "public".to_string()); + assert_eq!(fk.to_table, "region".to_string()); + assert_eq!(fk.to_property, "region_id".to_string()); + } + + #[test] + fn check_superset_table() { + let m = get_superset_table_by_database_and_table_name(dump_reader()).unwrap(); + assert!(m.len() > 0); + + let t = m + .get(&("public".to_string(), "customer_demographics".to_string())) + .unwrap(); + + assert_eq!(t.database, "public".to_string()); + assert_eq!(t.table, "customer_demographics".to_string()); + assert_eq!(t.relations.len(), 
0); + + let t = m + .get(&("public".to_string(), "customer_customer_demo".to_string())) + .unwrap(); + + assert_eq!(t.database, "public".to_string()); + assert_eq!(t.table, "customer_customer_demo".to_string()); + assert_eq!(t.relations.len(), 2); + assert_eq!(t.related_tables().len(), 2); + + let t = m + .get(&("public".to_string(), "customers".to_string())) + .unwrap(); + + assert_eq!(t.database, "public".to_string()); + assert_eq!(t.table, "customers".to_string()); + assert_eq!(t.relations.len(), 0); + } + + #[test] + fn check_table_stats() { + let table_stats = table_stats_by_database_and_table_name(dump_reader()).unwrap(); + assert!(table_stats.len() > 0); + // TODO add more tests to check table.rows size + } + + #[test] + fn check_percent_of_rows() { + let table_stats = table_stats_by_database_and_table_name(dump_reader()).unwrap(); + let first_table_stats = table_stats + .get(&("public".to_string(), "order_details".to_string())) + .unwrap(); + + let rows = list_percent_of_insert_into_rows(5, first_table_stats, dump_reader()).unwrap(); + + assert!(rows.len() < first_table_stats.total_rows) + } + + #[test] + fn check_filter_insert_into_rows() { + let table_stats = table_stats_by_database_and_table_name(dump_reader()).unwrap(); + let first_table_stats = table_stats + .get(&("public".to_string(), "order_details".to_string())) + .unwrap(); + + let mut found_rows = vec![]; + filter_insert_into_rows( + "product_id", + "11", + dump_reader(), + first_table_stats, + |row| { + found_rows.push(row.to_string()); + }, + ) + .unwrap(); + + assert_eq!(found_rows.len(), 38) + } + + #[test] + fn check_header_dump() { + let table_stats = table_stats_by_database_and_table_name(dump_reader()).unwrap(); + + assert!(!table_stats.is_empty()); + + let table_stats_values = table_stats.values().collect::>(); + let idx = last_header_row_idx(&table_stats_values); + + assert!(idx > 0); + + let mut rows = vec![]; + let _ = dump_header(dump_reader(), idx, |row| { + rows.push(row.to_string()); + }) + .unwrap(); + + assert_eq!(rows.iter().filter(|x| x.contains("INSERT INTO")).count(), 0); + assert!(!rows.is_empty()); + } + + #[test] + fn check_footer_dump() { + let table_stats = table_stats_by_database_and_table_name(dump_reader()).unwrap(); + + assert!(!table_stats.is_empty()); + + let table_stats_values = table_stats.values().collect::>(); + let idx = first_footer_row_idx(&table_stats_values); + + assert!(idx > 0); + + let mut rows = vec![]; + let _ = dump_footer(dump_reader(), idx, |row| { + rows.push(row.to_string()); + }) + .unwrap(); + + assert_eq!(rows.iter().filter(|x| x.contains("INSERT INTO")).count(), 0); + assert!(!rows.is_empty()); + } + + #[test] + fn check_postgres_superset() { + let path = dump_path(); + let mut s = HashSet::new(); + s.insert(PassthroughTable::new("public", "us_states")); + + let postgres_superset = PostgresSuperset::new( + path.as_path(), + SupersetStrategy::random("public", "orders", 50), + SupersetOptions::new(&s), + ) + .unwrap(); + + let mut rows = vec![]; + let mut total_rows = 0usize; + let mut total_rows_to_process = 0usize; + let mut total_rows_processed = 0usize; + postgres_superset + .read( + |row| { + rows.push(row); + }, + |progress| { + if total_rows == 0 { + total_rows = progress.total_rows; + } + + if total_rows_to_process == 0 { + total_rows_to_process = progress.total_rows_to_process; + } + + total_rows_processed = progress.processed_rows; + + println!( + "database superset progression: {}% (last process time: {}ms)", + progress.percent(), + 
progress.last_process_time + ); + }, + ) + .unwrap(); + + println!( + "{}/{} total database rows", + total_rows_processed, total_rows + ); + println!( + "{}/{} rows processed", + total_rows_processed, total_rows_to_process + ); + assert!(total_rows_processed < total_rows); + assert_eq!(total_rows_processed, total_rows_to_process); + assert_eq!( + rows.iter() + .filter(|x| x.contains("INSERT INTO public.us_states")) + .count(), + 51 + ); + } +} diff --git a/superset/src/utils.rs b/superset/src/utils.rs new file mode 100644 index 00000000..93c857d6 --- /dev/null +++ b/superset/src/utils.rs @@ -0,0 +1,8 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +pub fn epoch_millis() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() +} From ff7d4628edad101b81b6cf24e772c2e12d1b7019 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 21 Aug 2024 22:02:13 +0000 Subject: [PATCH 3/5] fix errors --- replibyte/src/source/postgres.rs | 8 ++++---- superset/src/lib.rs | 10 +++++----- superset/src/postgres.rs | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/replibyte/src/source/postgres.rs b/replibyte/src/source/postgres.rs index 79dc05f5..48da199f 100644 --- a/replibyte/src/source/postgres.rs +++ b/replibyte/src/source/postgres.rs @@ -16,8 +16,9 @@ use dump_parser::postgres::{ use dump_parser::utils::{list_sql_queries_from_dump_reader, ListQueryResult}; use subset::postgres::{PostgresSubset, SubsetStrategy}; use subset::{PassthroughTable, Subset, SubsetOptions}; + use superset::postgres::{PostgresSuperset, SupersetStrategy}; -use superset::{PassthroughTable, Superset, SupersetOptions}; +use superset::{PassthroughTableSuperSet, Superset, SupersetOptions}; use crate::config::DatabaseScaleConfigStrategy; use crate::connector::Connector; @@ -168,8 +169,7 @@ impl<'a> Source for Postgres<'a> { let reader = superset(dump_reader, superset_config)?; read_and_transform(reader, options, query_callback); } - let reader = BufReader::new(stdout); - read_and_transform(reader, options, query_callback); + } } Some(subset_config) => { let dump_reader = BufReader::new(stdout); @@ -254,7 +254,7 @@ pub fn superset( .as_ref() .unwrap_or(&empty_vec) .iter() - .map(|table| PassthroughTable::new(superset_config.database.as_str(), table.as_str())) + .map(|table| PassthroughTableSuperSet::new(superset_config.database.as_str(), table.as_str())) .collect::>(); let superset_options = SupersetOptions::new(&passthrough_tables); diff --git a/superset/src/lib.rs b/superset/src/lib.rs index 08b8f327..69b3994b 100644 --- a/superset/src/lib.rs +++ b/superset/src/lib.rs @@ -30,14 +30,14 @@ impl Progress { } #[derive(Debug, Hash, Eq, PartialEq)] -pub struct PassthroughTable<'a> { +pub struct PassthroughTableSuperSet<'a> { pub database: &'a str, pub table: &'a str, } -impl<'a> PassthroughTable<'a> { +impl<'a> PassthroughTableSuperSet<'a> { pub fn new>(database: S, table: S) -> Self { - PassthroughTable { + PassthroughTableSuperSet { database: database.into(), table: table.into(), } @@ -45,11 +45,11 @@ impl<'a> PassthroughTable<'a> { } pub struct SupersetOptions<'a> { - pub passthrough_tables: &'a HashSet>, + pub passthrough_tables: &'a HashSet>, } impl<'a> SupersetOptions<'a> { - pub fn new(passthrough_tables: &'a HashSet>) -> Self { + pub fn new(passthrough_tables: &'a HashSet>) -> Self { SupersetOptions { passthrough_tables } } } diff --git a/superset/src/postgres.rs b/superset/src/postgres.rs index e00f4ed8..1a4c7613 100644 --- a/superset/src/postgres.rs +++ b/superset/src/postgres.rs 
@@ -1,7 +1,7 @@ use crate::dedup::does_line_exist_and_set; use crate::postgres::SupersetStrategy::RandomPercent; use crate::{ - utils, PassthroughTable, Progress, Superset, SupersetOptions, SupersetTable, SupersetTableRelation, + utils, PassthroughTableSuperSet, Progress, Superset, SupersetOptions, SupersetTable, SupersetTableRelation, }; use dump_parser::postgres::{ get_column_names_from_insert_into_query, get_column_values_str_from_insert_into_query, @@ -120,7 +120,7 @@ impl<'a> PostgresSuperset<'a> { || !self .superset_options .passthrough_tables - .contains(&PassthroughTable::new( + .contains(&PassthroughTableSuperSet::new( row_database.as_str(), row_table.as_str(), )) @@ -742,7 +742,7 @@ mod tests { list_percent_of_insert_into_rows, table_stats_by_database_and_table_name, PostgresSuperset, SupersetStrategy, }; - use crate::{PassthroughTable, Superset, SupersetOptions}; + use crate::{PassthroughTableSuperSet, Superset, SupersetOptions}; use dump_parser::postgres::Tokenizer; use std::collections::HashSet; use std::fs::File; @@ -918,7 +918,7 @@ ALTER TABLE ONLY public.territories fn check_postgres_superset() { let path = dump_path(); let mut s = HashSet::new(); - s.insert(PassthroughTable::new("public", "us_states")); + s.insert(PassthroughTableSuperSet::new("public", "us_states")); let postgres_superset = PostgresSuperset::new( path.as_path(), From 1800eddfc1221c80c838e342de938bd480e2686a Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Fri, 23 Aug 2024 19:43:33 +0000 Subject: [PATCH 4/5] superset debug --- replibyte/src/main.rs | 5 ++--- replibyte/src/source/postgres.rs | 35 ++++++++++++++++---------------- superset/src/lib.rs | 1 + superset/src/postgres.rs | 1 + 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/replibyte/src/main.rs b/replibyte/src/main.rs index 68f7faae..b6e68a54 100644 --- a/replibyte/src/main.rs +++ b/replibyte/src/main.rs @@ -152,11 +152,11 @@ fn run(config: Config, sub_commands: &SubCommand) -> anyhow::Result<()> { RestoreCommand::Remote(args) => if args.output {}, }, _ => { - let _ = thread::spawn(move || show_progress_bar(rx_pb)); + // let _ = thread::spawn(move || show_progress_bar(rx_pb)); } }, _ => { - let _ = thread::spawn(move || show_progress_bar(rx_pb)); + // let _ = thread::spawn(move || show_progress_bar(rx_pb)); } }; @@ -174,7 +174,6 @@ fn run(config: Config, sub_commands: &SubCommand) -> anyhow::Result<()> { if let Some(name) = &args.name { datastore.set_dump_name(name.to_string()); } - commands::dump::run(args, datastore, config, progress_callback) } DumpCommand::Delete(args) => commands::dump::delete(datastore, args), diff --git a/replibyte/src/source/postgres.rs b/replibyte/src/source/postgres.rs index 48da199f..4509c72e 100644 --- a/replibyte/src/source/postgres.rs +++ b/replibyte/src/source/postgres.rs @@ -160,15 +160,15 @@ impl<'a> Source for Postgres<'a> { match &options.database_subset { None => { match &options.database_superset { - None => { - let reader = BufReader::new(stdout); - read_and_transform(reader, options, query_callback); - } Some(superset_config) => { let dump_reader = BufReader::new(stdout); let reader = superset(dump_reader, superset_config)?; read_and_transform(reader, options, query_callback); } + _ => { + let reader = BufReader::new(stdout); + read_and_transform(reader, options, query_callback); + } } } Some(subset_config) => { @@ -263,19 +263,20 @@ pub fn superset( let named_superset_file = tempfile::NamedTempFile::new()?; let mut superset_file = named_superset_file.as_file(); - let _ = superset.read( - |row| 
-            match superset_file.write(format!("{}\n", row).as_bytes()) {
-                Ok(_) => {}
-                Err(err) => {
-                    panic!("{}", err)
-                }
-            };
-        },
-        |progress| {
-            info!("Database superset completion: {}%", progress.percent());
-        },
-    )?;
+    // let _ = superset.read(
+    //     |row| {
+    //         println!("{} \n new row",row);
+    //         match superset_file.write(format!("{}\n", row).as_bytes()) {
+    //             Ok(_) => {}
+    //             Err(err) => {
+    //                 panic!("{}", err)
+    //             }
+    //         };
+    //     },
+    //     |progress| {
+    //         info!("Database superset completion: {}%", progress.percent());
+    //     },
+    //     )?;

     Ok(BufReader::new(
         File::open(named_superset_file.path()).unwrap(),
diff --git a/superset/src/lib.rs b/superset/src/lib.rs
index 69b3994b..e33dc811 100644
--- a/superset/src/lib.rs
+++ b/superset/src/lib.rs
@@ -1,5 +1,6 @@
 use std::collections::HashSet;
 use std::io::Error;
+use std::fmt;

 mod dedup;
 pub mod postgres;
diff --git a/superset/src/postgres.rs b/superset/src/postgres.rs
index 1a4c7613..c69092bc 100644
--- a/superset/src/postgres.rs
+++ b/superset/src/postgres.rs
@@ -192,6 +192,7 @@ impl<'a> Superset for PostgresSuperset<'a> {
         let _ = read(
             self,
             |line| {
+                println!("{}",line);
                 if line.contains("INSERT INTO") {
                     // Dedup INSERT INTO queries
                     // check if the line has not already been sent

From 3571b428baffd260dae1c042c8e764ca2cdd9cda Mon Sep 17 00:00:00 2001
From: Terry Wang
Date: Tue, 3 Sep 2024 22:40:10 +0000
Subject: [PATCH 5/5] debug 3

---
 replibyte/Cargo.toml             |  2 +-
 replibyte/src/source/mongodb.rs  | 20 +++++++--------
 replibyte/src/source/postgres.rs | 36 ++++++++++++++------------
 superset/Cargo.toml              |  1 +
 superset/src/postgres.rs         | 44 +++++++++++++++++++++++++++-----
 5 files changed, 69 insertions(+), 34 deletions(-)

diff --git a/replibyte/Cargo.toml b/replibyte/Cargo.toml
index fbf67372..76efdbc8 100644
--- a/replibyte/Cargo.toml
+++ b/replibyte/Cargo.toml
@@ -35,7 +35,7 @@ flate2 = "1.0"
 bson = "2.2"
 aes-gcm = "0.9"
 which = "4.2.5"
-mongodb-schema-parser = { git = "https://github.com/mongodb-rust/mongodb-schema-parser.git", rev = "2d489307dd70b63b216a9968f7dec7c217108b32" }
+# mongodb-schema-parser = { git = "https://github.com/mongodb-rust/mongodb-schema-parser.git", rev = "2d489307dd70b63b216a9968f7dec7c217108b32" }
 url = "2.2.2"
 tempfile = "3.3"
 ctrlc = "3.2.1"
diff --git a/replibyte/src/source/mongodb.rs b/replibyte/src/source/mongodb.rs
index e94ba94c..f273b39c 100644
--- a/replibyte/src/source/mongodb.rs
+++ b/replibyte/src/source/mongodb.rs
@@ -11,7 +11,7 @@ use crate::SourceOptions;
 use bson::{Bson, Document};
 use dump_parser::mongodb::Archive;
-use mongodb_schema_parser::SchemaParser;
+// use mongodb_schema_parser::SchemaParser;

 pub struct MongoDB<'a> {
     uri: &'a str,
@@ -312,20 +312,20 @@ pub fn read_and_parse_schema(reader: BufReader) -> Result<(), Error>
         table.set_titles(row![format!("Collection {}", name)]);

-        let mut schema_parser = SchemaParser::new();
+        // let mut schema_parser = SchemaParser::new();

-        for doc in collection {
-            schema_parser.write_bson(doc).unwrap();
-        }
+        // for doc in collection {
+        //     schema_parser.write_bson(doc).unwrap();
+        // }

-        let schema = schema_parser.flush();
+        // let schema = schema_parser.flush();

-        let json_data = serde_json::to_string_pretty(&schema).unwrap();
+        // let json_data = serde_json::to_string_pretty(&schema).unwrap();

-        table.add_row(row![name]);
-        table.add_row(row![json_data]);
+        // table.add_row(row![name]);
+        // table.add_row(row![json_data]);

-        let _ = table.printstd();
+        // let _ = table.printstd();
     }
 });
diff --git a/replibyte/src/source/postgres.rs b/replibyte/src/source/postgres.rs
index 4509c72e..be9da1d4 100644
--- a/replibyte/src/source/postgres.rs
+++ b/replibyte/src/source/postgres.rs
@@ -163,7 +163,9 @@ impl<'a> Source for Postgres<'a> {
                     Some(superset_config) => {
                         let dump_reader = BufReader::new(stdout);
                         let reader = superset(dump_reader, superset_config)?;
-                        read_and_transform(reader, options, query_callback);
+                        println!("superset read done");
+                        // read_and_transform(reader, options, query_callback);
+                        println!("read_and_transform done");
                     }
                     _ => {
                         let reader = BufReader::new(stdout);
@@ -226,7 +228,6 @@ pub fn subset(
             info!("Database subset completion: {}%", progress.percent());
         },
     )?;
-
     Ok(BufReader::new(
         File::open(named_subset_file.path()).unwrap(),
     ))
@@ -262,21 +263,22 @@ pub fn superset(
     let named_superset_file = tempfile::NamedTempFile::new()?;
     let mut superset_file = named_superset_file.as_file();
+    println!("superset begin");

-    // let _ = superset.read(
-    //     |row| {
-    //         println!("{} \n new row",row);
-    //         match superset_file.write(format!("{}\n", row).as_bytes()) {
-    //             Ok(_) => {}
-    //             Err(err) => {
-    //                 panic!("{}", err)
-    //             }
-    //         };
-    //     },
-    //     |progress| {
-    //         info!("Database superset completion: {}%", progress.percent());
-    //     },
-    //     )?;
+    let _ = superset.read(
+        |row| {
+            match superset_file.write(format!("{}\n", row).as_bytes()) {
+                Ok(_) => {}
+                Err(err) => {
+                    panic!("{}", err)
+                }
+            };
+        },
+        |progress| {
+            info!("Database superset completion: {}%", progress.percent());
+        },
+    )?;
+    println!("superset done");

     Ok(BufReader::new(
         File::open(named_superset_file.path()).unwrap(),
@@ -361,7 +363,7 @@ pub fn read_and_transform(
                 no_change_query_callback(query_callback.borrow_mut(), query);
             }
         }
-
+        println!("list_sql_queries_from_dump_reader row done");
         ListQueryResult::Continue
     }) {
         Ok(_) => {}
diff --git a/superset/Cargo.toml b/superset/Cargo.toml
index c6a97b67..a8af2db6 100644
--- a/superset/Cargo.toml
+++ b/superset/Cargo.toml
@@ -9,3 +9,4 @@ edition = "2021"
 dump-parser = { path = "../dump-parser" }
 tempfile = "3.3"
 md5 = "0.7"
+peak_alloc = "0.2.0"
diff --git a/superset/src/postgres.rs b/superset/src/postgres.rs
index c69092bc..32e9e0f1 100644
--- a/superset/src/postgres.rs
+++ b/superset/src/postgres.rs
@@ -9,15 +9,19 @@ use dump_parser::postgres::{
     trim_pre_whitespaces, Keyword, Token,
 };
 use dump_parser::utils::{list_sql_queries_from_dump_reader, ListQueryResult};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fs::File;
 use std::io::{BufReader, Error, ErrorKind, Read};
 use std::ops::Index;
 use std::path::Path;
+use peak_alloc::PeakAlloc;

 type Database = String;
 type Table = String;

+#[global_allocator]
+static PEAK_ALLOC: PeakAlloc = PeakAlloc;
+
 #[derive(Debug)]
 struct ForeignKey {
     from_database: String,
@@ -28,6 +32,7 @@ struct ForeignKey {
     to_property: String,
 }

+#[derive(Debug)]
 struct TableStats {
     database: String,
     table: String,
@@ -106,7 +111,17 @@ impl<'a> PostgresSuperset<'a> {
         row: String,
         table_stats: &HashMap<(Database, Table), TableStats>,
         data: &mut F,
+        visited: &mut HashSet<String>,
     ) -> Result<(), Error> {
+
+        // Check if the row has already been visited
+        if visited.contains(&row) {
+            return Ok(()); // If visited, skip further processing
+        }
+
+        // Mark this row as visited
+        visited.insert(row.clone());
+
         data(format!("{}\n", row));

         // tokenize `INSERT INTO ...` row
@@ -147,18 +162,27 @@ impl<'a> PostgresSuperset<'a> {
            let database_and_table_tuple =
                (row_relation.database.clone(), row_relation.table.clone());

+            // println!("row_relation {:?}", row_relation);
            // find the table stats for this row
            let row_relation_table_stats = table_stats.get(&database_and_table_tuple).unwrap();
+            // println!("row_relation_table_stats {:?}", row_relation_table_stats);

-            // TODO break acyclic graph
-            let row_clb = |row: &str| match self.visits(row.to_string(), table_stats, data) {
+            if row_relation_table_stats.total_rows == 0usize
+            {
+                // println!("row_relation_table_stats total_rows 0");
+                continue
+            }
+            // println!("row_clb visit");
+
+            // break cycles in the relation graph (skip rows already in `visited`)
+            let row_clb = |row: &str| match self.visits(row.to_string(), table_stats, data, visited) {
                 Ok(_) => {}
                 Err(err) => {
                     panic!("{}", err);
                 }
             };
-
+            // println!("row_clb visit subdone");
             let _ = filter_insert_into_rows(
                 row_relation.to_property.as_str(),
                 value.as_str(),
@@ -166,6 +190,8 @@ impl<'a> PostgresSuperset<'a> {
                 row_relation_table_stats,
                 row_clb,
             )?;
+            // println!("filter_insert_into_rows done");
+
         }

         Ok(())
@@ -255,8 +281,14 @@ fn read(
     // send INSERT INTO rows
     for row in rows {
+        println!("filter_insert_into_rows_list_query");
+        let current_mem = PEAK_ALLOC.current_usage_as_mb();
+        println!("This program currently uses {} MB of RAM.", current_mem);
+        let peak_mem = PEAK_ALLOC.peak_usage_as_gb();
+        println!("The max amount that was used: {} GB", peak_mem);
         let start_time = utils::epoch_millis();
-        let _ = postgres_superset.visits(row, &table_stats, &mut data)?;
+        let mut visited = HashSet::new();
+        let _ = postgres_superset.visits(row, &table_stats, &mut data, &mut visited)?;

         processed_rows += 1;
@@ -266,8 +298,8 @@ fn read(
             processed_rows,
             last_process_time: utils::epoch_millis() - start_time,
         });
+        visited.clear();
     }
-
     for passthrough_table in postgres_superset.superset_options.passthrough_tables {
         // copy all rows from passthrough tables
         for table_stats in &table_stats_values {