Airflow 架構
用一張圖片來解釋 Airflow 的架構,會有效地幫助了解接下來的工作究竟是在做什麼。
Airflow 元件
Airflow 的程式其實有三個主要元件:
- WebServer:Airflow 的用家可以使用的用戶介面(UI),用 Flask 平台製作,並以
gunicorn
作為伺服器平台。 - Scheduler:安排定期工作,並把工作交給 Executor 工作
- Executor:執行工作
數據庫
這三個元件都需要和數據庫(DB)進行溝通,數據庫主要儲存:
- 工作清單
- 工作執行狀態和結果
- 工作返回的資訊 (XCom)
- 和其他服務的連結資訊(Connection)
- Airflow 用 Variable
- Airflow 的用戶資料,授權
等等…
Airflow 的預設伺服器是 SQLite,但建議使用 PostgreSQL 或者 MySQL 來獲得更佳表現和穩定性。數據庫可以設在同機,或者是其他的電腦上,在這個教學,我們會在本機安裝 PostgreSQL。
DAG 檔案夾
Airflow 會讀取在 $AIRFLOW_HOME/dags
內的 DAG 設定檔,然後開始由 Scheduler 安排工作。在下面的工作中,我們會把 $AIRFLOW_HOME
設定為 ~/airflow
。
Email 伺服器
額外連結的 Email 伺服器,可以在工作重試,執行失敗或者運行過長的時候發出電郵通知。
其他虛擬機
在這些虛擬機上,我們會用 Airflow 透過 SSH 來執行指令,或者是執行 API,這部分我們會在下一篇文章來介紹。
前置要求 (Airflow 2.1.0)
- Python 3.6, 3.7, 3.8 版
先做一些 yum 的套件安裝和更新:
安裝虛擬環境
筆者非常建議安裝 virtualenv 虛擬環境包,虛擬環境可以安排一個獨立空間,以便安裝一套系統所需的所有套件,也方便用家備份套件。
下載及安裝 Airflow 套件
在前一篇我們說到,Airflow 支援不同的第三方插件,在接下來要寫的 DAG,我們會使用 SSH 插件來連結其他虛擬機並執行指令。
如果需要其他的插件,可以參考這一篇文件。假設我們額外需要用 SFTP 插件,則下載 apache-airflow[ssh, postgres, sftp]==2.1.0
即可,其他插件如此類推。
最後輸出的檔案 requirements.txt
會用離線檔案,而非在線檔案位置進行安裝,這樣可以確保這些檔案在未來不會變動。
接下來,只要在虛擬環境中執行 airflow webserver
, airflow.cfg
的設定檔也會在 ~/airflow
中生成,方便我們進行工作。不過出現的卻是錯誤訊息,這是因為 Airflow 還沒有連接數據庫,而接下來我們就會執行數據庫的設置。
題外話:離線電腦安裝套件
如果是安裝在離線電腦的話,單純的 pip download
會透過互聯網連線到 PyPI 抓取所需的套件。
因此,要成功在離線電腦安裝所需套件,就需要用可以連結互聯網的電腦下載套件,之後再傳輸到離線電腦上安裝即可。建議使用相同的 OS 類別、Python 版本、以及 CPU 架構來減少整理所需時間。
不過由於上列的 pip download
指令所抓下來的套件也未必完整,需要多花點時間等待系統出現警示,然後再在 PyPI 下載相對應的套件。
在筆者離線安裝(Airflow 插件為 SSH, SFTP, Postgres)時,有三個套件是需要手動下載的,分別是:
但手動下載的套件是根據 Airflow 所需插件有所變動,詳細可以參考安裝時的提示訊息。
設置數據庫
在這個教學我們會用 PostgreSQL 作為數據庫軟件,假設在該台電腦中已經安裝了之後,預設 Port 為 5432。那麼我們就要先設立數據庫:
sudo -u postgres psql
設立數據庫之後,我們要更改一些設定,設定檔案位於 ~/airflow/airflow.cfg
,在 Linux 內需使用 vim 或者 nano 來修改檔案:
安裝 PostgreSQL 之後,就可以使用 LocalExecutor ,可以讓 Airflow 同時執行數個工作。這個設定比起預設 SQLite 時強制使用 SequentialExecutor 效能更好。
除了 LocalExecutor 之外,Airflow 也可以更加使用其他的 Executor ,並配合更多的執行機器來更好的執行工作。但如果是行之有年的企業,通常定期工作已經放好在其他虛擬機上,而連結其他 VM 來執行指令的話 LocalExecutor 就暫時足夠了。
接下來我們只需要執行 airflow db init
指令之後,Airflow 就會在 airflow_db
數據庫裡面做好所需的設定。然後我們就開始設立第一個 Admin 帳戶吧!
雖然數據庫已經設置好了,但在 Scheduler 還沒有啟動之前,系統仍然不會讀取我們的 DAG。
不過在啟動之前,我們先把 Email 系統也設置好吧。
設置電郵系統連結
通常測試系統有否發出的電郵以及其內容,都會用類似 MailHog 等軟件來測試。但如果電腦可以連結到互聯網的話,乾脆用 Gmail 會比較簡單。現在我們再一次回到 airflow.cfg
來做更多的修改:
關於密碼,建議使用 App password 而非個人密碼,詳情可見這一篇文件。
啟動 Airflow 並匯入第一個 DAG!
現在輸入指令 airflow webserver -D
就可以在背景執行 Airflow 的 UI 頁面。到 localhost:8080
看看吧。
現在,輸入指令 airflow scheduler -D
就可以在背景執行 Airflow 的 Scheduler。
恭喜你,你的 Airflow 已經設立好了,那麼接下來我們來複製上一篇的代碼,然後匯入到我們的系統。
開啟一個位於 ~/airflow/dags
的 Python 檔案。我們把檔案命名為 ~/airflow/dags/example-dag.py
,個人建議檔案名和代碼內的 DAG 名字同名,方便搜尋。
這一篇我們認識了 Airflow 的架構,還有如何從零安裝 Airflow。我們在下一篇就正式編寫一個 DAG,使用 SSH 連結到其他系統,然後在遠端執行指令吧。