Օդային հոսք. Ինչպե՞ս օգտագործել հազարավոր խնդիրներ մեկ կրակոցով պլանավորելու համար

Այս գրառման մեջ ես կքննարկեմ, թե ինչպես կարող ենք հազարավոր առաջադրանքներ պլանավորել մեկ դագաղի ներսում: Ես չեմ պատրաստվում կենտրոնանալ այն մասին, թե որն է Airflow- ը և ինչպես կարող եք այն տեղադրել, բայց դրա փոխարեն ես կքննարկեմ այն ​​մասին, թե ինչպես կարող ենք մեծ քանակությամբ առաջադրանքներ պլանավորել մեկ դագաղի ներսում:

Ըստ էության, Airflow- ը նախատեսված է բազմաթիվ DAG- ներ ունենալու համար, և այդ DAG- ի ներսում կարող են լինել հարյուր կամ հազար առաջադրանք: Ի՞նչ է պատահում, երբ մենք ուզում ենք պլանավորել մեծ թվով առաջադրանքներ, ասենք, ավելի քան 60000 կամ ավելին: Սա ես բացատրել եմ այս բլոգում:

Ես աշխատում եմ Airflow- ի հետ `աշխատանքս ավտոմատացնելու համար: Բայց իմ ընկերությունում մենք ունենք իսկապես մեծ քանակությամբ տվյալներ, և ես փորձեցի ՝ օգտագործելով Airflow- ի տարբեր վարկածներ, և իսկապես հսկայական տվյալների պատճառով ես գրեթե 70000 առաջադրանք ունեմ մեկ DAG- ի ներսում: Ես փորձել եմ Airflow- ի տարբեր տարբերակներ, և վերջին տարբերակը կարող է 5000 առաջադրանք նշանակել, բայց եթե մենք ուզում ենք ավելի շատ ժամանակացույց անել, քան որ ժամանակացույցը մնում է ընթացիկ վիճակում, առանց առաջադրանքների պլանավորման: Ես գտա յուրաքանչյուր խնդիր և ստուգեցի, թե ինչպես լուծել այն, որն է իրական պատճառը և, վերջապես, այս բլոգը գրելիս:

Սա Օդային հոսքի օգտագործման դեպքերից մեկն է, երբ DAG- ի ներսում հազարավոր խնդիրներ ունեք: Դրանից սկսելու համար մենք պետք է օգտագործենք Airflow 1.10.3 տարբերակը, դրանից հետո չկենտրոնանալ մեծ թվով խնդիրների վրա, ուստի մենք պետք է օգտագործենք Airflow տարբերակը 1.10.3: Այս տարբերակը տեղադրելու համար հետևեք հետևյալ քայլերին.

  • Նախ պետք է ստեղծել նոր միջավայր և ակտիվացնել այդ միջավայրը ՝ օգտագործելով հետևյալ հրամանը.
կոնդա ստեղծել-օդային հոսք_3
կոնդա ակտիվացնել օդափոխումը_3
  • 1.10.3-ի հատուկ տարբերակով օդային հոսք տեղադրելու համար օգտագործեք հետևյալ հրամանը.
conda տեղադրում -c conda-forge օդափոխություն == 1.10.3
  • Պետք է համոզվեք որոշ հատուկ պահանջների մասին, որ այս տարբերակը չի գործում flask≥1.0.9- ով, այնպես որ, եթե այս տարբերակից ավելի մեծ փաթիլ ունեք, օգտագործեք հետևյալ հրամանը.
pip տեղադրել flask == 1.0.4
խողովակների տեղադրման գործառույթներ == 1.0.0 (սա ևս մեկ պահանջ է, որը պետք է տեղադրվի)
  • Եվ առաջարկվում է օգտագործել նեխուրի գործադիրը, երբ մենք աշխատում ենք այս շատ մեծ քանակությամբ խնդիրների հետ, քանի որ մենք պետք է զուգահեռ անցկացնենք այդ խնդիրները, և դա հնարավոր է հասնել `օգտագործելով նեխուրի գործադիր: Նեխուր օգտագործելու համար տեղադրեք հետևյալ հրամանը.
pip տեղադրել նեխուր
  • Դուք պետք է աշխատողներ օգտագործեք և բրոքեր սահմանեք նեխուրի գործադիր օգտագործելու համար, ես RabbitMQ- ն օգտագործում եմ որպես բրոքեր: Բրոքերային URL տեղադրելու համար կարող եք օգտագործել հետևյալ կառուցվածքը.
broker_url = amqp: // «օգտվողի անուն»: «գաղտնաբառ» @ «host_name». «port» /

օրինակ

broker_url = amqp: // հյուր: հյուր @ localhost: 5672 /
  • Celery- ի Կատարողի UI- ի օգտագործման համար մենք կարող ենք օգտագործել Flower- ը, այդ օգտագործումը տեղադրելու համար հետևյալ հրամանը.
կոնդա տեղադրում -c կոնդա-դարբնոցային ծաղիկ
  • Դա անելուց հետո մենք պետք է փոխենք որոշ կազմաձևեր, զուգահեռ անցկացնելու համար հազարավոր առաջադրանքներ և մեկ կրակոցով պլանավորելու հազարավոր առաջադրանքներ:
[հիմնական]
կատարող = CeleryExector զուգահեռություն = 200000 non_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[ժամանակացույցը]
max_thread = 10 (Կարող եք օգտագործել թեմաները ըստ ձեր ծրագրի ՝ ավելացնելով կամ նվազեցնելով)

Սրանք հիմնական պարամետրերն են, երբ ուզում ես մեկ կրակոցով պլանավորել հազարավոր առաջադրանքներ: Դուք պետք է այն ճշգրտեք համաձայն այն բանի, թե որքան առավելագույն DAG- ներ եք ուզում զուգահեռ գործարկել և քանի՞ խնդիր ունեք մեկ DAG- ի ներսում:

Հիմնական պարամետրը «Non_pooled_task_slot_count» է, որը հանվել է Airflow 1.10.4 տարբերակից, այնպես որ ես օգտագործում եմ 1.10.3, քանի որ այս պարամետրը շատ կարևոր դեր է խաղում առաջադրանքների պլանավորման մեջ:

«Non_pooled_task_slot_count» - ը հանելուց հիմնական տարբերությունն այն է, որ այն օգտագործում է default_pool, որն ըստ լռելյայն սահմանվում է 128-ի վրա (կարող է այն մեծացնել ըստ պահանջի): «Non_pooled_task_slot_count» - ի հիմնական աշխատանքը առաջադրանքների պլանավորումն է, և այն միացված չէ default_pool- ին կամ տվյալների բազայից որևէ այլ կապի համար, այնպես որ մենք կարող ենք այդ թիվը ավելացնել այնքանով, որքան ուզում եք, բայց եթե «default_pool» - ում ավելացնեք բլոկների քանակը: այնուհետև այն միացված է նաև ձեր ունեցած տվյալների բազայի կապերին, և զուգահեռ վարվելիս չեք կարող ունենալ 100000 տվյալների բազա: Ըստ էության, «Non_pooled_task_slot_count» - ը հանվեց հօգուտ «default_pool» - ի:

Այս գրառումը պարունակում է այն հարցի պատասխանը, թե ինչու ժամանակացույցը դառնում է խճճված, այն խրված է, այն չի նախատեսում մեծ թվով առաջադրանքներ, կամ այն ​​վարում է ամբողջ օրվա ընթացքում ՝ առանց որևէ բան անելու: Այս բոլոր պատասխանը ունի մեկ պատասխան `օգտագործելու Airflow 1.10.3 տարբերակը:

Երբ դուք օգտագործում եք Airflow 1.10.3 -ը, մենք պետք է նշենք, թե որ լողավազան պետք է օգտագործվի DAG- ի կողմից, քանի որ այն լռելյայնորեն չի օգտագործում "default_pool", այնպես որ առաջադրանքներ ստեղծելիս մենք պետք է անցնենք para mater pool = 'defautl_pool': Կարող եք ստեղծել 'default_pool' ՝ օգտագործելով UI (Admin -> լողավազաններ) կամ կարող է իրականացվել հրամանի տողով.

օդափոխման լողավազան `default_pool 128« լռելյայն լողավազան »:

Ահա DAG նմուշի օրինակը.

ներմուծել os ժամանակավոր ներմուծումից datetime, timedelta ներմուծել օդային հոսք օդային հոսքից DAG- ից airflow.operators.dummy_operator ներմուծումից DummyOperator
default_args = {'սեփականատեր': 'Airflow', 'կախված_on_past': Կեղծ, 'start_date': airflow.utils.dates.days_ago (2), 'փորձարկում': 1, 'retry_delay': timedelta (րոպե = 1),}
dag = DAG ('dummy_try1', default_args = default_args, chart_interval = Ոչ մեկը)
i- ի միջակայքում (50000). առաջադրանքներ = DummyOperator (task_id = '{}'. ձևաչափ (i), dag = dag, pool = 'default_pool)

Կարող եք ստուգել տարբերությունը բոլոր տարբերակների միջև ստորև նշված հղումով.

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104