rs_workflows/init_pi_db_flow.md


Create the database used for Performance Indicators

create_schema(db_url)

Creates all database tables defined in the pi_db_models.

This task initializes the database schema for the Performance Indicator (PI) database, creating a SQLAlchemy engine from the provided database URL.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `db_url` | `str` | Database connection URL for the target PI database. | required |
Source code in docs/rs-client-libraries/rs_workflows/init_pi_db_flow.py
@task
def create_schema(db_url: str):
    """
    Creates all database tables defined in the pi_db_models.

    This task initializes the database schema for the Performance Indicator (PI) database,
    creating a SQLAlchemy engine from the provided database URL.

    Args:
        db_url (str): Database connection URL for the target database.
    """
    logger = get_run_logger()
    logger.info(f"Received: {db_url}")
    engine = create_engine(db_url)
    logger.info("Call the create_all")
    Base.metadata.create_all(engine)
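
As a rough usage sketch (with a hypothetical model standing in for the real `pi_db_models`, and an in-memory SQLite URL instead of PostgreSQL), the `create_all` pattern looks like:

```python
from sqlalchemy import create_engine, inspect, Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class PiCategory(Base):  # hypothetical stand-in for the real pi_db_models
    __tablename__ = "pi_category"
    id = Column(Integer, primary_key=True)
    name = Column(String)

# Build an engine from a serializable URL string, then create all mapped tables.
# create_all is idempotent: tables that already exist are left untouched.
engine = create_engine("sqlite://")  # in-memory SQLite instead of PostgreSQL
Base.metadata.create_all(engine)

print(inspect(engine).get_table_names())
```

Because `create_all` skips existing tables, the task is safe to re-run against an already-initialized database.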

init_pi_database(env) async

Initializes the Performance Indicator (PI) database (named performance) schema and populates default categories.

This Prefect flow:
  • Creates all required tables for the PI database.
  • Inserts default PI categories if none exist.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `env` | `FlowEnvArgs` | Prefect flow environment configuration, including runtime context variables. | required |
Environment Variables Required

  • POSTGRES_USER (str): PostgreSQL username.
  • POSTGRES_PASSWORD (str): PostgreSQL password.
  • POSTGRES_HOST (str): PostgreSQL host address.
  • POSTGRES_PORT (str): PostgreSQL port.
  • POSTGRES_PI_DB (str): Name of the Performance Indicator database.
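
Taken together, these variables form the SQLAlchemy connection URL the flow builds internally. A minimal sketch, with placeholder values for illustration only:

```python
import os

# Placeholder values for illustration only -- the flow reads the real ones
# from the deployment environment.
os.environ.update({
    "POSTGRES_USER": "pi_user",
    "POSTGRES_PASSWORD": "secret",
    "POSTGRES_HOST": "localhost",
    "POSTGRES_PORT": "5432",
    "POSTGRES_PI_DB": "performance",
})

db_url = (
    f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:"
    f"{os.environ['POSTGRES_PASSWORD']}@{os.environ['POSTGRES_HOST']}:"
    f"{os.environ['POSTGRES_PORT']}/{os.environ['POSTGRES_PI_DB']}"
)
print(db_url)
```

If any of the five variables is missing, the lookup raises a `KeyError` before any database connection is attempted.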

Flow Steps
  1. Initialize flow environment and tracing span.
  2. Build database connection URL.
  3. Create database schema via create_schema task.
  4. Insert default PI categories via insert_pi_categories task.
Source code in docs/rs-client-libraries/rs_workflows/init_pi_db_flow.py
@flow(name="PI db init")
async def init_pi_database(env: FlowEnvArgs):
    """
    Initializes the Performance Indicator (PI) database (named `performance`) schema and populates default categories.

    This Prefect flow:
      - Creates all required tables for the PI database.
      - Inserts default PI categories if none exist.

    Args:
        env (FlowEnvArgs): Prefect flow environment configuration, including runtime context variables.

    Environment Variables Required:
        POSTGRES_USER (str): PostgreSQL username.
        POSTGRES_PASSWORD (str): PostgreSQL password.
        POSTGRES_HOST (str): PostgreSQL host address.
        POSTGRES_PORT (str): PostgreSQL port.
        POSTGRES_PI_DB (str): Name of the Performance Indicator database.

    Flow Steps:
        1. Initialize flow environment and tracing span.
        2. Build database connection URL.
        3. Create database schema via `create_schema` task.
        4. Insert default PI categories via `insert_pi_categories` task.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "init-pi-database"):

        logger.info("Starting the initialization of the tables for the performance indicator database...")

        db_url = (
            f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:"
            f"{os.environ['POSTGRES_PASSWORD']}@{os.environ['POSTGRES_HOST']}:"
            f"{os.environ['POSTGRES_PORT']}/{os.environ['POSTGRES_PI_DB']}"
        )
        # Prefect tasks attempt to serialize inputs (for caching, retries, mapping, etc.) and
        # also generate a cache key by hashing them. SQLAlchemy Engine contains locks
        # and connection pools (thread.RLock, weakref.ReferenceType, etc.), which are not serializable.
        # That's why instead of passing the engine object, pass only the db_url, a string, which is serializable.
        # Each task can then create its own engine locally.
        create_schema(db_url)  # type: ignore[unused-coroutine]
        insert_pi_categories(db_url)  # type: ignore[unused-coroutine]

        logger.info("The initialization of the tables for the performance indicator database finished")
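
The serialization constraint described in the comment above can be demonstrated with a stand-in object holding a `threading.RLock` (one of the unpicklable members a SQLAlchemy Engine carries), versus the plain URL string:

```python
import pickle
import threading

class FakeEngine:
    """Stand-in mimicking the unpicklable state a SQLAlchemy Engine holds."""
    def __init__(self):
        self.lock = threading.RLock()  # connection pools contain locks like this

db_url = "postgresql+psycopg2://user:pw@host:5432/performance"

pickled_url = pickle.dumps(db_url)  # a plain str serializes fine

try:
    pickle.dumps(FakeEngine())
    engine_picklable = True
except TypeError:  # cannot pickle '_thread.RLock' object
    engine_picklable = False

print(engine_picklable)
```

This is why the flow hands each task the URL string and lets the task build its own engine locally.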

insert_pi_categories(db_url)

Inserts default Performance Indicator (PI) categories into the database if none exist.

This task checks whether the pi_category table is empty and, if so, inserts predefined categories from PI_CATEGORY_DATA.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `db_url` | `str` | Database connection URL for the target PI database. | required |
Notes
  • If categories already exist, no action is taken.
  • Commits the transaction only if new data is inserted.
Source code in docs/rs-client-libraries/rs_workflows/init_pi_db_flow.py
@task
def insert_pi_categories(db_url: str):
    """
    Inserts default Performance Indicator (PI) categories into the database if none exist.

    This task checks whether the `pi_category` table is empty and, if so, inserts predefined
    categories from `PI_CATEGORY_DATA`.

    Args:
        db_url (str): Database connection URL for the target database.

    Notes:
        - If categories already exist, no action is taken.
        - Commits the transaction only if new data is inserted.
    """
    engine = create_engine(db_url)
    own_session_maker = sessionmaker(bind=engine)
    session = own_session_maker()
    try:
        if session.query(PiCategory).count() == 0:
            for mission, name, desc, max_delay in PI_CATEGORY_DATA:
                session.add(PiCategory(mission=mission, name=name, description=desc, max_delay_seconds=max_delay))
            session.commit()
    finally:
        session.close()
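
The count-then-insert guard above can be sketched with the standard-library `sqlite3` module (same idempotency pattern, different driver; `PI_CATEGORY_DATA` replaced by hypothetical rows):

```python
import sqlite3

SAMPLE_DATA = [  # hypothetical stand-in for PI_CATEGORY_DATA
    ("S1", "latency", "End-to-end latency", 3600),
    ("S2", "completeness", "Product completeness", 7200),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS pi_category "
    "(mission TEXT, name TEXT, description TEXT, max_delay_seconds INTEGER)"
)

def insert_pi_categories(conn):
    # Insert defaults only when the table is empty, then commit.
    if conn.execute("SELECT COUNT(*) FROM pi_category").fetchone()[0] == 0:
        conn.executemany("INSERT INTO pi_category VALUES (?, ?, ?, ?)", SAMPLE_DATA)
        conn.commit()

insert_pi_categories(conn)
insert_pi_categories(conn)  # second call is a no-op: table is no longer empty

count = conn.execute("SELECT COUNT(*) FROM pi_category").fetchone()[0]
print(count)  # 2, not 4
```

The emptiness check makes the task safe to re-run: rows are only ever inserted once, and nothing is committed when the table already has data.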