First, get the Docker Compose environment running:

```shell
# download the docker-compose.yaml for Airflow
curl -LfO ''
# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up
```

After all services have started up, the web UI will be available at the webserver's default address. The default account has the username `airflow` and the password `airflow`.

We will also need to create a connection to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the plus sign to "Add a new record" to the list of connections. Fill in the connection fields, and note the Connection Id value (`tutorial_pg_conn` in the code below), which we'll pass as a parameter for the `postgres_conn_id` kwarg. Test your connection and, if the test is successful, save your connection.

With the connection in place, the pipeline is defined in a single DAG file:

```python
import datetime
import os

import pendulum
import requests

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
    dag_id="process-employees",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );
        """,
    )

    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );
        """,
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = ""  # URL of the source CSV file

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        # stream the downloaded file into the staging table with COPY
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        # deduplicate the staging table, then upsert into the target table
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT * FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
                "Employee Markme" = excluded."Employee Markme",
                "Description" = excluded."Description",
                "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception:
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()
```

Save this code to a python file in the /dags folder (e.g. `dags/process-employees.py`) and, after a brief delay, the process-employees DAG will be included in the list of available DAGs on the web UI.

You can trigger the process-employees DAG by unpausing it (via the slider on the left end) and running it (via the Run button under Actions).

In the process-employees DAG's Grid view, we see that all tasks ran successfully in all executed runs.

Every time you run a DAG, you are creating a new instance of that DAG, which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the period of data the tasks should operate on. As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data: each run handles exactly one day's worth of it. If `schedule` is not enough to express the DAG's schedule, see Timetables. For more information on logical date, see Data Interval; for more information on schedule values, see DAG Run.
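The `get_data` task streams the downloaded CSV into Postgres with `COPY` via `copy_expert`. The shape of that load step can be tried locally without a Postgres instance by using the standard-library `csv` module and SQLite as stand-ins — a sketch under those assumptions (the inline `csv_text` replaces the downloaded file; this is not the `PostgresHook` API):

```python
import csv
import io
import sqlite3

# stands in for the file get_data downloads to data_path
csv_text = "Serial Number,Leave\n1,0\n2,5\n"

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE employees_temp ("Serial Number" INTEGER, "Leave" INTEGER)')

reader = csv.reader(io.StringIO(csv_text))
next(reader)  # skip the header row, as COPY ... CSV HEADER does
conn.executemany("INSERT INTO employees_temp VALUES (?, ?)", reader)
conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM employees_temp").fetchone()[0]
print(row_count)  # 2 data rows loaded
```

The real task gains over this by letting the database parse the CSV server-side, which is why `COPY ... FROM STDIN` is preferred for bulk loads into Postgres.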
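The `merge_data` task relies on PostgreSQL's `INSERT ... ON CONFLICT` upsert. SQLite (3.24+) supports the same clause, so the deduplicate-and-upsert pattern can be exercised locally — a sketch under that assumption, with a simplified two-column schema (SQLite additionally requires a `WHERE` clause on the `SELECT` to disambiguate the upsert syntax):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute('CREATE TABLE employees ("Serial Number" INTEGER PRIMARY KEY, "Leave" INTEGER)')
cur.execute('CREATE TABLE employees_temp ("Serial Number" INTEGER, "Leave" INTEGER)')

# staging table holds a duplicate row and an updated value for an existing key
cur.executemany("INSERT INTO employees_temp VALUES (?, ?)", [(1, 0), (1, 0), (2, 5)])
cur.execute("INSERT INTO employees VALUES (1, 3)")  # pre-existing row to be updated

cur.execute(
    """
    INSERT INTO employees
    SELECT * FROM (SELECT DISTINCT * FROM employees_temp) WHERE true
    ON CONFLICT ("Serial Number") DO UPDATE SET "Leave" = excluded."Leave"
    """
)
conn.commit()

rows = sorted(cur.execute("SELECT * FROM employees").fetchall())
print(rows)  # the duplicate collapsed, key 1 updated, key 2 inserted
```

The `SELECT DISTINCT` collapses duplicate staging rows before the upsert, so re-running the merge is idempotent — the same property the tutorial's Postgres query has.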
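The idea of a data interval can be illustrated outside Airflow: for a daily schedule such as `0 0 * * *`, a run's logical date marks the start of the interval it processes, and the interval ends one day later. A minimal standard-library sketch (the function name and return shape are illustrative, not Airflow's API):

```python
from datetime import datetime, timedelta, timezone


def daily_data_interval(logical_date: datetime) -> tuple[datetime, datetime]:
    """For a daily schedule, the data interval starts at the logical date
    and ends exactly one day later (illustrative, not Airflow's API)."""
    start = logical_date
    end = start + timedelta(days=1)
    return start, end


# the run logically dated 2021-01-01 processes data for that whole day
start, end = daily_data_interval(datetime(2021, 1, 1, tzinfo=timezone.utc))
print(start.isoformat(), "->", end.isoformat())
```

This is why a run scheduled for midnight actually executes *after* its interval has elapsed: only then is that day's data complete and ready to process.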