1. # coding: utf-8
    2. import os
    3. import time
    4. import json
    5. import subprocess
    6. import pandas as pd
    7. import streamlit as st
    8. st.set_page_config(
    9. page_title="Airflow Deployment Platform",
    10. page_icon="🚀",
    11. layout="wide",
    12. initial_sidebar_state="expanded",
    13. )
    14. st.markdown("""
    15. <style>
    16. .big-font {
    17. font-size:22px
    18. }
    19. </style>
    20. """, unsafe_allow_html=True)
    21. @st.cache(show_spinner=False, suppress_st_warning=True, ttl=120)
    22. def load_airflow_dags():
    23. dags_json = os.popen("airflow dags list -o json")
    24. data = dags_json.readlines()[1]
    25. return data
    26. with st.sidebar:
    27. st.title("📊 部署和更新Airflow任务")
    28. option = st.selectbox("任务方式", ('更新', '部署'))
    29. airflow_home = os.environ.get("AIRFLOW_HOME")
    30. if option == "更新":
    31. st.title("更新Airflow任务")
    32. st.markdown("[🚀访问Airflow ](http://airflow.com:9000/)")
    33. with st.spinner("🔎 后台查询任务中,请稍等..."):
    34. dags_json = os.popen("airflow dags list -o json")
    35. data = load_airflow_dags()
    36. df = pd.DataFrame.from_dict(json.loads(data))
    37. df = df.loc[df["owner"].isin(["link", "yumingmin", "mock"])].reset_index(drop=True)
    38. dag_id, owner, update_col = st.columns(3)
    39. dag_id.markdown("**DAG_ID**")
    40. owner.markdown("**OWNER**")
    41. update_col.markdown("**UPDATE**")
    42. for raw in df.values:
    43. project_path = os.path.join(airflow_home, "dags", raw[1].split("/")[0])
    44. dag_id.markdown('<p class="big-font">%s</p>' % raw[0], unsafe_allow_html=True)
    45. owner.markdown('<p class="big-font">%s</p>' % raw[2], unsafe_allow_html=True)
    46. if update_col.button("更新", key=raw[0]):
    47. update_cmd = subprocess.Popen(f"cd {project_path} && git pull", shell=True,
    48. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    49. stdout, stderr = update_cmd.communicate()
    50. update_cmd.wait()
    51. succ_flag = 1 if 'up to date' in str(stdout) or 'done' in str(stdout) else 0
    52. if succ_flag == 1:
    53. with st.spinner("😁更新成功!!"):
    54. time.sleep(5)
    55. else:
    56. with st.spinner("😂更新失败!!"):
    57. time.sleep(5)
    58. if option == "部署":
    59. st.title("部署Airflow任务")
    60. st.markdown("[🚀访问Airflow ](http://airflow.com:9000/)")
    61. form = st.form(key="Deploy-Project")
    62. gitlab_url = form.text_input('输入 GitLab 项目地址: 例如 git@github.com:yumingmin/testisok.git')
    63. submit = form.form_submit_button("部署")
    64. if submit:
    65. if gitlab_url:
    66. with st.spinner("⏳ 后台部署中,请稍等..."):
    67. deploy_cmd = subprocess.Popen(f"cd {airflow_home}/dags && git clone {gitlab_url}",
    68. shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    69. stdout, stderr = deploy_cmd.communicate()
    70. deploy_cmd.wait()
    71. succ_flag = 1 if 'Cloning' in str(stderr) else 0
    72. st.text("out %s" % str(stderr))
    73. st.text("err %s" % str(stderr))
    74. if succ_flag == 1:
    75. with st.spinner("😁部署成功!!"):
    76. time.sleep(5)
    77. else:
    78. with st.spinner("😂部署失败!!"):
    79. time.sleep(5)
    80. else:
    81. with st.spinner("🚧 必须填写项目地址!!!"):
    82. time.sleep(5)

    终端执行:

    1. $ nohup streamlit run update_gitlab_projects_dashboard.py --server.port 9001 > streamlit_run.log &