Jak předat datové části rámců do temp tabulky pro proudění vzduchu úkoly

0

Otázka

Takže mám kus kódu, který má více funkcí v pythonu pro činnosti, které jsme vyvinuli. Tato skupina trvá CSV a projde to přes data-frame. Tato data-rám je pak prošel přes několik funkcí, které aplikovat různé transformace na datovém.

Nicméně, při psaní tohoto kódu do vzduchu prostředí , vzhledem ke způsobu datové části rámců práce ve virtuálním prostředí a s daty běžet na více strojích, jsem zvyklý být schopni projít mé datové části rámců skrz jednotlivé funkce a bude muset ukládat je někde?

Ví někdo, jak nastavit temp tabulky v nástroji bigquery pro procházející datovém pro každý z mých funkcí, takže můžu běžet můj ETL na ně s pomocí mého proudění vzduchu úkoly?

airflow google-bigquery python
2021-11-21 14:39:06
1

Nejlepší odpověď

1

Pokud hledáte pro proudění Vzduchu úkol začít od datovém vstupu pak používáte to špatně. Pokud chcete spustit váš skript jako jednu jednotku můžete použít PythonOperator nebo BashOperator nicméně, pokud chcete prolomit kód do více úkolů, budete pravděpodobně muset udělat nějaké refactoring.

Vytvořit BigQuery externí tabulky z csv na GCS můžete nastavit external_table v GCSToBigQueryOperator například:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        schema_fields=[
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
        ],
        write_disposition='WRITE_TRUNCATE',
        external_table=True,
    )

Nevím, co je funkčnost datové části rámců ve vaší workflow (předpokládám, že je to nějaký druh transformace na csv), pro které můžete použít GCSFileTransformOperator (viz zdrojový kód). Tento operátor kopie dat ze zdroje VPP umístění do dočasného umístění v místním souborovém systému. Provozuje transformace na tento soubor, jak je uvedeno u transformace skript a nahrávání výstup do místa určení kbelíku. Pokud výstup kbelík není uvedeno původní soubor bude přepsán.

Tak je možné vaše workflow může být:

  1. Soubor pozemků v GCS
  2. Spustit GCSFileTransformOperator zpracovat a vyčistit záznamy.
  3. Vytvoření tabulky v nástroji BigQuery s GCSToBigQueryOperator
2021-11-21 15:14:56

Takže to dělá od té doby dělat, ale bylo by to nejlepší řešení, jak s csv transformace, s každou funkci, sloupce jsou stažena, názvy sloupců jsou změněny, sloupce jsou přidány a odstraněny atd. A připojením k datové rámce tabulce s parametry sloupců se změna by mohla vést k problému, při nahrávání na velké dotaz jako na schématu je vždy v pohybu.
Mizanur Choudhury

@MizanurChoudhury, Že závisí na vaší konkrétní ETL. Můžete také čisté, co můžete před dump raw csv GCS. Tohle je spíš otázka, jaké komponenty máte kontrolu nad, a kde můžete provést změny. To jsou všechny velké otázky, i když - To je design, a celý proces, který je mimo rozsah StackOverflow otázky.
Elad

V jiných jazycích

Tato stránka je v jiných jazycích

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................