-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds necessary files for elasticsearch interactions
- Loading branch information
frankcash
committed
Aug 31, 2021
1 parent
b5903db
commit 9406cda
Showing
3 changed files
with
63 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from airflow import DAG | ||
|
||
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook | ||
from airflow.operators.python_operator import PythonOperator | ||
from datetime import datetime, timedelta | ||
|
||
|
||
def my_custom_function(**kwargs):
    """Print each Elasticsearch table name followed by its row count.

    Connects through the Airflow connection id ``production-es``, runs
    ``SHOW TABLES`` and, for every returned row, prints the first column
    and then the rows returned by ``SELECT COUNT(*)`` against that table.

    Args:
        **kwargs: Accepted so the callable can receive keyword arguments
            from the operator; none of them are read here.

    Returns:
        None. All output goes to stdout via ``print``.
    """
    hook = ElasticsearchHook(elasticsearch_conn_id='production-es')
    es_conn = hook.get_conn()

    # Only the first column of each SHOW TABLES row is used as the table
    # name; any remaining columns are discarded.
    for table, *_ in es_conn.execute('SHOW TABLES'):
        print(f"table: {table}")
        # NOTE(review): the table name is interpolated unquoted; names come
        # from SHOW TABLES itself, but names with special characters may
        # need quoting - confirm against the target cluster.
        rows = es_conn.execute(f'SELECT COUNT(*) FROM {table}')
        print(list(rows))
|
||
|
||
# Default settings applied to all tasks in the DAG (passed as ``default_args``).
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    # A failed task is retried once, five minutes after the failure.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
|
||
# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
    'elasticsearch_dag',
    start_date=datetime(2021, 8, 30),
    max_active_runs=1,
    # One run per day; see https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval=timedelta(days=1),
    default_args=default_args,
    # catchup=False: do not create runs for past intervals; set to True if
    # historical DAG runs should be executed.
    catchup=False,
) as dag:

    # Single task that calls my_custom_function with task_number=0.
    tn = PythonOperator(
        task_id='python_print_date',
        python_callable=my_custom_function,
        op_kwargs={'task_number': 0},
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
apache-airflow==2.1.2 | ||
apache-airflow-providers-postgres==2.1.0 | ||
apache-airflow-providers-amazon==2.1.0 | ||
apache-airflow-providers-postgres==2.0.0 | ||
apache-airflow-providers-amazon==2.1.0 | ||
apache-airflow-providers-elasticsearch |