Pokud hledáte pro proudění Vzduchu úkol začít od datovém vstupu pak používáte to špatně. Pokud chcete spustit váš skript jako jednu jednotku můžete použít PythonOperator
nebo BashOperator
nicméně, pokud chcete prolomit kód do více úkolů, budete pravděpodobně muset udělat nějaké refactoring.
Vytvořit BigQuery
externí tabulky z csv
na GCS
můžete nastavit external_table
v GCSToBigQueryOperator
například:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
task_id='gcs_to_bigquery_example',
bucket='cloud-samples-data',
source_objects=['bigquery/us-states/us-states.csv'],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
external_table=True,
)
Nevím, co je funkčnost datové části rámců ve vaší workflow (předpokládám, že je to nějaký druh transformace na csv
), pro které můžete použít GCSFileTransformOperator
(viz zdrojový kód). Tento operátor kopie dat ze zdroje VPP umístění do dočasného umístění v místním souborovém systému. Provozuje transformace na tento soubor, jak je uvedeno u
transformace skript a nahrávání výstup do místa určení kbelíku. Pokud výstup kbelík není uvedeno původní soubor bude přepsán.
Tak je možné vaše workflow může být:
- Soubor pozemků v GCS
- Spustit
GCSFileTransformOperator
zpracovat a vyčistit záznamy.
- Vytvoření tabulky v nástroji BigQuery s
GCSToBigQueryOperator