-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl-otimizado.py
134 lines (104 loc) · 4.93 KB
/
etl-otimizado.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
#spark = SparkSession.builder.appName("ETL Example").getOrCreate()
## INICIAR SESSÃO SPARK (exemplos MySQL e Postgresql)
##Certifique-se de que o JAR do driver MySQL está acessível no caminho especificado
##e que as credenciais e a URL estão corretas para a sua configuração do banco de dados.'''
# MySQL
spark = SparkSession.builder \
.appName("ETL Example") \
.config("spark.jars", "/path/to/mysql-<version>.jar") \
.getOrCreate() # Caminho para o jar do driver MySQL
#Postgresql
'''
spark = SparkSession.builder \
.appName("ETL Example") \
.config("spark.jars", "/path/to/postgresql-<version>.jar") \
.getOrCreate() # Caminho para o jar do driver PostgreSQL
'''
## EXTRACT
# Leitura de arquivos CSV
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
# Leitura de banco de dados SQL (mysql)
df_sql = spark.read.format("jdbc").options(
url="jdbc:mysql://localhost:3306/dbname", # Altere para a URL do MySQL
driver="com.mysql.jdbc.Driver", # Altere para o driver do MySQL
dbtable="table_name",
user="username",
password="password"
).load()
# Leitura de banco de dados SQL (postgresql
'''
df_sql = spark.read.format("jdbc").options(
url="jdbc:postgresql://localhost:5432/dbname", # Altere para a URL do PostgreSQL
driver="org.postgresql.Driver", # Altere para o driver do PostgreSQL
dbtable="table_name",
user="username",
password="password"
).load()
'''
## TRANSFORM
# Filtros e seleções: Filtrar e selecionar colunas específicas.
# Filtrar dados
df_filtered = df_csv.filter(df_csv['age'] > 30)
# Selecionar colunas
df_selected = df_csv.select('name', 'age', 'salary')
# Agregação e agregações avançadas: Como contagem, soma e médias.
# Agregação por grupo
df_aggregated = df_csv.groupBy("department").agg(F.avg("salary").alias("average_salary"))
# Junções (Joins): O Spark é otimizado para junções distribuídas.
df_joined = df_csv.join(df_sql, df_csv["id"] == df_sql["id"], "inner")
# Manipulações de dados: Como adicionar novas colunas ou transformar dados.
# Criar uma nova coluna
df_transformed = df_csv.withColumn('salary_increase', df_csv['salary'] * 1.1)
## LOAD
# Carregar para S3 (formato Parquet)
df_filtered.write.parquet("s3://bucket-name/path/", mode="overwrite")
# Carregar para um banco MySQL
df_transformed.write.format("jdbc").options(
url="jdbc:mysql://localhost:3306/dbname", # Altere para a URL do MySQL
driver="com.mysql.jdbc.Driver", # Altere para o driver do MySQL
dbtable="output_table",
user="username",
password="password"
).mode("overwrite").save()
# Carregar para um banco PostgreSQL
'''
df_transformed.write.format("jdbc").options(
url="jdbc:postgresql://localhost:5432/dbname", # Altere para a URL do PostgreSQL
driver="org.postgresql.Driver", # Altere para o driver do PostgreSQL
dbtable="output_table",
user="username",
password="password"
).mode("overwrite").save()
'''
## 4. OTIMIZAÇÃO PYSPARK
# Uso de Parquet: Persistir DataFrame na memória caso seja utilizado em múltiplas operações para evitar reprocessamentos desnecessários.
df_cached = df_transformed.cache()
# Particionamento: Dividir o DataFrame em partições para otimizar a leitura e escrita de dados.
df_transformed.write.partitionBy("year").parquet("s3://bucket-name/path/")
# Reparticionamento: Antes de operações como agregações e joins, o reparticionamento pode melhorar a performance.
df_repartitioned = df_transformed.repartition(10)
# Filtragem de dados antes de transformação: Evite carregar dados desnecessários.
# Aplique filtros e projeções (seleção de colunas) o mais cedo possível no pipeline.
# Broadcast Join: Se você está fazendo um join entre uma grande tabela e uma pequena tabela,
# você pode usar broadcast para a tabela pequena, o que pode melhorar a performance.
# Teste com DataFrames
df_large = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
df_small = spark.createDataFrame([(1, "X")], ["id"])
# Join com broadcast
df_joined = df_large.join(broadcast(df_small), "id")
df_joined.show()
#Persistência em disco: Quando os dados são grandes e você não quer que a memória se esgote,
# use disk para persistir dados intermediários.'''
df_repartitioned.write.mode("overwrite").parquet("s3://bucket-name/path/")
# Registrando DataFrame como tabela temporária
df_csv.createOrReplaceTempView("df_table")
## 5. USO DE PYSPARK SQL
#A utilização de SQL em PySpark é poderosa e pode ser feita tanto através da API SQL do PySpark
#quanto através da execução de queries SQL diretamente no SparkSession.'''
# Registrando DataFrame como tabela temporária
df_csv.createOrReplaceTempView("df_table")
# Executando query SQL
df_sql_query = spark.sql("SELECT department, AVG(salary) FROM df_table GROUP BY department")