Editor’s note: Kenten Danas is a speaker for ODSC AI West this October 28th-30th. Be sure to check out her talk, Orchestrating Workflows for GenAI Applications with Apache Airflow, there!
Productionizing GenAI applications often relies on multi-stage, complex pipelines that manage data ingestion, creation of vector embeddings, vector storage, querying, and response construction. Apache Airflow, a leading open-source orchestration framework, provides the structure and flexibility required to implement complex GenAI workflows with relatively basic Python code.
In this tutorial, you’ll learn how to use Airflow to orchestrate a basic RAG pipeline that includes embedding book descriptions with the OpenAI API, ingesting the embeddings into a PostgreSQL database with pgvector installed, and querying the database for books that match a user-provided mood. To learn about a more advanced implementation of this topic, join my workshop at ODSC West where I’ll cover:
- More on Airflow and orchestration basics
- Writing GenAI pipelines in Airflow, including how to use the Airflow AI SDK for interacting with LLMs
- Airflow features for running GenAI pipelines in production that optimize reliability and performance
In-person conference | October 28th-30th, 2025 | San Francisco, CA
Over 100 workshops and trainings will help you:
✅ Identify high-impact AI opportunities for your organization
✅ Build AI-ready teams, tools, and infrastructure
✅ Scale AI projects with governance and measurable ROI
✅ Learn directly from companies applying AI at scale
Why use Airflow for GenAI Applications?
The pipelines that help power GenAI applications need to run reliably, scale efficiently, and recover from failure without manual intervention. When using Airflow for orchestration, the steps in your AI pipelines become aware of each other, and you have a central location to monitor, edit, and troubleshoot all your workflows, from the ETL/ELT pipelines that provide data for your models, to RAG, to prompt engineering, to fine-tuning.
Airflow provides many benefits for GenAI workflows, including:
- Reliable task execution: APIs of AI models often come with transient errors and rate limits. Airflow can automatically retry individual tasks, send notifications in case of issues, and handle timeouts (see the configuration sketch after this list).
- Tool agnosticism: Airflow can connect to any application in your data ecosystem that allows connections through an API. This means you are never locked into one model vendor and can quickly switch individual tasks to the latest and best model and AI tooling. Prebuilt provider packages exist to connect to many common data tools. You can find a comprehensive catalogue of provider packages in the Astronomer registry.
- High extensibility: Since Airflow pipelines are written in Python, you can build on top of the existing codebase and extend the functionality of Airflow to meet your needs. Almost anything you can do in Python, you can do in Airflow.
- Infinite scalability: Given enough computing power, you can orchestrate as many processes as you need, no matter the complexity of your pipelines.
- Observability: The Airflow UI provides an immediate overview of all your data pipelines and can provide the source of truth for workflows in your whole data ecosystem.
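As a minimal sketch of these reliability features (the task and callback names below are illustrative, not part of this tutorial's pipeline), retries, timeouts, and failure notifications are all set directly in Python:

from datetime import timedelta

from airflow.sdk import dag, task


def notify_on_failure(context):
    # Hypothetical callback: plug in Slack, email, or another alerting channel here.
    print(f"Task {context['task_instance'].task_id} failed!")


@dag(schedule=None)
def resilient_pipeline():
    @task(
        retries=3,  # automatically retry transient API errors
        retry_delay=timedelta(minutes=1),  # wait between attempts
        execution_timeout=timedelta(minutes=10),  # fail tasks that hang
        on_failure_callback=notify_on_failure,  # notify once retries are exhausted
    )
    def call_model_api():
        ...  # e.g. an LLM or embedding API call

    call_model_api()


resilient_pipeline()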
Prerequisites
This tutorial requires the following tools:
- The Astro CLI. This is a free tool provided by Astronomer used to run Airflow locally. If you already run Airflow in a different way, you can use that instead.
- An OpenAI API key of at least tier 1 if you want to use OpenAI for vectorization. If you do not want to use OpenAI, you can adapt the create_embeddings function at the start of the DAG to use a different vectorizer.
This tutorial uses a local PostgreSQL database created as a Docker container. The image comes with pgvector preinstalled.
Step 1: Configure your Astro project
1. Create a new Astro project:
$ mkdir astro-pgvector-tutorial && cd astro-pgvector-tutorial
$ astro dev init
2. Add the following two packages to your requirements.txt file to install the pgvector Airflow provider and the OpenAI Python client in your Astro project:
apache-airflow-providers-pgvector==1.0.0
openai==1.3.2
3. This tutorial uses a local PostgreSQL database running in a Docker container. To add a second PostgreSQL container to your Astro project, create a new file in your project’s root directory called docker-compose.override.yml and add the following. The ankane/pgvector image builds a PostgreSQL database with pgvector preinstalled.
services:
  postgres_pgvector:
    image: ankane/pgvector
    volumes:
      - ${PWD}/include/postgres:/var/lib/postgresql/data
      - ${PWD}/include:/include
    networks:
      - airflow
    ports:
      - 5433:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  # Airflow containers
  scheduler:
    networks:
      - airflow
  api-server:
    networks:
      - airflow
  triggerer:
    networks:
      - airflow
  postgres:
    networks:
      - airflow
4. To create an Airflow connection to the PostgreSQL database, add the following to your .env file. If you are using the OpenAI API for embeddings, you will need to update the OPENAI_API_KEY environment variable.
AIRFLOW_CONN_POSTGRES_DEFAULT='{"conn_type": "postgres", "login": "postgres", "password": "postgres", "host": "host.docker.internal", "port": 5433, "schema": "postgres"}'
OPENAI_API_KEY=""
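Optionally, once the project is running (after astro dev start in Step 4), you can sanity-check the database through the mapped port 5433. This assumes the psql client is installed on your machine; the credentials and port come from the files above:

$ psql "postgresql://postgres:postgres@localhost:5433/postgres" -c "SELECT 1;"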
Step 2: Add your data
The DAG in this tutorial runs a query on vectorized book descriptions from Goodreads, but you can adjust the DAG to use any data you want.
1. Create a new file called book_data.txt in the include directory.
2. Copy the book data from the book_data.txt file in Astronomer’s GitHub repository for a list of great books.
Note: If you want to add your own books make sure the data is in the following format:
<index integer> ::: <title> (<year of publication>) ::: <author> ::: <description>
One book corresponds to one line in the file.
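For example, a valid line might look like this (an illustrative entry based on one of the books in the dataset; the index number is assumed):

1 ::: The Idea of the World (2019) ::: Bernardo Kastrup ::: A rigorous case for the primacy of mind in nature, from philosophy to neuroscience, psychology and physics.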
Step 3: Create your DAG
1. In your dags folder, create a file called query_book_vectors.py.
2. Copy the following code into the file. If you want to use a vectorizer other than OpenAI, make sure to adjust the create_embeddings function at the start of the DAG and to provide the correct MODEL_VECTOR_LENGTH.
""" ## Vectorize book descriptions with OpenAI and store them in Postgres with pgvector This DAG shows how to use the OpenAI API 1.0+ to vectorize book descriptions and store them in Postgres with the pgvector extension. It will also help you pick your next book to read based on a mood you describe. You will need to set the following environment variables: - `AIRFLOW_CONN_POSTGRES_DEFAULT`: an Airflow connection to your Postgres database that has pgvector installed - `OPENAI_API_KEY`: your OpenAI API key """ from airflow.sdk import dag, task from airflow.models.baseoperator import chain from airflow.models.param import Param from airflow.providers.pgvector.operators.pgvector import PgVectorIngestOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.exceptions import AirflowSkipException from pendulum import datetime from openai import OpenAI import uuid import re import os POSTGRES_CONN_ID = "postgres_default" TEXT_FILE_PATH = "include/book_data.txt" TABLE_NAME = "Book" OPENAI_MODEL = "text-embedding-ada-002" MODEL_VECTOR_LENGTH = 1536 def create_embeddings(text: str, model: str): """Create embeddings for a text with the OpenAI API.""" client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) response = client.embeddings.create(input=text, model=model) embeddings = response.data[0].embedding return embeddings @dag( start_date=datetime(2025, 8, 1), schedule=None, tags=["pgvector"], params={ "book_mood": Param( "A philosophical book about consciousness.", type="string", description="Describe the kind of book you want to read.", ), }, ) def query_book_vectors(): enable_vector_extension_if_not_exists = PostgresOperator( task_id="enable_vector_extension_if_not_exists", postgres_conn_id=POSTGRES_CONN_ID, sql="CREATE EXTENSION IF NOT EXISTS vector;", ) create_table_if_not_exists = PostgresOperator( task_id="create_table_if_not_exists", postgres_conn_id=POSTGRES_CONN_ID, sql=f""" CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( book_id UUID PRIMARY KEY, title TEXT, year INTEGER, author TEXT, description TEXT, vector VECTOR(%(vector_length)s) ); """, parameters={"vector_length": MODEL_VECTOR_LENGTH}, ) get_already_imported_book_ids = PostgresOperator( task_id="get_already_imported_book_ids", postgres_conn_id=POSTGRES_CONN_ID, sql=f""" SELECT book_id FROM {TABLE_NAME}; """, ) @task def import_book_data(text_file_path: str, table_name: str) -> list: "Read the text file and create a list of dicts from the book information." with open(text_file_path, "r") as f: lines = f.readlines() num_skipped_lines = 0 list_of_params = [] for line in lines: parts = line.split(":::") title_year = parts[1].strip() match = re.match(r"(.+)", title_year) try: title, year = match.groups() year = int(year) # skip malformed lines except: num_skipped_lines += 1 continue author = parts[2].strip() description = parts[3].strip() list_of_params.append( { "book_id": str( uuid.uuid5( name=" ".join([title, str(year), author, description]), namespace=uuid.NAMESPACE_DNS, ) ), "title": title, "year": year, "author": author, "description": description, } ) print( f"Created a list with {len(list_of_params)} elements " " while skipping {num_skipped_lines} lines." ) return list_of_params @task def create_embeddings_book_data( book_data: dict, model: str, already_imported_books: list ) -> dict: "Create embeddings for a book description and add them to the book data." 
already_imported_books_ids = [x[0] for x in already_imported_books] if book_data["book_id"] in already_imported_books_ids: raise AirflowSkipException("Book already imported.") embeddings = create_embeddings(text=book_data["description"], model=model) book_data["vector"] = embeddings return book_data @task def create_embeddings_query(model: str, **context) -> list: "Create embeddings for the user provided book mood." query = context["params"]["book_mood"] embeddings = create_embeddings(text=query, model=model) return embeddings book_data = import_book_data(text_file_path=TEXT_FILE_PATH, table_name=TABLE_NAME) book_embeddings = create_embeddings_book_data.partial( model=OPENAI_MODEL, already_imported_books=get_already_imported_book_ids.output, ).expand(book_data=book_data) query_vector = create_embeddings_query(model=OPENAI_MODEL) import_embeddings_to_pgvector = PgVectorIngestOperator.partial( task_id="import_embeddings_to_pgvector", trigger_rule="none_failed", conn_id=POSTGRES_CONN_ID, sql=( f"INSERT INTO {TABLE_NAME} " "(book_id, title, year, author, description, vector) " "VALUES (%(book_id)s, %(title)s, %(year)s, " "%(author)s, %(description)s, %(vector)s) " "ON CONFLICT (book_id) DO NOTHING;" ), ).expand(parameters=book_embeddings) get_a_book_suggestion = PostgresOperator( task_id="get_a_book_suggestion", postgres_conn_id=POSTGRES_CONN_ID, trigger_rule="none_failed", sql=f""" SELECT title, year, author, description FROM {TABLE_NAME} ORDER BY vector <-> CAST(%(query_vector)s AS VECTOR) LIMIT 1; """, parameters={"query_vector": query_vector}, ) @task def print_suggestion(query_result, **context): "Print the book suggestion." query = context["params"]["book_mood"] book_title = query_result[0][0] book_year = query_result[0][1] book_author = query_result[0][2] book_description = query_result[0][3] print(f"Book suggestion for '{query}':") print( f"You should read {book_title} by {book_author}, published in {book_year}!" ) print(f"Goodreads describes the book as: {book_description}") chain( enable_vector_extension_if_not_exists, create_table_if_not_exists, get_already_imported_book_ids, import_embeddings_to_pgvector, get_a_book_suggestion, print_suggestion(query_result=get_a_book_suggestion.output), ) chain(query_vector, get_a_book_suggestion) chain(get_already_imported_book_ids, book_embeddings) query_book_vectors()
This DAG consists of nine tasks to make a simple RAG orchestration pipeline.
– The enable_vector_extension_if_not_exists task uses a PostgresOperator to enable the pgvector extension in the PostgreSQL database.
– The create_table_if_not_exists task creates the Book table in PostgreSQL. Note the VECTOR() datatype used for the vector column. This datatype is added to PostgreSQL by the pgvector extension and needs to be defined with the vector length of the vectorizer you use as an argument. This example uses OpenAI’s text-embedding-ada-002 model to create 1536-dimensional vectors, so we define the column with the type VECTOR(1536) using parameterized SQL.
– The get_already_imported_book_ids task queries the Book table to return all book_id values of books that were already stored with their vectors in previous DAG runs.
– The import_book_data task uses the @task decorator to read the book data from the book_data.txt file and return it as a list of dictionaries with keys corresponding to the columns of the Book table.
– The create_embeddings_book_data task is dynamically mapped over the list of dictionaries returned by the import_book_data task to parallelize vector embedding of all book descriptions that have not been added to the Book table in previous DAG runs. The create_embeddings function defines how the embeddings are computed and can be modified to use other embedding models (see the sketch after this list). If all books in the list have already been added to the Book table, then all mapped task instances are skipped.
– The create_embeddings_query task applies the same create_embeddings function to the desired book mood the user provided via Airflow params.
– The import_embeddings_to_pgvector task uses the PgVectorIngestOperator to insert the book data including the embedding vectors into the PostgreSQL database. This task is dynamically mapped to import the embeddings from one book at a time. The dynamically mapped task instances of books that have already been imported in previous DAG runs are skipped.
– The get_a_book_suggestion task queries the PostgreSQL database for the book that is most similar to the user-provided mood using nearest neighbor search. Note how the vector of the user-provided book mood (query_vector) is cast to the VECTOR datatype before similarity search: ORDER BY vector <-> CAST(%(query_vector)s AS VECTOR).
– The print_suggestion task prints the book suggestion to the task logs.
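As noted for the create_embeddings_book_data task above, switching vectorizers only requires changing the create_embeddings function and MODEL_VECTOR_LENGTH. Here is a minimal sketch, assuming you add the sentence-transformers package to requirements.txt and use the open-source all-MiniLM-L6-v2 model, which produces 384-dimensional vectors:

from sentence_transformers import SentenceTransformer

MODEL_VECTOR_LENGTH = 384  # all-MiniLM-L6-v2 returns 384-dimensional vectors


def create_embeddings(text: str, model: str = "all-MiniLM-L6-v2"):
    """Create embeddings for a text with a local sentence-transformers model."""
    embedder = SentenceTransformer(model)  # consider caching this across calls
    embeddings = embedder.encode(text).tolist()  # pgvector expects a plain list
    return embeddings

Because the vector column is typed, the create_table_if_not_exists task would then create the column as VECTOR(384) through the MODEL_VECTOR_LENGTH parameter.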
In the Airflow UI, your DAG graph should show these nine tasks and their dependencies.
Step 4: Run your DAG
1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080.
2. In the Airflow UI, run the query_book_vectors DAG by clicking the Trigger button. Then, provide the Airflow param for the desired book_mood.
3. View your book suggestion in the task logs of the print_suggestion task:
[2025-08-27, 09:45:54] INFO - Book suggestion for 'A philosophical book about consciousness.': chan="stdout": source="task"
[2025-08-27, 09:45:54] INFO - You should read The Idea of the World by Bernardo Kastrup, published in 2019!: chan="stdout": source="task"
[2025-08-27, 09:45:54] INFO - Goodreads describes the book as: A rigorous case for the primacy of mind in nature, from philosophy to neuroscience, psychology and physics. The Idea of the World offers a grounded alternative to the frenzy of unrestrained abstractions and unexamined assumptions in philosophy and science today. [...]
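As an alternative to the UI (a hedged sketch, assuming the Astro CLI’s astro dev run passthrough to the Airflow CLI; the example mood is illustrative), you can also trigger the DAG with a custom book_mood from the command line:

$ astro dev run dags trigger query_book_vectors --conf '{"book_mood": "a sweeping multi-generational family saga"}'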
Conclusion
Congratulations! In this tutorial, you used Apache Airflow to orchestrate a simple RAG pipeline. Airflow can orchestrate any GenAI workflow, including batch inference jobs, multi-agent workflows, and more. To learn more about the tools and patterns needed to deploy and maintain AI systems at scale using Airflow, join my workshop at ODSC West; I hope to see you there!
Author bio:
Kenten leads the Developer Relations team at Astronomer and has a background in field engineering, data engineering, and consulting. She has first-hand experience adopting and running Airflow as a consultant, and is passionate about helping other data engineers scale and get the most out of their Airflow experience. When she isn’t working with data she’s typically outside trail running or skiing.
LinkedIn: https://www.linkedin.com/in/kentendanas/
Astronomer website: https://www.astronomer.io/