Airflow2.2.3 + Celery + MySQL 8构建一个健壮的分布式调度集群

Airflow2.2.3 + Celery + mysql 8构建一个健壮的分布式调度集群

作者: Marionxue 2022-01-05 19:34:18

数据库

MySQL

分布式 今天我们就再来看看如何通过Airflow和celery构建一个健壮的分布式调度集群。

创新互联是专业的绿园网站建设公司,绿园接单;提供成都网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行绿园网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

前面聊了Airflow基础架构??,以及又讲了如何在容器化内部署Airflow??,今天我们就再来看看如何通过Airflow和celery构建一个健壮的分布式调度集群。

1集群环境

同样是在Ubuntu 20.04.3 LTS机器上安装Airflow集群,这次我们准备三台同等配置服务器,进行测试,前篇文章??[1]中,我们已经在Bigdata1服务器上安装了airflow的所有组件,没看过的可以点击链接先看下之前的文章,现在只需要在其他两个节点安装worker组件即可。

 Bigdata1(A)Bigdata2(B)Bigdata3(C)
Webserver  
Scheduler  
Worker

在上篇文章中的docker-compose.yml中没有对部署文件以及数据目录进行的分离,这样在后期管理的时候不太方便,因此我们可以把服务停止后,将数据库以及数据目录与部署文件分开

  • 部署文件:docker-compose.yaml/.env 存放在/apps/airflow目录下
  • MySQL以及配置文件: 放在/data/mysql
  • airflow数据目录: 放在/data/airflow

这样拆分开就方便后期的统一管理了。

2部署worker服务

前期准备

  
 
 
 
  1. mkdir /data/airflow/{dags,plugins} -pv 
  2. mkdir -pv /apps/airflow 
  3. mkdir -pv /logs/airflow 

worker的部署文件:

  
 
 
 
  1. --- 
  2. version: '3' 
  3. x-airflow-common: 
  4.   &airflow-common 
  5.   # In order to add custom dependencies or upgrade provider packages you can use your extended image. 
  6.   # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml 
  7.   # and uncomment the "build" line below, Then run `docker-compose build` to build the images. 
  8.   image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3} 
  9.   # build: . 
  10.   environment: 
  11.     &airflow-common-env 
  12.     AIRFLOW__CORE__EXECUTOR: CeleryExecutor 
  13.     AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow #修改MySQL对应的账号和密码 
  14.     AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow #修改MySQL对应的账号和密码 
  15.     AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0 #修改Redis的密码 
  16.     AIRFLOW__CORE__FERNET_KEY: '' 
  17.     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' 
  18.     AIRFLOW__CORE__LOAD_EXAMPLES: 'true' 
  19.     AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' 
  20.     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} 
  21.   volumes: 
  22.     - /data/airflow/dags:/opt/airflow/dags 
  23.     - /logs/airflow:/opt/airflow/logs 
  24.     - /data/airflow/plugins:/opt/airflow/plugins 
  25.     - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg 
  26.   user: "${AIRFLOW_UID:-50000}:0" 
  27.  
  28. services: 
  29.   airflow-worker: 
  30.     <<: *airflow-common 
  31.     command: celery worker 
  32.     healthcheck: 
  33.       test: 
  34.         - "CMD-SHELL" 
  35.         - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' 
  36.       interval: 10s 
  37.       timeout: 10s 
  38.       retries: 5 
  39.     environment: 
  40.       <<: *airflow-common-env 
  41.       # Required to handle warm shutdown of the celery workers properly 
  42.       # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation 
  43.       DUMB_INIT_SETSID: "0" 
  44.     restart: always 
  45.     hostname: bigdata-20-194 # 此处设置容器的主机名,便于在flower中查看是哪个worker 
  46.     depends_on: 
  47.       airflow-init: 
  48.         condition: service_completed_successfully 
  49.  
  50.   airflow-init: 
  51.     <<: *airflow-common 
  52.     entrypoint: /bin/bash 
  53.     # yamllint disable rule:line-length 
  54.     command: 
  55.       - -c 
  56.       - | 
  57.         function ver() { 
  58.           printf "%04d%04d%04d%04d" $${1//./ } 
  59.         } 
  60.         airflow_version=$$(gosu airflow airflow version) 
  61.         airflow_version_comparable=$$(ver $${airflow_version}) 
  62.         min_airflow_version=2.2.0 
  63.         min_airflow_version_comparable=$$(ver $${min_airflow_version}) 
  64.         if (( airflow_version_comparable < min_airflow_version_comparable )); then 
  65.           echo 
  66.           echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" 
  67.           echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" 
  68.           echo 
  69.           exit 1 
  70.         fi 
  71.         if [[ -z "${AIRFLOW_UID}" ]]; then 
  72.           echo 
  73.           echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" 
  74.           echo "If you are on Linux, you SHOULD follow the instructions below to set " 
  75.           echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." 
  76.           echo "For other operating systems you can get rid of the warning with manually created .env file:" 
  77.           echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user" 
  78.           echo 
  79.         fi 
  80.         one_meg=1048576 
  81.         mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) 
  82.         cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) 
  83.         disk_available=$$(df / | tail -1 | awk '{print $$4}') 
  84.         warning_resources="false" 
  85.         if (( mem_available < 4000 )) ; then 
  86.           echo 
  87.           echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" 
  88.           echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" 
  89.           echo 
  90.           warning_resources="true" 
  91.         fi 
  92.         if (( cpus_available < 2 )); then 
  93.           echo 
  94.           echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" 
  95.           echo "At least 2 CPUs recommended. You have $${cpus_available}" 
  96.           echo 
  97.           warning_resources="true" 
  98.         fi 
  99.         if (( disk_available < one_meg * 10 )); then 
  100.           echo 
  101.           echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" 
  102.           echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" 
  103.           echo 
  104.           warning_resources="true" 
  105.         fi 
  106.         if [[ $${warning_resources} == "true" ]]; then 
  107.           echo 
  108.           echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" 
  109.           echo "Please follow the instructions to increase amount of resources available:" 
  110.           echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin" 
  111.           echo 
  112.         fi 
  113.         mkdir -p /sources/logs /sources/dags /sources/plugins 
  114.         chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} 
  115.         exec /entrypoint airflow version 
  116.     # yamllint enable rule:line-length 
  117.     environment: 
  118.       <<: *airflow-common-env 
  119.       _AIRFLOW_DB_UPGRADE: 'true' 
  120.       _AIRFLOW_WWW_USER_CREATE: 'true' 
  121.       _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} 
  122.       _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} 
  123.     user: "0:0" 
  124.     volumes: 
  125.       - .:/sources 
  126.  
  127.   airflow-cli: 
  128.     <<: *airflow-common 
  129.     profiles: 
  130.       - debug 
  131.     environment: 
  132.       <<: *airflow-common-env 
  133.       CONNECTION_CHECK_MAX_COUNT: "0" 
  134.     # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 
  135.     command: 
  136.       - bash 
  137.       - -c 
  138.       - airflow 

初始化检测,检查环境是否满足:

  
 
 
 
  1. cd /apps/ariflow/ 
  2. echo -e "AIRFLOW_UID=$(id -u)" > .env # 注意,此处一定要保证AIRFLOW_UID是普通用户的UID,且保证此用户有创建这些持久化目录的权限 
  3. docker-compose up airflow-init 

如果数据库已经存在,初始化检测不影响已有的数据库,接下来就运行airflow-worker服务

  
 
 
 
  1. docker-compose up -d 

接下来,按照同样的方式在bigdata3节点上安装airflow-worker服务就可以了。部署完成之后,就可以通过flower查看broker的状态:

3持久化配置文件

大多情况下,使用airflow多worker节点的集群,我们就需要持久化airflow的配置文件,并且将airflow同步到所有的节点上,因此这里需要修改一下docker-compose.yaml中x-airflow-common的volumes,将airflow.cfg通过挂载卷的形式挂载到容器中,配置文件可以在容器中拷贝一份出来,然后在修改;

前期使用的时候,我们需要将docker-compose文件中的一些环境变量的值写入到airflow.cfg文件中,例如以下信息:

  
 
 
 
  1. [core] 
  2. dags_folder = /opt/airflow/dags 
  3. hostname_callable = socket.getfqdn 
  4. default_timezone = Asia/Shanghai # 修改时区 
  5. executor = CeleryExecutor 
  6. sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow 
  7. sql_engine_encoding = utf-8 
  8. sql_alchemy_pool_enabled = True 
  9. sql_alchemy_pool_size = 5 
  10. sql_alchemy_max_overflow = 10 
  11. sql_alchemy_pool_recycle = 1800 
  12. sql_alchemy_pool_pre_ping = True 
  13. sql_alchemy_schema = 
  14. parallelism = 32 
  15. max_active_tasks_per_dag = 16 
  16. dags_are_paused_at_creation = True 
  17. max_active_runs_per_dag = 16 
  18. load_examples = True 
  19. load_default_connections = True 
  20. plugins_folder = /opt/airflow/plugins 
  21. execute_tasks_new_python_interpreter = False 
  22. fernet_key = 
  23. donot_pickle = True 
  24. dagbag_import_timeout = 30.0 
  25. dagbag_import_error_tracebacks = True 
  26. dagbag_import_error_traceback_depth = 2 
  27. dag_file_processor_timeout = 50 
  28. task_runner = StandardTaskRunner 
  29. default_impersonation = 
  30. security = 
  31. unit_test_mode = False 
  32. enable_xcom_pickling = False 
  33. killed_task_cleanup_time = 60 
  34. dag_run_conf_overrides_params = True 
  35. dag_discovery_safe_mode = True 
  36. default_task_retries = 0 
  37. default_task_weight_rule = downstream 
  38. min_serialized_dag_update_interval = 30 
  39. min_serialized_dag_fetch_interval = 10 
  40. max_num_rendered_ti_fields_per_task = 30 
  41. check_slas = True 
  42. xcom_backend = airflow.models.xcom.BaseXCom 
  43. lazy_load_plugins = True 
  44. lazy_discover_providers = True 
  45. max_db_retries = 3 
  46. hide_sensitive_var_conn_fields = True 
  47. sensitive_var_conn_names = 
  48. default_pool_task_slot_count = 128 
  49. [logging] 
  50. base_log_folder = /opt/airflow/logs 
  51. remote_logging = False 
  52. remote_log_conn_id = 
  53. google_key_path = 
  54. remote_base_log_folder = 
  55. encrypt_s3_logs = False 
  56. logging_level = INFO 
  57. fab_logging_level = WARNING 
  58. logging_config_class = 
  59. colored_console_log = True 
  60. colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s 
  61. colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter 
  62. log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s 
  63. simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s 
  64. task_log_prefix_template = 
  65. log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log 
  66. log_processor_filename_template = {{ filename }}.log 
  67. dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log 
  68. task_log_reader = task 
  69. extra_logger_names = 
  70. worker_log_server_port = 8793 
  71. [metrics] 
  72. statsd_on = False 
  73. statsd_host = localhost 
  74. statsd_port = 8125 
  75. statsd_prefix = airflow 
  76. statsd_allow_list = 
  77. stat_name_handler = 
  78. statsd_datadog_enabled = False 
  79. statsd_datadog_tags = 
  80. [secrets] 
  81. backend = 
  82. backend_kwargs = 
  83. [cli] 
  84. api_client = airflow.api.client.local_client 
  85. endpoint_url = http://localhost:8080 
  86. [debug] 
  87. fail_fast = False 
  88. [api] 
  89. enable_experimental_api = False 
  90. auth_backend = airflow.api.auth.backend.deny_all 
  91. maximum_page_limit = 100 
  92. fallback_page_limit = 100 
  93. google_oauth2_audience = 
  94. google_key_path = 
  95. access_control_allow_headers = 
  96. access_control_allow_methods = 
  97. access_control_allow_origins = 
  98. [lineage] 
  99. backend = 
  100. [atlas] 
  101. sasl_enabled = False 
  102. host = 
  103. port = 21000 
  104. username = 
  105. password = 
  106. [operators] 
  107. default_owner = airflow 
  108. default_cpus = 1 
  109. default_ram = 512 
  110. default_disk = 512 
  111. default_gpus = 0 
  112. default_queue = default 
  113. allow_illegal_arguments = False 
  114. [hive] 
  115. default_hive_mapred_queue = 
  116. [webserver] 
  117. base_url = https://devopsman.cn/airflow #自定义airflow域名 
  118. default_ui_timezone = Asia/Shanghai # 设置默认的时区 
  119. web_server_host = 0.0.0.0 
  120. web_server_port = 8080 
  121. web_server_ssl_cert = 
  122. web_server_ssl_key = 
  123. web_server_master_timeout = 120 
  124. web_server_worker_timeout = 120 
  125. worker_refresh_batch_size = 1 
  126. worker_refresh_interval = 6000 
  127. reload_on_plugin_change = False 
  128. secret_key = emEfndkf3QWZ5zVLE1kVMg== 
  129. workers = 4 
  130. worker_class = sync 
  131. access_logfile = - 
  132. error_logfile = - 
  133. access_logformat = 
  134. expose_config = False 
  135. expose_hostname = True 
  136. expose_stacktrace = True 
  137. dag_default_view = tree 
  138. dag_orientation = LR 
  139. log_fetch_timeout_sec = 5 
  140. log_fetch_delay_sec = 2 
  141. log_auto_tailing_offset = 30 
  142. log_animation_speed = 1000 
  143. hide_paused_dags_by_default = False 
  144. page_size = 100 
  145. navbar_color = #fff 
  146. default_dag_run_display_number = 25 
  147. enable_proxy_fix = False 
  148. proxy_fix_x_for = 1 
  149. proxy_fix_x_proto = 1 
  150. proxy_fix_x_host = 1 
  151. proxy_fix_x_port = 1 
  152. proxy_fix_x_prefix = 1 
  153. cookie_secure = False 
  154. cookie_samesite = Lax 
  155. default_wrap = False 
  156. x_frame_enabled = True 
  157. show_recent_stats_for_completed_runs = True 
  158. update_fab_perms = True 
  159. session_lifetime_minutes = 43200 
  160. auto_refresh_interval = 3 
  161. [email] 
  162. email_backend = airflow.utils.email.send_email_smtp 
  163. email_conn_id = smtp_default 
  164. default_email_on_retry = True 
  165. default_email_on_failure = True 
  166. [smtp] # 邮箱配置 
  167. smtp_host = localhost 
  168. smtp_starttls = True 
  169. smtp_ssl = False 
  170. smtp_port = 25 
  171. smtp_mail_from = airflow@example.com 
  172. smtp_timeout = 30 
  173. smtp_retry_limit = 5 
  174. [sentry] 
  175. sentry_on = false 
  176. sentry_dsn = 
  177. [celery_kubernetes_executor] 
  178. kubernetes_queue = kubernetes 
  179. [celery] 
  180. celery_app_name = airflow.executors.celery_executor 
  181. worker_concurrency = 16 
  182. worker_umask = 0o077 
  183. broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0 
  184. result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow 
  185. flower_host = 0.0.0.0 
  186. flower_url_prefix = 
  187. flower_port = 5555 
  188. flower_basic_auth = 
  189. sync_parallelism = 0 
  190. celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG 
  191. ssl_active = False 
  192. ssl_key = 
  193. ssl_cert = 
  194. ssl_cacert = 
  195. pool = prefork 
  196. operation_timeout = 1.0 
  197. task_track_started = True 
  198. task_adoption_timeout = 600 
  199. task_publish_max_retries = 3 
  200. worker_precheck = False 
  201. [celery_broker_transport_options] 
  202. [dask] 
  203. cluster_address = 127.0.0.1:8786 
  204. tls_ca = 
  205. tls_cert = 
  206. tls_key = 
  207. [scheduler] 
  208. job_heartbeat_sec = 5 
  209. scheduler_heartbeat_sec = 5 
  210. num_runs = -1 
  211. scheduler_idle_sleep_time = 1 
  212. min_file_process_interval = 30 
  213. dag_dir_list_interval = 300 
  214. print_stats_interval = 30 
  215. pool_metrics_interval = 5.0 
  216. scheduler_health_check_threshold = 30 
  217. orphaned_tasks_check_interval = 300.0 
  218. child_process_log_directory = /opt/airflow/logs/scheduler 
  219. scheduler_zombie_task_threshold = 300 
  220. catchup_by_default = True 
  221. max_tis_per_query = 512 
  222. use_row_level_locking = True 
  223. max_dagruns_to_create_per_loop = 10 
  224. max_dagruns_per_loop_to_schedule = 20 
  225. schedule_after_task_execution = True 
  226. parsing_processes = 2 
  227. file_parsing_sort_mode = modified_time 
  228. use_job_schedule = True 
  229. allow_trigger_in_future = False 
  230. dependency_detector = airflow.serialization.serialized_objects.DependencyDetector 
  231. trigger_timeout_check_interval = 15 
  232. [triggerer] 
  233. default_capacity = 1000 
  234. [kerberos] 
  235. ccache = /tmp/airflow_krb5_ccache 
  236. principal = airflow 
  237. reinit_frequency = 3600 
  238. kinit_path = kinit 
  239. keytab = airflow.keytab 
  240. forwardable = True 
  241. include_ip = True 
  242. [github_enterprise] 
  243. api_rev = v3 
  244. [elasticsearch] 
  245. host = 
  246. log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} 
  247. end_of_log_mark = end_of_log 
  248. frontend = 
  249. write_stdout = False 
  250. json_format = False 
  251. json_fields = asctime, filename, lineno, levelname, message 
  252. host_field = host 
  253. offset_field = offset 
  254. [elasticsearch_configs] 
  255. use_ssl = False 
  256. verify_certs = True 
  257. [kubernetes] 
  258. pod_template_file = 
  259. worker_container_repository = 
  260. worker_container_tag = 
  261. namespace = default 
  262. delete_worker_pods = True 
  263. delete_worker_pods_on_failure = False 
  264. worker_pods_creation_batch_size = 1 
  265. multi_namespace_mode = False 
  266. in_cluster = True 
  267. kube_client_request_args = 
  268. delete_option_kwargs = 
  269. enable_tcp_keepalive = True 
  270. tcp_keep_idle = 120 
  271. tcp_keep_intvl = 30 
  272. tcp_keep_cnt = 6 
  273. verify_ssl = True 
  274. worker_pods_pending_timeout = 300 
  275. worker_pods_pending_timeout_check_interval = 120 
  276. worker_pods_queued_check_interval = 60 
  277. worker_pods_pending_timeout_batch_size = 100 
  278. [smart_sensor] 
  279. use_smart_sensor = False 
  280. shard_code_upper_limit = 10000 
  281. shards = 5 
  282. sensors_enabled = NamedHivePartitionSensor 

修改完成之后,重启一下服务。

  
 
 
 
  1. docker-compose restart 

4数据同步

因为airflow使用了三个worker节点,每个节点修改配置,其他节点都要同步,同时DAGS目录以及plugins目录也需要实时进行同步,在scheduler将信息调度到某个节点后,如果找不到对应的DAGS文件,就会报错,因此我们使用lsyncd进行数据实时同步:

  
 
 
 
  1. apt-get install lsyncd -y 

配置节点之间通过公钥连接

  
 
 
 
  1. ssh-keygen -t rsa -C "airflow-sync" -b 4096 #生成一对名为airflow-sync的密钥 
  2. for ip in 100 200;do ssh-copy-id -i ~/.ssh/airflow-sync.pub ${USERNAME}@192.168.0.$ip -P12022;done  

然后我们就可以通过私钥访问了其它节点了。

编辑同步的配置文件,lsyncd配置的更多参数学习,可以直达官方文档[2]

  
 
 
 
  1. settings { 
  2.     logfile = "/var/log/lsyncd.log", # 日志文件 
  3.     statusFile = "/var/log/lsyncd.status", # 同步状态信息 
  4.     pidfile = "/var/run/lsyncd.pid", 
  5.     statusInterval = 1, 
  6.     nodaemon = false, # 守护进程 
  7.     inotifyMode  = "CloseWrite", 
  8.     maxProcesses = 1, 
  9.     maxDelays = 1, 
  10. sync { 
  11.     default.rsync, 
  12.     source = "/data/airflow", 
  13.     target = "192.168.0.100:/data/airflow", 
  14.  
  15.     rsync = { 
  16.        binary = "/usr/bin/rsync", 
  17.        compress = false, 
  18.        archive = true, 
  19.        owner = true, 
  20.        perms = true, 
  21.        --delete =  true, 
  22.        whole_file = false, 
  23.        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync" 
  24.     }, 
  25. sync { 
  26.     default.rsync, 
  27.     source = "/data/airflow", 
  28.     target = "192.168.0.200:/data/airflow", 
  29.  
  30.     rsync = { 
  31.        binary = "/usr/bin/rsync", 
  32.        compress = false, 
  33.        archive = true, 
  34.        owner = true, 
  35.        perms = true, 
  36.        --delete =  true, 
  37.        whole_file = false, 
  38.        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-rsync" 
  39.     }, 

以上的参数是什么意思,可以访问官网查看,此处是通过rsync的rsh定义ssh命令,能够解决使用了私钥,自定义端口等安全措施的场景,当然你也可以使用配置无密访问,然后使用default.rsync或者default.rsyncssh等进行配置。

配置lsyncd的服务托管

  
 
 
 
  1. cat << EOF > /etc/systemd/system/lsyncd.service 
  2. [Unit] 
  3. Description=lsyncd 
  4. ConditionFileIsExecutable=/usr/bin/lsyncd 
  5.  
  6. After=network-online.target 
  7. Wants=network-online.target 
  8.  
  9. [Service] 
  10. StartLimitBurst=10 
  11. ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf 
  12. Restart=on-failure 
  13. RestartSec=120 
  14. EnvironmentFile=-/etc/sysconfig/aliyun 
  15. KillMode=process 
  16. [Install] 
  17. WantedBy=multi-user.target 
  18. EOF 
  19.  
  20. systemctl daemon-reload 
  21. systemctl enable --now lsyncd.service #启动服务并配置开启自启 

这样就完成了数据(dags,plugins,airflow.cfg)的同步问题,后期使用CICD场景的时候,便可以直接将dag文件上传到Bigdata1节点上即可,其他两个节点就会自动同步了。如果出现问题,可以通过查看日志进行debug

  
 
 
 
  1. lsyncd -log all /etc/lsyncd.conf 
  2. tail -f /var/log/lsyncd.log 

5反向代理[3]

如果你需要将airflow放在反向代理之后,如https://lab.mycompany.com/myorg/airflow/你可以通过一下配置完成:

在airflow.cfg中配置base_url

  
 
 
 
  1. base_url = http://my_host/myorg/airflow 
  2. enable_proxy_fix = True 

nginx的配置

  
 
 
 
  1. server { 
  2.   listen 80; 
  3.   server_name lab.mycompany.com; 
  4.  
  5.   location /myorg/airflow/ { 
  6.       proxy_pass http://localhost:8080; 
  7.       proxy_set_header Host $http_host; 
  8.       proxy_redirect off; 
  9.       proxy_http_version 1.1; 
  10.       proxy_set_header Upgrade $http_upgrade; 
  11.       proxy_set_header Connection "upgrade"; 
  12.   } 

到这里就基本上完成的airflow分布式调度集群的安装了.看下具体效果如下。

看到这里说明你也正在使用或对Airflow感兴趣,顺便送你一个学习Airflow资料;

https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1

参考资料

[1]Airflow 2.2.3 + MySQL8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ

[2]lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/

[3]airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html

分享文章:Airflow2.2.3 + Celery + MySQL 8构建一个健壮的分布式调度集群
本文URL:http://www.mswzjz.com/qtweb/news46/178146.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联